From 6aa5b7059598774cfbb00a2938f209090732844d Mon Sep 17 00:00:00 2001 From: Juan Picado Date: Thu, 25 Jun 2026 20:53:27 +0200 Subject: [PATCH] feat(pnpr): multiple-uplink fallback ordering per package (#12648) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Let a `proxy:` rule name an ordered list of uplinks (`proxy: npmjs private`, or the YAML sequence form) and walk it as a fallback chain, matching verdaccio's serial first-success semantics: try uplinks in declared order, stop at the first that answers, and fall through to the next on a 404 or an availability failure. A later private uplink can host a package the primary 404s. - config: PackageAccess.proxy becomes an ordered AccessSpec list; Config::resolve_uplinks returns the chain (skipping unknown names); resolve_uplink stays as a primary-only convenience. - packument: walk the chain, sending each uplink only its own cached validators. The cache holds one shared body, so its validators are scoped to that body's origin uplink (the validator map is replaced, not merged, on every write) — a conditional GET only ever reaches the origin, so a 304 can only confirm the bytes actually on disk and can't revalidate another origin's body. The freshness window (maxage) comes from the primary uplink only. - tarball: try each uplink in order; integrity binds the streamed bytes, so the serving uplink decides whether they are mirrored (caches()) while the cache read is gated on whether any uplink caches. - fallthrough is restricted to availability failures (RegistryError::allows_uplink_fallthrough): transport errors, an open circuit, and upstream 5xx. Any authoritative 4xx (401/403 auth, 429 throttle, 400/410, ...) stops the walk immediately so a later uplink can't mask the primary's rejection of a scoped or rate-limited request. Out of scope (left to a later change): verdaccio's full cross-uplink metadata merge and per-version origin tracking. pnpr stays first-success fallback with one cache entry per package. Part of pnpm/pnpm#11973. --- pnpr/crates/pnpr/src/config.rs | 78 +++++-- pnpr/crates/pnpr/src/config/tests.rs | 100 +++++++++ pnpr/crates/pnpr/src/error.rs | 21 ++ pnpr/crates/pnpr/src/error/tests.rs | 33 +++ pnpr/crates/pnpr/src/lib.rs | 8 +- pnpr/crates/pnpr/src/server.rs | 294 +++++++++++++++++++------- pnpr/crates/pnpr/src/storage.rs | 48 +++-- pnpr/crates/pnpr/src/storage/tests.rs | 76 ++++++- pnpr/crates/pnpr/src/upstream.rs | 55 +++++ pnpr/crates/pnpr/tests/server.rs | 256 +++++++++++++++++++++- 10 files changed, 837 insertions(+), 132 deletions(-) diff --git a/pnpr/crates/pnpr/src/config.rs b/pnpr/crates/pnpr/src/config.rs index 3f1c138347..ee8c4ffded 100644 --- a/pnpr/crates/pnpr/src/config.rs +++ b/pnpr/crates/pnpr/src/config.rs @@ -761,19 +761,20 @@ fn non_empty_token(token: &str) -> Option { /// `unpublish` are verdaccio permission lists (built-in groups like /// `$all` / `$authenticated` / `$anonymous`, plus usernames / group /// names), compiled into the [`PackagePolicies`] that gate reads and -/// writes. `proxy` selects the [`UplinkConfig`] by name. +/// writes. `proxy` names the uplinks to fall back through, in order +/// (`proxy: npmjs private`); see [`Config::resolve_uplinks`]. #[derive(Debug, Default, Clone, Deserialize)] pub struct PackageAccess { pub access: Option, pub publish: Option, pub unpublish: Option, - pub proxy: Option, + pub proxy: Option, } -/// A YAML permission value. Verdaccio accepts either a single -/// space-separated string (`access: $authenticated admin`) or a -/// sequence (`access: [$authenticated, admin]`); both normalize to the -/// same token list. +/// A YAML string-or-list value. Verdaccio accepts either a single +/// space-separated string (`access: $authenticated admin`, +/// `proxy: npmjs private`) or a sequence (`access: [$authenticated, +/// admin]`); both normalize to the same ordered token list. #[derive(Debug, Clone, Deserialize)] #[serde(untagged)] pub enum AccessSpec { @@ -792,6 +793,19 @@ impl AccessSpec { } } } + + /// The tokens in declared order, each element flattened on whitespace + /// (so `[a b, c]` and `[a, b, c]` agree). Unlike [`Self::to_access_list`] + /// — which builds an unordered permission *set* — this preserves order, + /// which `proxy:` relies on for its fallback chain. + fn to_ordered_tokens(&self) -> Vec<&str> { + match self { + AccessSpec::One(spec) => spec.split_whitespace().collect(), + AccessSpec::Many(items) => { + items.iter().flat_map(|item| item.split_whitespace()).collect() + } + } + } } /// Disk shape of the YAML file. Fields verdaccio supports but @@ -961,7 +975,10 @@ impl Config { let mut packages = IndexMap::new(); packages.insert( "**".to_string(), - PackageAccess { proxy: Some("npmjs".to_string()), ..Default::default() }, + PackageAccess { + proxy: Some(AccessSpec::One("npmjs".to_string())), + ..Default::default() + }, ); Self { listen, @@ -1270,20 +1287,43 @@ impl Config { } } - /// Find the uplink for `package_name` by walking [`Self::packages`] - /// in declared order: the first pattern that matches is the rule - /// that applies. If that rule has no `proxy:`, the package is - /// storage-only and this returns `None` — matching verdaccio's - /// first-match-wins semantics. The returned tuple's first element - /// is the uplink *name* (the key in [`Self::uplinks`]); callers - /// that have pre-built per-uplink state can use it as an index. + /// Find the uplink fallback chain for `package_name` by walking + /// [`Self::packages`] in declared order: the first pattern that matches + /// is the rule that applies (verdaccio's first-match-wins). That rule's + /// `proxy:` names the uplinks to try, **in order** — the registry walks + /// the returned list as a fallback chain (try the first, fall through to + /// the next on failure or 404). A proxy name absent from [`Self::uplinks`] + /// is silently skipped (as verdaccio does). An empty result means the + /// package is storage-only. + /// + /// Each tuple's first element is the uplink *name* (the key in + /// [`Self::uplinks`]), so callers with pre-built per-uplink state can use + /// it as an index. + #[must_use] + pub fn resolve_uplinks(&self, package_name: &str) -> Vec<(&str, &UplinkConfig)> { + let Some(access) = self + .packages + .iter() + .find_map(|(pattern, access)| pattern_matches(pattern, package_name).then_some(access)) + else { + return Vec::new(); + }; + let Some(proxy) = &access.proxy else { + return Vec::new(); + }; + proxy + .to_ordered_tokens() + .into_iter() + .filter_map(|name| self.uplinks.get_key_value(name).map(|(k, v)| (k.as_str(), v))) + .collect() + } + + /// The primary (first) uplink for `package_name`, or `None` when the + /// package is storage-only. A convenience over [`Self::resolve_uplinks`] + /// for the single-uplink question. #[must_use] pub fn resolve_uplink(&self, package_name: &str) -> Option<(&str, &UplinkConfig)> { - let access = self.packages.iter().find_map(|(pattern, access)| { - pattern_matches(pattern, package_name).then_some(access) - })?; - let proxy_name = access.proxy.as_deref()?; - self.uplinks.get_key_value(proxy_name).map(|(k, v)| (k.as_str(), v)) + self.resolve_uplinks(package_name).into_iter().next() } } diff --git a/pnpr/crates/pnpr/src/config/tests.rs b/pnpr/crates/pnpr/src/config/tests.rs index e478a11490..a3e44597b6 100644 --- a/pnpr/crates/pnpr/src/config/tests.rs +++ b/pnpr/crates/pnpr/src/config/tests.rs @@ -868,6 +868,106 @@ packages: assert_eq!(config.resolve_uplink("lodash").unwrap().0, "npmjs"); } +/// `proxy:` as a space-separated string lists the uplinks as an ordered +/// fallback chain (verdaccio's `proxy: npmjs private` shape). +#[test] +fn from_yaml_str_proxy_string_lists_uplinks_in_order() { + let yaml = "\ +storage: ./s +uplinks: + npmjs: { url: https://registry.npmjs.org/ } + private: { url: https://private.example/ } +packages: + '**': + proxy: npmjs private +"; + let config = Config::from_yaml_str(yaml, Path::new("/x"), listen(), None).unwrap(); + let names: Vec<_> = config.resolve_uplinks("anything").into_iter().map(|(n, _)| n).collect(); + assert_eq!(names, ["npmjs", "private"]); + // The convenience singular accessor returns the primary (first). + assert_eq!(config.resolve_uplink("anything").unwrap().0, "npmjs"); +} + +/// `proxy:` as a YAML sequence parses to the same ordered chain as the +/// space-separated string form. +#[test] +fn from_yaml_str_proxy_sequence_lists_uplinks_in_order() { + let yaml = "\ +storage: ./s +uplinks: + npmjs: { url: https://registry.npmjs.org/ } + private: { url: https://private.example/ } +packages: + '**': + proxy: [private, npmjs] +"; + let config = Config::from_yaml_str(yaml, Path::new("/x"), listen(), None).unwrap(); + let names: Vec<_> = config.resolve_uplinks("anything").into_iter().map(|(n, _)| n).collect(); + assert_eq!(names, ["private", "npmjs"]); +} + +/// A proxy name with no matching `uplinks:` entry is silently skipped +/// (verdaccio ignores unknown proxy names), and the rest of the chain is +/// preserved in order. +#[test] +fn from_yaml_str_proxy_skips_unknown_uplink_names() { + let yaml = "\ +storage: ./s +uplinks: + npmjs: { url: https://registry.npmjs.org/ } +packages: + '**': + proxy: ghost npmjs +"; + let config = Config::from_yaml_str(yaml, Path::new("/x"), listen(), None).unwrap(); + let names: Vec<_> = config.resolve_uplinks("anything").into_iter().map(|(n, _)| n).collect(); + assert_eq!(names, ["npmjs"]); +} + +/// First-match-wins still selects a single rule, then expands *that* rule's +/// proxy list — a later catch-all's uplinks don't get appended. +#[test] +fn from_yaml_str_resolve_uplinks_expands_only_the_matched_rule() { + let yaml = "\ +storage: ./s +uplinks: + mirror: { url: https://mirror.example/ } + npmjs: { url: https://registry.npmjs.org/ } + private: { url: https://private.example/ } +packages: + '@private/*': + proxy: private mirror + '**': + proxy: npmjs +"; + let config = Config::from_yaml_str(yaml, Path::new("/x"), listen(), None).unwrap(); + let scoped: Vec<_> = + config.resolve_uplinks("@private/foo").into_iter().map(|(n, _)| n).collect(); + assert_eq!(scoped, ["private", "mirror"]); + let other: Vec<_> = config.resolve_uplinks("lodash").into_iter().map(|(n, _)| n).collect(); + assert_eq!(other, ["npmjs"]); +} + +/// A matched rule with no `proxy:` (or no matching rule at all) yields an +/// empty chain — the package is storage-only. +#[test] +fn from_yaml_str_resolve_uplinks_empty_when_no_proxy() { + let yaml = "\ +storage: ./s +uplinks: + npmjs: { url: https://registry.npmjs.org/ } +packages: + '@private/*': + access: $authenticated + '**': + proxy: npmjs +"; + let config = Config::from_yaml_str(yaml, Path::new("/x"), listen(), None).unwrap(); + assert!(config.resolve_uplinks("@private/foo").is_empty()); + let other: Vec<_> = config.resolve_uplinks("lodash").into_iter().map(|(n, _)| n).collect(); + assert_eq!(other, ["npmjs"]); +} + #[test] fn from_yaml_str_public_url_defaults_to_listen_when_none_passed() { let yaml = "storage: ./s\nuplinks: {}\npackages: {}\n"; diff --git a/pnpr/crates/pnpr/src/error.rs b/pnpr/crates/pnpr/src/error.rs index fcfab07a56..322703561b 100644 --- a/pnpr/crates/pnpr/src/error.rs +++ b/pnpr/crates/pnpr/src/error.rs @@ -214,6 +214,27 @@ pub enum RegistryError { } impl RegistryError { + /// Whether a failed upstream fetch may fall through to the next uplink + /// in a package's `proxy:` fallback chain. Only *availability* failures + /// are retryable this way: a transport error, an open circuit breaker, + /// or an upstream `5xx`. Any `4xx` is an authoritative response about + /// *this* request — `401`/`403` (auth), `429` (throttle), `400`/`410`, + /// etc. — and is **not** eligible: a later uplink must never mask it, + /// which would let a public mirror answer for a package the primary + /// scoped to an authenticated private uplink, or silently bypass a + /// rate-limit. Such an error surfaces immediately. + /// + /// A `404` never reaches here — it is modeled as a distinct not-found + /// outcome, not an error, and the chain walks past it on its own. + #[must_use] + pub fn allows_uplink_fallthrough(&self) -> bool { + match self { + RegistryError::Upstream { .. } | RegistryError::UpstreamUnavailable { .. } => true, + RegistryError::UpstreamStatus { status, .. } => *status >= 500, + _ => false, + } + } + #[must_use] pub fn log_kind(&self) -> &'static str { match self { diff --git a/pnpr/crates/pnpr/src/error/tests.rs b/pnpr/crates/pnpr/src/error/tests.rs index 3fb97b2d79..83a0cca7cb 100644 --- a/pnpr/crates/pnpr/src/error/tests.rs +++ b/pnpr/crates/pnpr/src/error/tests.rs @@ -152,3 +152,36 @@ fn log_message_redacts_malformed_database_url_fragment_secrets() { assert!(!message.contains("fragment-secret")); assert!(!message.contains("#password")); } + +#[test] +fn allows_uplink_fallthrough_only_for_availability_failures() { + // 5xx is an availability signal: a later uplink may serve the package, + // so the chain falls through. + for status in [500, 502, 503, 504] { + let err = RegistryError::UpstreamStatus { + url: "https://up.example/foo".to_string(), + status, + body: String::new(), + }; + assert!(err.allows_uplink_fallthrough(), "status {status} should fall through"); + } + + // An open circuit is an availability failure too. + let circuit_open = RegistryError::UpstreamUnavailable { uplink: "npmjs".to_string() }; + assert!(circuit_open.allows_uplink_fallthrough()); + + // Every 4xx is an authoritative response about this request — including + // 429 (throttle) and 408 (timeout) — and must NOT be masked by a later + // uplink. + for status in [400, 401, 403, 408, 410, 429, 451] { + let err = RegistryError::UpstreamStatus { + url: "https://up.example/foo".to_string(), + status, + body: String::new(), + }; + assert!(!err.allows_uplink_fallthrough(), "status {status} must not fall through"); + } + + // A non-upstream error is never a fall-through candidate. + assert!(!RegistryError::InvalidConfig { reason: "x".to_string() }.allows_uplink_fallthrough()); +} diff --git a/pnpr/crates/pnpr/src/lib.rs b/pnpr/crates/pnpr/src/lib.rs index dff7cb28c8..8fd674f9e8 100644 --- a/pnpr/crates/pnpr/src/lib.rs +++ b/pnpr/crates/pnpr/src/lib.rs @@ -27,10 +27,10 @@ pub use auth::{ identify, }; pub use config::{ - AuthConfig, BackendConfig, Config, ConfigSource, DEFAULT_CONFIG_YAML, FeatureOverrides, - HostedStoreConfig, HtpasswdConfig, LibsqlSettings, LogConfig, LogFormat, LogLevel, MaxUsers, - OsvConfig, PackageAccess, RegistryFeature, ResolverFeature, SqlBackendSettings, TokensConfig, - UplinkConfig, default_cache_dir, + AccessSpec, AuthConfig, BackendConfig, Config, ConfigSource, DEFAULT_CONFIG_YAML, + FeatureOverrides, HostedStoreConfig, HtpasswdConfig, LibsqlSettings, LogConfig, LogFormat, + LogLevel, MaxUsers, OsvConfig, PackageAccess, RegistryFeature, ResolverFeature, + SqlBackendSettings, TokensConfig, UplinkConfig, default_cache_dir, }; pub use error::{RegistryError, Result}; pub use journal::recover_publish_journal; diff --git a/pnpr/crates/pnpr/src/server.rs b/pnpr/crates/pnpr/src/server.rs index 7f7d997369..1addca7ca0 100644 --- a/pnpr/crates/pnpr/src/server.rs +++ b/pnpr/crates/pnpr/src/server.rs @@ -13,7 +13,8 @@ use crate::{ streaming, upstream::{ CacheValidators, FetchOutcome, FetchedPackument, PackumentFetch, Upstream, - abbreviate_packument, extract_version_manifest, rewrite_tarball_urls, tarball_basename, + ValidatorsByUplink, abbreviate_packument, extract_version_manifest, rewrite_tarball_urls, + tarball_basename, }, }; use axum::{ @@ -884,8 +885,12 @@ async fn serve_tarball( return error_response(&err); } - let upstream = resolve_upstream(state, &name); - let should_read_cache = upstream.as_ref().is_none_or(|upstream| upstream.caches()); + let upstreams = resolve_upstreams(state, &name); + // Read the cache if any uplink in the chain mirrors tarballs (or there's + // no upstream left at all — a leftover mirror). Reading is harmless: the + // bytes are verified against the version's `dist.integrity` before being + // served, whichever uplink originally wrote them. + let should_read_cache = upstreams.is_empty() || upstreams.iter().any(|u| u.caches()); if should_read_cache { match state.inner.storage.open_cached_tarball(&name, &filename).await { Ok(Some((file, len))) => { @@ -918,14 +923,20 @@ async fn serve_tarball( } } - let Some(upstream) = upstream else { + if upstreams.is_empty() { return not_found(); - }; + } - let response = match upstream.fetch_tarball_response(&name, &filename).await { - Ok(FetchOutcome::Ok(response)) => response, - Ok(FetchOutcome::NotFound) => return not_found(), - Err(err) => return error_response(&err), + // Walk the uplink fallback chain in declared order: the first uplink to + // return the tarball wins; a `NotFound` falls through to the next; a + // transport error is remembered and the next uplink tried. Integrity is + // enforced on the streamed bytes below regardless of which uplink served + // them, so trying several is safe. + let (upstream, response) = match fetch_tarball_with_fallback(&upstreams, &name, &filename).await + { + TarballFetch::Ok(upstream, response) => (upstream, response), + TarballFetch::NotFound => return not_found(), + TarballFetch::Err(err) => return error_response(&err), }; let write = match state.inner.storage.open_cached_tarball_tmp(&name, &filename).await { @@ -2594,14 +2605,68 @@ fn wants_abbreviated(headers: &HeaderMap) -> bool { .is_some_and(|accept| accept.contains(ABBREVIATED_CONTENT_TYPE)) } -/// Resolve which prebuilt [`Upstream`] should serve `package`, by -/// walking the verdaccio-style `packages` rules in declared order and -/// looking up the resolved uplink name in [`AppInner::upstreams`]. -/// Returns `None` when no rule with a `proxy:` field matches the -/// package, leaving the request to fall through to a not-found. -fn resolve_upstream<'a>(state: &'a AppState, package: &PackageName) -> Option<&'a Upstream> { - let (uplink_name, _) = state.inner.config.resolve_uplink(package.as_str())?; - state.inner.upstreams.get(uplink_name) +/// Resolve the ordered [`Upstream`] fallback chain that should serve +/// `package`, by walking the verdaccio-style `packages` rules in declared +/// order ([`Config::resolve_uplinks`]) and looking each resolved uplink +/// name up in [`AppInner::upstreams`], preserving order. Returns an empty +/// vec when no rule with a `proxy:` field matches, leaving the request to +/// fall through to a not-found. +fn resolve_upstreams<'a>(state: &'a AppState, package: &PackageName) -> Vec<&'a Upstream> { + state + .inner + .config + .resolve_uplinks(package.as_str()) + .into_iter() + .filter_map(|(uplink_name, _)| state.inner.upstreams.get(uplink_name)) + .collect() +} + +/// Outcome of walking the uplink fallback chain for a tarball. +enum TarballFetch<'a> { + /// `upstream` returned the tarball; the caller streams `response` and + /// keys the cache-write decision off `upstream.caches()`. + Ok(&'a Upstream, reqwest::Response), + /// Every uplink answered `404` — the tarball isn't anywhere in the chain. + NotFound, + /// No uplink served it and at least one errored. Surfaced rather than a + /// `404` so a transient failure isn't reported as "gone". + Err(RegistryError), +} + +/// Try each uplink in `upstreams` in order until one returns the tarball. +/// A `NotFound` falls through to the next uplink (a private uplink later in +/// the chain may host it); an *availability* error (transport, circuit, 5xx) +/// is remembered and the next uplink tried. An authoritative upstream error +/// (401/403/other hard 4xx) stops the walk immediately — see +/// [`RegistryError::allows_uplink_fallthrough`] — so a later mirror can't +/// mask the primary's rejection. After the chain is exhausted, an error (if +/// any was seen) takes precedence over `NotFound`, so a tarball that a +/// momentarily failing uplink really hosts isn't masked as a hard 404. +async fn fetch_tarball_with_fallback<'a>( + upstreams: &[&'a Upstream], + name: &PackageName, + filename: &str, +) -> TarballFetch<'a> { + let mut last_err = None; + for upstream in upstreams { + match upstream.fetch_tarball_response(name, filename).await { + Ok(FetchOutcome::Ok(response)) => return TarballFetch::Ok(upstream, response), + Ok(FetchOutcome::NotFound) => continue, + Err(err) => { + tracing::warn!(?err, package = %name.as_str(), %filename, uplink = upstream.name(), "upstream tarball fetch failed"); + if !err.allows_uplink_fallthrough() { + // Hard upstream rejection — surface it rather than letting + // a later uplink answer for a tarball this one refused. + return TarballFetch::Err(err); + } + last_err = Some(err); + } + } + } + match last_err { + Some(err) => TarballFetch::Err(err), + None => TarballFetch::NotFound, + } } /// Result of loading the packument for a package — either bytes (raw, @@ -2630,7 +2695,8 @@ async fn load_packument_bytes(state: &AppState, name: &PackageName) -> Packument } } - let Some(upstream) = resolve_upstream(state, name) else { + let upstreams = resolve_upstreams(state, name); + if upstreams.is_empty() { // Nothing published and no upstream to proxy. The only thing // left is a leftover cache entry (e.g. a `proxy:` rule was // removed after the package was mirrored). @@ -2644,100 +2710,164 @@ async fn load_packument_bytes(state: &AppState, name: &PackageName) -> Packument Ok(None) => PackumentLoad::NotFound, Err(err) => PackumentLoad::Err(err), }; - }; + } // Freshness window for the proxy cache: a cached packument younger // than `ttl` is served straight from disk; older than `ttl` it's - // "stale" and revalidated against the upstream below. Lower = newer + // "stale" and revalidated against an upstream below. Lower = newer // versions surface sooner but more upstream traffic; higher = the // reverse. The conditional GET on the stale path keeps a high `ttl` // cheap (a `304` refreshes the entry without re-downloading it). // - // The uplink's per-uplink `maxage` (verdaccio) wins when set; - // otherwise the global `packument_ttl` (the `--packument-ttl-secs` - // flag) applies. - let ttl = upstream.maxage().unwrap_or(state.inner.config.packument_ttl); + // The TTL is a per-package cache property decided before any uplink is + // contacted, so it comes from the primary (first) uplink's `maxage` + // (verdaccio's knob) when it sets one; otherwise the global + // `packument_ttl` (the `--packument-ttl-secs` flag) applies. Only the + // primary governs freshness — a secondary's `maxage` must not control the + // shared cache it rarely fills. + let ttl = upstreams + .first() + .and_then(|upstream| upstream.maxage()) + .unwrap_or(state.inner.config.packument_ttl); // A fresh entry serves immediately (and moves its bytes out — a // packument can be multiple MB). A stale entry yields only its - // validators; its body stays on disk until a `304`/error path below - // actually needs it, so the common stale→`200` refresh never reads it. + // per-uplink validators; its body stays on disk until a `304`/error + // path below actually needs it, so the common stale→`200` refresh never + // reads it. let validators = match state.inner.storage.read_cached_packument_entry(name, ttl).await { Ok(Some(CachedPackument::Fresh(bytes))) => { record_cache_status("hit"); return PackumentLoad::Ok(bytes); } Ok(Some(CachedPackument::Stale(validators))) => validators, - Ok(None) => CacheValidators::default(), + Ok(None) => ValidatorsByUplink::default(), Err(err) => { tracing::warn!(?err, package = %name.as_str(), "cache read failed"); - CacheValidators::default() + ValidatorsByUplink::default() } }; - // Revalidate conditionally when we hold a stale copy: the upstream - // can answer `304` and save us re-downloading an unchanged packument. - match upstream.fetch_packument(name, &validators).await { - Ok(PackumentFetch::Modified(fetched)) => { - store_fetched_packument(state, name, fetched).await - } - // `304` confirmed our stale copy is current: read it now (deferred - // until here), re-write it to bump the cache mtime so it's fresh - // again until the next TTL window, and serve it. - Ok(PackumentFetch::NotModified) => { - match state.inner.storage.read_cached_packument(name).await { - Ok(Some(bytes)) => { - if let Err(err) = - state.inner.storage.write_cached_packument(name, &bytes, &validators).await - { - tracing::warn!(?err, package = %name.as_str(), "packument cache refresh failed"); - } - record_cache_status("revalidated"); - PackumentLoad::Ok(bytes) - } - // The body vanished between the freshness check and this read - // (cache wiped concurrently). The upstream just confirmed the - // package exists, so re-fetch it unconditionally and self-heal - // rather than 404-ing a present package. - Ok(None) => match upstream.fetch_packument(name, &CacheValidators::default()).await - { - Ok(PackumentFetch::Modified(fetched)) => { - store_fetched_packument(state, name, fetched).await - } - Ok(_) => PackumentLoad::NotFound, - Err(err) => PackumentLoad::Err(err), - }, - Err(err) => PackumentLoad::Err(err), + // Walk the uplink fallback chain in declared order. The first uplink to + // return a body (`Modified`) or confirm our cache (`NotModified`) wins; + // a `NotFound` falls through to the next (a private uplink later in the + // chain may host the package); an *availability* error (transport, + // circuit, 5xx) is remembered and the next uplink tried. An authoritative + // upstream error (401/403/other hard 4xx) stops the walk immediately — + // see [`RegistryError::allows_uplink_fallthrough`] — so a later public + // mirror can never mask the primary's rejection of a scoped package. Each + // uplink is sent only *its own* cached validators — an `ETag` is + // origin-specific, so replaying one uplink's against another could + // spuriously `304` and serve stale data. + let mut last_err = None; + for upstream in &upstreams { + // Revalidate conditionally when we hold this uplink's validators: + // the upstream can answer `304` and save re-downloading an unchanged + // packument. + let uplink_validators = validators.get(upstream.name()); + match upstream.fetch_packument(name, &uplink_validators).await { + Ok(PackumentFetch::Modified(fetched)) => { + return store_fetched_packument(state, name, upstream.name(), fetched).await; } - } - Ok(PackumentFetch::NotFound) => PackumentLoad::NotFound, - Err(err) => { - tracing::warn!(?err, package = %name.as_str(), "upstream packument fetch failed"); - match state.inner.storage.read_cached_packument(name).await { - Ok(Some(bytes)) => { - record_cache_status("stale"); - PackumentLoad::Ok(bytes) + Ok(PackumentFetch::NotModified) => { + return serve_revalidated(state, name, upstream, &uplink_validators).await; + } + Ok(PackumentFetch::NotFound) => continue, + Err(err) => { + tracing::warn!(?err, package = %name.as_str(), uplink = upstream.name(), "upstream packument fetch failed"); + if !err.allows_uplink_fallthrough() { + // Hard upstream rejection (auth/throttle/other 4xx). Surface + // it immediately: don't try later uplinks (which could + // answer for a package this one authoritatively refused) and + // don't fall back to a stale cached body either — serving + // cache here would mask an authoritative denial and keep + // handing out content the upstream is now refusing. + return PackumentLoad::Err(err); } - // No cache to fall back on: surface the upstream failure. - Ok(None) => PackumentLoad::Err(err), - // The cache itself is unreadable: surface that I/O error - // rather than the upstream one — it's the more actionable - // failure when both go wrong. - Err(cache_err) => PackumentLoad::Err(cache_err), + last_err = Some(err); } } } + + // Chain exhausted. If any uplink errored, fall back to a stale cached + // body — a transient failure must never be reported as an authoritative + // 404 the client would cache as "gone". If every uplink answered a clean + // `NotFound`, the package genuinely isn't anywhere in the chain. + match last_err { + Some(err) => serve_stale_or_error(state, name, err).await, + None => PackumentLoad::NotFound, + } } -/// Persist a freshly fetched packument to the proxy cache and return it, -/// tagging the access record as a `miss`. A cache-write failure is logged -/// but not fatal — the fetched bytes are still served. +/// Handle a `304 Not Modified` from `upstream`: the cached body is still +/// current, so read it (deferred until now), re-write it to bump the cache +/// mtime and re-record this uplink's validators, and serve it +/// (`revalidated`). If the body vanished between the freshness check and +/// this read (cache wiped concurrently), re-fetch it unconditionally from +/// the same uplink and self-heal rather than 404-ing a present package. +async fn serve_revalidated( + state: &AppState, + name: &PackageName, + upstream: &Upstream, + validators: &CacheValidators, +) -> PackumentLoad { + match state.inner.storage.read_cached_packument(name).await { + Ok(Some(bytes)) => { + if let Err(err) = state + .inner + .storage + .write_cached_packument(name, &bytes, upstream.name(), validators) + .await + { + tracing::warn!(?err, package = %name.as_str(), "packument cache refresh failed"); + } + record_cache_status("revalidated"); + PackumentLoad::Ok(bytes) + } + Ok(None) => match upstream.fetch_packument(name, &CacheValidators::default()).await { + Ok(PackumentFetch::Modified(fetched)) => { + store_fetched_packument(state, name, upstream.name(), fetched).await + } + Ok(_) => PackumentLoad::NotFound, + Err(err) => PackumentLoad::Err(err), + }, + Err(err) => PackumentLoad::Err(err), + } +} + +/// Every uplink in the chain failed. Fall back to a stale cached body if one +/// exists (tagging the access `stale`); otherwise surface the upstream +/// error — unless the cache read itself failed, in which case that I/O error +/// is the more actionable one to surface. +async fn serve_stale_or_error( + state: &AppState, + name: &PackageName, + err: RegistryError, +) -> PackumentLoad { + match state.inner.storage.read_cached_packument(name).await { + Ok(Some(bytes)) => { + record_cache_status("stale"); + PackumentLoad::Ok(bytes) + } + Ok(None) => PackumentLoad::Err(err), + Err(cache_err) => PackumentLoad::Err(cache_err), + } +} + +/// Persist a freshly fetched packument to the proxy cache (recording +/// `uplink_name`'s validators in the package's per-uplink validator map) +/// and return it, tagging the access record as a `miss`. A cache-write +/// failure is logged but not fatal — the fetched bytes are still served. async fn store_fetched_packument( state: &AppState, name: &PackageName, + uplink_name: &str, fetched: FetchedPackument, ) -> PackumentLoad { - if let Err(err) = - state.inner.storage.write_cached_packument(name, &fetched.bytes, &fetched.validators).await + if let Err(err) = state + .inner + .storage + .write_cached_packument(name, &fetched.bytes, uplink_name, &fetched.validators) + .await { tracing::warn!(?err, package = %name.as_str(), "packument cache write failed"); } @@ -2754,8 +2884,8 @@ async fn store_fetched_packument( /// * `revalidated` — entry was stale; the upstream answered `304 Not /// Modified`, so the cached body was reused. /// * `miss` — fetched a fresh body from the upstream. -/// * `stale` — upstream was unreachable; a stale cached body was served -/// as a fallback. +/// * `stale` — every uplink in the chain errored; a stale cached body was +/// served as a fallback. /// * `orphaned` — a leftover mirror served with no upstream left to /// revalidate against (its `proxy:` rule was removed after the package /// was mirrored). Served regardless of age, so distinct from `hit`. diff --git a/pnpr/crates/pnpr/src/storage.rs b/pnpr/crates/pnpr/src/storage.rs index 04bbf1fc1c..9b3193396d 100644 --- a/pnpr/crates/pnpr/src/storage.rs +++ b/pnpr/crates/pnpr/src/storage.rs @@ -4,7 +4,7 @@ use crate::{ package_name::PackageName, s3::S3Store, streaming, - upstream::CacheValidators, + upstream::{CacheValidators, ValidatorsByUplink}, }; use axum::body::Body; use serde::{Deserialize, Serialize}; @@ -151,14 +151,15 @@ impl Drop for TarballWrite { /// /// * `Fresh` — within the TTL; the body is read and ready to serve. /// * `Stale` — past the TTL; only the small conditional-GET validators -/// are loaded. The body is left on disk and pulled on demand by the -/// caller (via [`Storage::read_cached_packument`]) only if the upstream -/// answers `304` or is unreachable — the common stale→`200` refresh -/// discards the old body, so it's never read. +/// (one set per uplink in the package's fallback chain) are loaded. The +/// body is left on disk and pulled on demand by the caller (via +/// [`Storage::read_cached_packument`]) only if the upstream answers `304` +/// or is unreachable — the common stale→`200` refresh discards the old +/// body, so it's never read. #[derive(Debug)] pub enum CachedPackument { Fresh(Vec), - Stale(CacheValidators), + Stale(ValidatorsByUplink), } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -386,17 +387,29 @@ impl Storage { self.cached.read_packument_any_age(name).await } - /// Write a cached upstream packument and its validators, refreshing - /// the entry's freshness. Called both on a fresh upstream body and - /// on a `304` revalidation (re-written with the unchanged bytes to - /// bump the cache mtime). + /// Write a cached upstream packument, recording **only** `uplink`'s + /// validators (the origin of these bytes) and refreshing the entry's + /// freshness. Called both on a fresh upstream body and on a `304` + /// revalidation (re-written with the unchanged bytes to bump the mtime). + /// + /// The validator map is *replaced*, not merged: the cache holds a single + /// shared packument body, so it must carry validators for exactly the + /// uplink that fetched that body. If validators from other uplinks + /// survived here, a later `304` from one of them would revalidate *these* + /// bytes — which that uplink never served — and we'd keep serving another + /// origin's body under its confirmation. Scoping validators to the body's + /// origin guarantees a conditional GET only ever goes to that origin, so a + /// `304` can only confirm the body actually on disk. pub async fn write_cached_packument( &self, name: &PackageName, bytes: &[u8], + uplink: &str, validators: &CacheValidators, ) -> Result<()> { - self.cached.write_packument_with_meta(name, bytes, validators).await + let mut map = ValidatorsByUplink::default(); + map.set(uplink, validators.clone()); + self.cached.write_packument_with_meta(name, bytes, &map).await } /// Create and open a per-request temp file for a proxied tarball. @@ -497,13 +510,14 @@ impl Store { } } - /// Best-effort read of the validator sidecar. A missing, unreadable, - /// or malformed sidecar yields empty validators so the next refresh - /// falls back to an unconditional GET rather than failing. - async fn read_validators(&self, name: &PackageName) -> CacheValidators { + /// Best-effort read of the validator sidecar. A missing, unreadable, or + /// malformed sidecar — including an older single-object sidecar written + /// before the per-uplink map shape — yields an empty map so the next + /// refresh falls back to an unconditional GET rather than failing. + async fn read_validators(&self, name: &PackageName) -> ValidatorsByUplink { match fs::read(self.packument_meta_path(name)).await { Ok(bytes) => serde_json::from_slice(&bytes).unwrap_or_default(), - Err(_) => CacheValidators::default(), + Err(_) => ValidatorsByUplink::default(), } } @@ -530,7 +544,7 @@ impl Store { &self, name: &PackageName, bytes: &[u8], - validators: &CacheValidators, + validators: &ValidatorsByUplink, ) -> Result<()> { write_atomic(&self.packument_path(name), bytes).await?; let meta_path = self.packument_meta_path(name); diff --git a/pnpr/crates/pnpr/src/storage/tests.rs b/pnpr/crates/pnpr/src/storage/tests.rs index 1f5cbd4d30..c81cf4c8d8 100644 --- a/pnpr/crates/pnpr/src/storage/tests.rs +++ b/pnpr/crates/pnpr/src/storage/tests.rs @@ -25,12 +25,15 @@ fn tarball_sidecar_path(tmp: &TempDir, name: &str, filename: &str) -> PathBuf { tmp.path().join("cache").join(name).join(format!("{filename}{TARBALL_INTEGRITY_SUFFIX}")) } -/// Read an entry back as `Stale` so its validators can be inspected. The -/// write is aged past a 1ms TTL with a short sleep. +/// The uplink name the cache tests key their validators under. +const UPLINK: &str = "npmjs"; + +/// Read an entry back as `Stale` and return `UPLINK`'s validators so they +/// can be inspected. The write is aged past a 1ms TTL with a short sleep. async fn read_stale_validators(storage: &Storage, name: &PackageName) -> CacheValidators { tokio::time::sleep(Duration::from_millis(20)).await; match storage.read_cached_packument_entry(name, Duration::from_millis(1)).await.unwrap() { - Some(CachedPackument::Stale(validators)) => validators, + Some(CachedPackument::Stale(validators)) => validators.get(UPLINK), other => panic!("expected a stale entry, got {other:?}"), } } @@ -42,7 +45,10 @@ async fn fresh_entry_returns_its_body() { let name = pkg("foo"); let body = br#"{"name":"foo"}"#; - storage.write_cached_packument(&name, body, &validators(Some(r#""abc""#), None)).await.unwrap(); + storage + .write_cached_packument(&name, body, UPLINK, &validators(Some(r#""abc""#), None)) + .await + .unwrap(); match storage.read_cached_packument_entry(&name, Duration::from_mins(1)).await.unwrap() { Some(CachedPackument::Fresh(bytes)) => assert_eq!(bytes, body), @@ -60,6 +66,7 @@ async fn stale_entry_returns_its_validators() { .write_cached_packument( &name, b"{}", + UPLINK, &validators(Some(r#""abc""#), Some("Wed, 21 Oct 2015 07:28:00 GMT")), ) .await @@ -85,17 +92,62 @@ async fn empty_validators_remove_a_previously_written_sidecar() { let storage = storage_in(&tmp); let name = pkg("foo"); - storage.write_cached_packument(&name, b"{}", &validators(Some(r#""v1""#), None)).await.unwrap(); + storage + .write_cached_packument(&name, b"{}", UPLINK, &validators(Some(r#""v1""#), None)) + .await + .unwrap(); assert!(sidecar_path(&tmp, "foo").exists(), "validators write a sidecar"); // A later refresh whose upstream sends no validators must drop the // stale sidecar so the next read can't replay an outdated ETag. - storage.write_cached_packument(&name, b"{}", &CacheValidators::default()).await.unwrap(); + storage + .write_cached_packument(&name, b"{}", UPLINK, &CacheValidators::default()) + .await + .unwrap(); assert!(!sidecar_path(&tmp, "foo").exists(), "empty validators remove the sidecar"); assert!(read_stale_validators(&storage, &name).await.is_empty()); } +#[tokio::test] +async fn writing_a_new_body_scopes_validators_to_its_origin_uplink() { + let tmp = TempDir::new().unwrap(); + let storage = storage_in(&tmp); + let name = pkg("foo"); + + // The primary fills the cache, then a secondary replaces the shared body. + storage + .write_cached_packument( + &name, + br#"{"v":"a"}"#, + "primary", + &validators(Some(r#""a""#), None), + ) + .await + .unwrap(); + storage + .write_cached_packument( + &name, + br#"{"v":"b"}"#, + "secondary", + &validators(Some(r#""b""#), None), + ) + .await + .unwrap(); + + tokio::time::sleep(Duration::from_millis(20)).await; + let map = + match storage.read_cached_packument_entry(&name, Duration::from_millis(1)).await.unwrap() { + Some(CachedPackument::Stale(map)) => map, + other => panic!("expected a stale entry, got {other:?}"), + }; + // Only the body's origin keeps validators: the primary's are dropped, so a + // later refresh sends the primary an unconditional GET and a 304 can only + // come from the secondary — the uplink that actually wrote the body. + assert_eq!(map.get("secondary").etag.as_deref(), Some(r#""b""#)); + assert!(map.get("primary").is_empty()); +} + #[tokio::test] async fn writing_without_validators_is_a_noop_when_no_sidecar_exists() { let tmp = TempDir::new().unwrap(); @@ -104,7 +156,10 @@ async fn writing_without_validators_is_a_noop_when_no_sidecar_exists() { // First write already carries no validators: removing the (absent) // sidecar must be a benign no-op, not an error. - storage.write_cached_packument(&name, b"{}", &CacheValidators::default()).await.unwrap(); + storage + .write_cached_packument(&name, b"{}", UPLINK, &CacheValidators::default()) + .await + .unwrap(); assert!(!sidecar_path(&tmp, "foo").exists()); assert!(read_stale_validators(&storage, &name).await.is_empty()); @@ -115,7 +170,10 @@ async fn malformed_sidecar_reads_as_empty_validators() { let tmp = TempDir::new().unwrap(); let storage = storage_in(&tmp); let name = pkg("foo"); - storage.write_cached_packument(&name, b"{}", &validators(Some(r#""v1""#), None)).await.unwrap(); + storage + .write_cached_packument(&name, b"{}", UPLINK, &validators(Some(r#""v1""#), None)) + .await + .unwrap(); // A damaged sidecar must degrade to empty validators (forcing an // unconditional refresh) rather than failing the read. @@ -130,7 +188,7 @@ async fn read_cached_packument_returns_bytes_regardless_of_age() { let storage = storage_in(&tmp); let name = pkg("foo"); storage - .write_cached_packument(&name, br#"{"v":1}"#, &CacheValidators::default()) + .write_cached_packument(&name, br#"{"v":1}"#, UPLINK, &CacheValidators::default()) .await .unwrap(); diff --git a/pnpr/crates/pnpr/src/upstream.rs b/pnpr/crates/pnpr/src/upstream.rs index f091a461d9..38844cca57 100644 --- a/pnpr/crates/pnpr/src/upstream.rs +++ b/pnpr/crates/pnpr/src/upstream.rs @@ -4,6 +4,7 @@ use crate::{ package_name::PackageName, }; use chrono::{DateTime, Timelike, Utc}; +use indexmap::IndexMap; use pacquet_network::ThrottledClient; use reqwest::{ StatusCode, @@ -184,6 +185,53 @@ impl CacheValidators { } } +/// The conditional-GET validators a package's cached packument carries, +/// keyed by uplink name. A `proxy:` rule can list several uplinks as a +/// fallback chain, and each upstream's `ETag`/`Last-Modified` is its own — +/// replaying one origin's validators against another risks a spurious +/// `304` (another origin's body served under that confirmation). A refresh +/// therefore sends each uplink only [`Self::get`]'s entry for it. +/// +/// In practice the cache holds a single shared packument body, so +/// [`crate::storage::Storage::write_cached_packument`] keeps validators for +/// exactly the uplink that fetched that body — the map carries at most one +/// entry (the body's origin), and every other uplink resolves to empty +/// validators (an unconditional GET). Modelling it as a map keeps the +/// `get`/`set` plumbing origin-agnostic and tolerates a stray multi-entry +/// sidecar from a future change without ever sending the wrong origin's +/// validators. +/// +/// Persisted as the packument's validator sidecar (see [`crate::storage`]), +/// a JSON object `{ "": { "etag": ..., "last_modified": ... } }`. +/// An older single-object sidecar (a bare [`CacheValidators`]) fails to +/// deserialize into this shape; the storage layer degrades that to an empty +/// map, costing one unconditional refetch after an upgrade. +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct ValidatorsByUplink(IndexMap); + +impl ValidatorsByUplink { + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } + + /// The validators recorded for `uplink`, or empty defaults when there + /// are none — the signal for an unconditional GET to that uplink. + pub fn get(&self, uplink: &str) -> CacheValidators { + self.0.get(uplink).cloned().unwrap_or_default() + } + + /// Record (replacing) one uplink's validators. Empty validators clear + /// the entry, so a later read can't replay an `ETag` the upstream no + /// longer sends. + pub fn set(&mut self, uplink: &str, validators: CacheValidators) { + if validators.is_empty() { + self.0.shift_remove(uplink); + } else { + self.0.insert(uplink.to_string(), validators); + } + } +} + /// A packument fetched (or revalidated) against an upstream. #[derive(Debug)] pub struct FetchedPackument { @@ -221,6 +269,13 @@ impl Upstream { } } + /// The configured uplink name (the YAML `uplinks:` key). Used to key + /// this uplink's entry in the cache's per-uplink validator map + /// ([`ValidatorsByUplink`]). + pub fn name(&self) -> &str { + &self.name + } + /// Per-uplink packument freshness window (`maxage`), or `None` to /// defer to the global [`crate::config::Config::packument_ttl`]. pub fn maxage(&self) -> Option { diff --git a/pnpr/crates/pnpr/tests/server.rs b/pnpr/crates/pnpr/tests/server.rs index 4d18110de5..a2380e385e 100644 --- a/pnpr/crates/pnpr/tests/server.rs +++ b/pnpr/crates/pnpr/tests/server.rs @@ -4,7 +4,7 @@ use axum::{ }; use flate2::read::GzDecoder; use futures_util::stream; -use pnpr::{AuthState, Config, router, router_with_auth}; +use pnpr::{AccessSpec, AuthState, Config, PackageAccess, router, router_with_auth}; use serde_json::{Value, json}; use ssri::{Algorithm, IntegrityOpts}; use std::{ @@ -35,6 +35,24 @@ fn config_for(upstream: &str, storage: PathBuf) -> Config { config } +/// A proxy config whose `**` rule lists two uplinks — `npmjs` (primary) +/// then `private` (secondary) — as an ordered fallback chain, so the +/// registry tries `primary` first and falls through to `secondary`. +fn config_for_two(primary: &str, secondary: &str, storage: PathBuf) -> Config { + let mut config = config_for(primary, storage); + let mut secondary_uplink = config.uplinks.get("npmjs").expect("default `npmjs` uplink").clone(); + secondary_uplink.url = secondary.to_string(); + config.uplinks.insert("private".to_string(), secondary_uplink); + config.packages.insert( + "**".to_string(), + PackageAccess { + proxy: Some(AccessSpec::Many(vec!["npmjs".to_string(), "private".to_string()])), + ..Default::default() + }, + ); + config +} + async fn body_bytes(body: Body) -> Vec { to_bytes(body, usize::MAX).await.expect("read body").to_vec() } @@ -640,6 +658,242 @@ async fn tarball_is_proxied_and_cached() { mock.assert_async().await; } +/// Builds a packument whose single `1.0.0` version points its tarball at +/// `upstream_url`. Used by the multi-uplink tests, which need to control the +/// upstream the packument is served from independently of the helper that +/// also installs a mock. +fn packument_json(upstream_url: &str) -> Value { + json!({ + "name": "foo", + "dist-tags": { "latest": "1.0.0" }, + "versions": { + "1.0.0": { + "name": "foo", + "version": "1.0.0", + "dist": { + "tarball": format!("{upstream_url}/foo/-/foo-1.0.0.tgz"), + "integrity": "sha512-deadbeef", + }, + }, + }, + }) +} + +#[tokio::test] +async fn packument_falls_back_to_secondary_uplink_on_primary_error() { + let mut primary = mockito::Server::new_async().await; + let mut secondary = mockito::Server::new_async().await; + // Primary errors (5xx) once; the registry must fall through to the + // secondary rather than surfacing the error. + let primary_mock = primary.mock("GET", "/foo").with_status(500).expect(1).create_async().await; + let secondary_mock = secondary + .mock("GET", "/foo") + .with_status(200) + .with_header("content-type", "application/json") + .with_body(packument_json(&secondary.url()).to_string()) + .expect(1) + .create_async() + .await; + + let tmp = TempDir::new().unwrap(); + let app = router(config_for_two(&primary.url(), &secondary.url(), tmp.path().to_path_buf())); + let response = app.oneshot(Request::get("/foo").body(Body::empty()).unwrap()).await.unwrap(); + assert_eq!(response.status(), StatusCode::OK); + let body: Value = serde_json::from_slice(&body_bytes(response.into_body()).await).unwrap(); + assert_eq!(body["versions"]["1.0.0"]["version"], "1.0.0"); + + primary_mock.assert_async().await; + secondary_mock.assert_async().await; +} + +#[tokio::test] +async fn packument_falls_back_to_secondary_uplink_on_primary_404() { + let mut primary = mockito::Server::new_async().await; + let mut secondary = mockito::Server::new_async().await; + // A 404 from the primary is not authoritative across the chain: a + // private uplink later in the list may still host the package. + let primary_mock = primary.mock("GET", "/foo").with_status(404).expect(1).create_async().await; + let secondary_mock = secondary + .mock("GET", "/foo") + .with_status(200) + .with_header("content-type", "application/json") + .with_body(packument_json(&secondary.url()).to_string()) + .expect(1) + .create_async() + .await; + + let tmp = TempDir::new().unwrap(); + let app = router(config_for_two(&primary.url(), &secondary.url(), tmp.path().to_path_buf())); + let response = app.oneshot(Request::get("/foo").body(Body::empty()).unwrap()).await.unwrap(); + assert_eq!(response.status(), StatusCode::OK); + let body: Value = serde_json::from_slice(&body_bytes(response.into_body()).await).unwrap(); + assert_eq!(body["versions"]["1.0.0"]["version"], "1.0.0"); + + primary_mock.assert_async().await; + secondary_mock.assert_async().await; +} + +#[tokio::test] +async fn packument_does_not_fall_through_on_primary_hard_4xx() { + let mut primary = mockito::Server::new_async().await; + let mut secondary = mockito::Server::new_async().await; + // A hard authoritative rejection (403) from the primary must not be + // masked by a later uplink: the secondary is never contacted, and the + // rejection surfaces as a gateway error rather than the secondary's body. + let primary_mock = primary.mock("GET", "/foo").with_status(403).expect(1).create_async().await; + let secondary_mock = secondary + .mock("GET", "/foo") + .with_status(200) + .with_header("content-type", "application/json") + .with_body(packument_json(&secondary.url()).to_string()) + .expect(0) + .create_async() + .await; + + let tmp = TempDir::new().unwrap(); + let app = router(config_for_two(&primary.url(), &secondary.url(), tmp.path().to_path_buf())); + let response = app.oneshot(Request::get("/foo").body(Body::empty()).unwrap()).await.unwrap(); + assert_eq!(response.status(), StatusCode::BAD_GATEWAY); + + primary_mock.assert_async().await; + // `expect(0)` asserts the secondary was never reached. + secondary_mock.assert_async().await; +} + +#[tokio::test] +async fn packument_hard_4xx_does_not_serve_stale_cache() { + // Warm the cache from a live primary, then make the primary return a hard + // 403: the authoritative rejection must surface as a gateway error, not be + // masked by the stale cached body. + let mut warm_primary = mockito::Server::new_async().await; + let warm_mock = warm_primary + .mock("GET", "/foo") + .with_status(200) + .with_header("content-type", "application/json") + .with_body(packument_json(&warm_primary.url()).to_string()) + .expect(1) + .create_async() + .await; + + let tmp = TempDir::new().unwrap(); + let storage = tmp.path().to_path_buf(); + + let secondary = mockito::Server::new_async().await; + let warm = router(config_for_two(&warm_primary.url(), &secondary.url(), storage.clone())); + let first = warm.oneshot(Request::get("/foo").body(Body::empty()).unwrap()).await.unwrap(); + assert_eq!(first.status(), StatusCode::OK); + warm_mock.assert_async().await; + + // Primary now rejects with 403; the secondary must never be consulted and + // the stale cache must not be served. + tokio::time::sleep(Duration::from_millis(20)).await; + let mut denying_primary = mockito::Server::new_async().await; + let deny_mock = + denying_primary.mock("GET", "/foo").with_status(403).expect(1).create_async().await; + let mut secondary = mockito::Server::new_async().await; + let secondary_mock = secondary + .mock("GET", "/foo") + .with_status(200) + .with_header("content-type", "application/json") + .with_body(packument_json(&secondary.url()).to_string()) + .expect(0) + .create_async() + .await; + + let mut config = config_for_two(&denying_primary.url(), &secondary.url(), storage); + config.packument_ttl = Duration::from_millis(1); + let response = + router(config).oneshot(Request::get("/foo").body(Body::empty()).unwrap()).await.unwrap(); + assert_eq!(response.status(), StatusCode::BAD_GATEWAY); + + deny_mock.assert_async().await; + secondary_mock.assert_async().await; +} + +#[tokio::test] +async fn packument_is_not_found_when_all_uplinks_404() { + let mut primary = mockito::Server::new_async().await; + let mut secondary = mockito::Server::new_async().await; + let primary_mock = primary.mock("GET", "/foo").with_status(404).expect(1).create_async().await; + let secondary_mock = + secondary.mock("GET", "/foo").with_status(404).expect(1).create_async().await; + + let tmp = TempDir::new().unwrap(); + let app = router(config_for_two(&primary.url(), &secondary.url(), tmp.path().to_path_buf())); + let response = app.oneshot(Request::get("/foo").body(Body::empty()).unwrap()).await.unwrap(); + assert_eq!(response.status(), StatusCode::NOT_FOUND); + + primary_mock.assert_async().await; + secondary_mock.assert_async().await; +} + +#[tokio::test] +async fn tarball_falls_back_to_secondary_uplink() { + let mut primary = mockito::Server::new_async().await; + let mut secondary = mockito::Server::new_async().await; + let bytes = b"fallback-tarball-bytes"; + // The primary serves the packument (so version + integrity resolve) but + // 404s the tarball; the secondary holds the actual bytes. + let packument_mock = mock_packument_for_tarball(&mut primary, "foo", "1.0.0", bytes).await; + let primary_tarball = + primary.mock("GET", "/foo/-/foo-1.0.0.tgz").with_status(404).expect(1).create_async().await; + let secondary_tarball = secondary + .mock("GET", "/foo/-/foo-1.0.0.tgz") + .with_status(200) + .with_header("content-type", "application/octet-stream") + .with_body(bytes) + .expect(1) + .create_async() + .await; + + let tmp = TempDir::new().unwrap(); + let app = router(config_for_two(&primary.url(), &secondary.url(), tmp.path().to_path_buf())); + let response = app + .oneshot(Request::get("/foo/-/foo-1.0.0.tgz").body(Body::empty()).unwrap()) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + assert_eq!(body_bytes(response.into_body()).await, bytes); + + packument_mock.assert_async().await; + primary_tarball.assert_async().await; + secondary_tarball.assert_async().await; +} + +#[tokio::test] +async fn packument_serves_stale_cache_when_all_uplinks_fail() { + let mut primary = mockito::Server::new_async().await; + let secondary = mockito::Server::new_async().await; + let packument_mock = primary + .mock("GET", "/foo") + .with_status(200) + .with_header("content-type", "application/json") + .with_body(packument_json(&primary.url()).to_string()) + .expect(1) + .create_async() + .await; + + let tmp = TempDir::new().unwrap(); + let storage = tmp.path().to_path_buf(); + + // Populate the cache from the live uplinks. + let warm = router(config_for_two(&primary.url(), &secondary.url(), storage.clone())); + let first = warm.oneshot(Request::get("/foo").body(Body::empty()).unwrap()).await.unwrap(); + assert_eq!(first.status(), StatusCode::OK); + packument_mock.assert_async().await; + + // Both uplinks now unreachable and the cache is past its TTL: the stale + // body must still be served rather than erroring. + tokio::time::sleep(Duration::from_millis(20)).await; + let mut dead = config_for_two("http://127.0.0.1:1", "http://127.0.0.1:1", storage); + dead.packument_ttl = Duration::from_millis(1); + let stale = + router(dead).oneshot(Request::get("/foo").body(Body::empty()).unwrap()).await.unwrap(); + assert_eq!(stale.status(), StatusCode::OK); + let body: Value = serde_json::from_slice(&body_bytes(stale.into_body()).await).unwrap(); + assert_eq!(body["versions"]["1.0.0"]["version"], "1.0.0"); +} + #[tokio::test] async fn osv_refuses_vulnerable_tarball_before_upstream_fetch() { let mut upstream = mockito::Server::new_async().await;