diff --git a/pnpr/crates/pnpr/src/config.rs b/pnpr/crates/pnpr/src/config.rs index 3f1c138347..ee8c4ffded 100644 --- a/pnpr/crates/pnpr/src/config.rs +++ b/pnpr/crates/pnpr/src/config.rs @@ -761,19 +761,20 @@ fn non_empty_token(token: &str) -> Option { /// `unpublish` are verdaccio permission lists (built-in groups like /// `$all` / `$authenticated` / `$anonymous`, plus usernames / group /// names), compiled into the [`PackagePolicies`] that gate reads and -/// writes. `proxy` selects the [`UplinkConfig`] by name. +/// writes. `proxy` names the uplinks to fall back through, in order +/// (`proxy: npmjs private`); see [`Config::resolve_uplinks`]. #[derive(Debug, Default, Clone, Deserialize)] pub struct PackageAccess { pub access: Option, pub publish: Option, pub unpublish: Option, - pub proxy: Option, + pub proxy: Option, } -/// A YAML permission value. Verdaccio accepts either a single -/// space-separated string (`access: $authenticated admin`) or a -/// sequence (`access: [$authenticated, admin]`); both normalize to the -/// same token list. +/// A YAML string-or-list value. Verdaccio accepts either a single +/// space-separated string (`access: $authenticated admin`, +/// `proxy: npmjs private`) or a sequence (`access: [$authenticated, +/// admin]`); both normalize to the same ordered token list. #[derive(Debug, Clone, Deserialize)] #[serde(untagged)] pub enum AccessSpec { @@ -792,6 +793,19 @@ impl AccessSpec { } } } + + /// The tokens in declared order, each element flattened on whitespace + /// (so `[a b, c]` and `[a, b, c]` agree). Unlike [`Self::to_access_list`] + /// — which builds an unordered permission *set* — this preserves order, + /// which `proxy:` relies on for its fallback chain. + fn to_ordered_tokens(&self) -> Vec<&str> { + match self { + AccessSpec::One(spec) => spec.split_whitespace().collect(), + AccessSpec::Many(items) => { + items.iter().flat_map(|item| item.split_whitespace()).collect() + } + } + } } /// Disk shape of the YAML file. Fields verdaccio supports but @@ -961,7 +975,10 @@ impl Config { let mut packages = IndexMap::new(); packages.insert( "**".to_string(), - PackageAccess { proxy: Some("npmjs".to_string()), ..Default::default() }, + PackageAccess { + proxy: Some(AccessSpec::One("npmjs".to_string())), + ..Default::default() + }, ); Self { listen, @@ -1270,20 +1287,43 @@ impl Config { } } - /// Find the uplink for `package_name` by walking [`Self::packages`] - /// in declared order: the first pattern that matches is the rule - /// that applies. If that rule has no `proxy:`, the package is - /// storage-only and this returns `None` — matching verdaccio's - /// first-match-wins semantics. The returned tuple's first element - /// is the uplink *name* (the key in [`Self::uplinks`]); callers - /// that have pre-built per-uplink state can use it as an index. + /// Find the uplink fallback chain for `package_name` by walking + /// [`Self::packages`] in declared order: the first pattern that matches + /// is the rule that applies (verdaccio's first-match-wins). That rule's + /// `proxy:` names the uplinks to try, **in order** — the registry walks + /// the returned list as a fallback chain (try the first, fall through to + /// the next on failure or 404). A proxy name absent from [`Self::uplinks`] + /// is silently skipped (as verdaccio does). An empty result means the + /// package is storage-only. + /// + /// Each tuple's first element is the uplink *name* (the key in + /// [`Self::uplinks`]), so callers with pre-built per-uplink state can use + /// it as an index. + #[must_use] + pub fn resolve_uplinks(&self, package_name: &str) -> Vec<(&str, &UplinkConfig)> { + let Some(access) = self + .packages + .iter() + .find_map(|(pattern, access)| pattern_matches(pattern, package_name).then_some(access)) + else { + return Vec::new(); + }; + let Some(proxy) = &access.proxy else { + return Vec::new(); + }; + proxy + .to_ordered_tokens() + .into_iter() + .filter_map(|name| self.uplinks.get_key_value(name).map(|(k, v)| (k.as_str(), v))) + .collect() + } + + /// The primary (first) uplink for `package_name`, or `None` when the + /// package is storage-only. A convenience over [`Self::resolve_uplinks`] + /// for the single-uplink question. #[must_use] pub fn resolve_uplink(&self, package_name: &str) -> Option<(&str, &UplinkConfig)> { - let access = self.packages.iter().find_map(|(pattern, access)| { - pattern_matches(pattern, package_name).then_some(access) - })?; - let proxy_name = access.proxy.as_deref()?; - self.uplinks.get_key_value(proxy_name).map(|(k, v)| (k.as_str(), v)) + self.resolve_uplinks(package_name).into_iter().next() } } diff --git a/pnpr/crates/pnpr/src/config/tests.rs b/pnpr/crates/pnpr/src/config/tests.rs index e478a11490..a3e44597b6 100644 --- a/pnpr/crates/pnpr/src/config/tests.rs +++ b/pnpr/crates/pnpr/src/config/tests.rs @@ -868,6 +868,106 @@ packages: assert_eq!(config.resolve_uplink("lodash").unwrap().0, "npmjs"); } +/// `proxy:` as a space-separated string lists the uplinks as an ordered +/// fallback chain (verdaccio's `proxy: npmjs private` shape). +#[test] +fn from_yaml_str_proxy_string_lists_uplinks_in_order() { + let yaml = "\ +storage: ./s +uplinks: + npmjs: { url: https://registry.npmjs.org/ } + private: { url: https://private.example/ } +packages: + '**': + proxy: npmjs private +"; + let config = Config::from_yaml_str(yaml, Path::new("/x"), listen(), None).unwrap(); + let names: Vec<_> = config.resolve_uplinks("anything").into_iter().map(|(n, _)| n).collect(); + assert_eq!(names, ["npmjs", "private"]); + // The convenience singular accessor returns the primary (first). + assert_eq!(config.resolve_uplink("anything").unwrap().0, "npmjs"); +} + +/// `proxy:` as a YAML sequence parses to the same ordered chain as the +/// space-separated string form. +#[test] +fn from_yaml_str_proxy_sequence_lists_uplinks_in_order() { + let yaml = "\ +storage: ./s +uplinks: + npmjs: { url: https://registry.npmjs.org/ } + private: { url: https://private.example/ } +packages: + '**': + proxy: [private, npmjs] +"; + let config = Config::from_yaml_str(yaml, Path::new("/x"), listen(), None).unwrap(); + let names: Vec<_> = config.resolve_uplinks("anything").into_iter().map(|(n, _)| n).collect(); + assert_eq!(names, ["private", "npmjs"]); +} + +/// A proxy name with no matching `uplinks:` entry is silently skipped +/// (verdaccio ignores unknown proxy names), and the rest of the chain is +/// preserved in order. +#[test] +fn from_yaml_str_proxy_skips_unknown_uplink_names() { + let yaml = "\ +storage: ./s +uplinks: + npmjs: { url: https://registry.npmjs.org/ } +packages: + '**': + proxy: ghost npmjs +"; + let config = Config::from_yaml_str(yaml, Path::new("/x"), listen(), None).unwrap(); + let names: Vec<_> = config.resolve_uplinks("anything").into_iter().map(|(n, _)| n).collect(); + assert_eq!(names, ["npmjs"]); +} + +/// First-match-wins still selects a single rule, then expands *that* rule's +/// proxy list — a later catch-all's uplinks don't get appended. +#[test] +fn from_yaml_str_resolve_uplinks_expands_only_the_matched_rule() { + let yaml = "\ +storage: ./s +uplinks: + mirror: { url: https://mirror.example/ } + npmjs: { url: https://registry.npmjs.org/ } + private: { url: https://private.example/ } +packages: + '@private/*': + proxy: private mirror + '**': + proxy: npmjs +"; + let config = Config::from_yaml_str(yaml, Path::new("/x"), listen(), None).unwrap(); + let scoped: Vec<_> = + config.resolve_uplinks("@private/foo").into_iter().map(|(n, _)| n).collect(); + assert_eq!(scoped, ["private", "mirror"]); + let other: Vec<_> = config.resolve_uplinks("lodash").into_iter().map(|(n, _)| n).collect(); + assert_eq!(other, ["npmjs"]); +} + +/// A matched rule with no `proxy:` (or no matching rule at all) yields an +/// empty chain — the package is storage-only. +#[test] +fn from_yaml_str_resolve_uplinks_empty_when_no_proxy() { + let yaml = "\ +storage: ./s +uplinks: + npmjs: { url: https://registry.npmjs.org/ } +packages: + '@private/*': + access: $authenticated + '**': + proxy: npmjs +"; + let config = Config::from_yaml_str(yaml, Path::new("/x"), listen(), None).unwrap(); + assert!(config.resolve_uplinks("@private/foo").is_empty()); + let other: Vec<_> = config.resolve_uplinks("lodash").into_iter().map(|(n, _)| n).collect(); + assert_eq!(other, ["npmjs"]); +} + #[test] fn from_yaml_str_public_url_defaults_to_listen_when_none_passed() { let yaml = "storage: ./s\nuplinks: {}\npackages: {}\n"; diff --git a/pnpr/crates/pnpr/src/error.rs b/pnpr/crates/pnpr/src/error.rs index fcfab07a56..322703561b 100644 --- a/pnpr/crates/pnpr/src/error.rs +++ b/pnpr/crates/pnpr/src/error.rs @@ -214,6 +214,27 @@ pub enum RegistryError { } impl RegistryError { + /// Whether a failed upstream fetch may fall through to the next uplink + /// in a package's `proxy:` fallback chain. Only *availability* failures + /// are retryable this way: a transport error, an open circuit breaker, + /// or an upstream `5xx`. Any `4xx` is an authoritative response about + /// *this* request — `401`/`403` (auth), `429` (throttle), `400`/`410`, + /// etc. — and is **not** eligible: a later uplink must never mask it, + /// which would let a public mirror answer for a package the primary + /// scoped to an authenticated private uplink, or silently bypass a + /// rate-limit. Such an error surfaces immediately. + /// + /// A `404` never reaches here — it is modeled as a distinct not-found + /// outcome, not an error, and the chain walks past it on its own. + #[must_use] + pub fn allows_uplink_fallthrough(&self) -> bool { + match self { + RegistryError::Upstream { .. } | RegistryError::UpstreamUnavailable { .. } => true, + RegistryError::UpstreamStatus { status, .. } => *status >= 500, + _ => false, + } + } + #[must_use] pub fn log_kind(&self) -> &'static str { match self { diff --git a/pnpr/crates/pnpr/src/error/tests.rs b/pnpr/crates/pnpr/src/error/tests.rs index 3fb97b2d79..83a0cca7cb 100644 --- a/pnpr/crates/pnpr/src/error/tests.rs +++ b/pnpr/crates/pnpr/src/error/tests.rs @@ -152,3 +152,36 @@ fn log_message_redacts_malformed_database_url_fragment_secrets() { assert!(!message.contains("fragment-secret")); assert!(!message.contains("#password")); } + +#[test] +fn allows_uplink_fallthrough_only_for_availability_failures() { + // 5xx is an availability signal: a later uplink may serve the package, + // so the chain falls through. + for status in [500, 502, 503, 504] { + let err = RegistryError::UpstreamStatus { + url: "https://up.example/foo".to_string(), + status, + body: String::new(), + }; + assert!(err.allows_uplink_fallthrough(), "status {status} should fall through"); + } + + // An open circuit is an availability failure too. + let circuit_open = RegistryError::UpstreamUnavailable { uplink: "npmjs".to_string() }; + assert!(circuit_open.allows_uplink_fallthrough()); + + // Every 4xx is an authoritative response about this request — including + // 429 (throttle) and 408 (timeout) — and must NOT be masked by a later + // uplink. + for status in [400, 401, 403, 408, 410, 429, 451] { + let err = RegistryError::UpstreamStatus { + url: "https://up.example/foo".to_string(), + status, + body: String::new(), + }; + assert!(!err.allows_uplink_fallthrough(), "status {status} must not fall through"); + } + + // A non-upstream error is never a fall-through candidate. + assert!(!RegistryError::InvalidConfig { reason: "x".to_string() }.allows_uplink_fallthrough()); +} diff --git a/pnpr/crates/pnpr/src/lib.rs b/pnpr/crates/pnpr/src/lib.rs index dff7cb28c8..8fd674f9e8 100644 --- a/pnpr/crates/pnpr/src/lib.rs +++ b/pnpr/crates/pnpr/src/lib.rs @@ -27,10 +27,10 @@ pub use auth::{ identify, }; pub use config::{ - AuthConfig, BackendConfig, Config, ConfigSource, DEFAULT_CONFIG_YAML, FeatureOverrides, - HostedStoreConfig, HtpasswdConfig, LibsqlSettings, LogConfig, LogFormat, LogLevel, MaxUsers, - OsvConfig, PackageAccess, RegistryFeature, ResolverFeature, SqlBackendSettings, TokensConfig, - UplinkConfig, default_cache_dir, + AccessSpec, AuthConfig, BackendConfig, Config, ConfigSource, DEFAULT_CONFIG_YAML, + FeatureOverrides, HostedStoreConfig, HtpasswdConfig, LibsqlSettings, LogConfig, LogFormat, + LogLevel, MaxUsers, OsvConfig, PackageAccess, RegistryFeature, ResolverFeature, + SqlBackendSettings, TokensConfig, UplinkConfig, default_cache_dir, }; pub use error::{RegistryError, Result}; pub use journal::recover_publish_journal; diff --git a/pnpr/crates/pnpr/src/server.rs b/pnpr/crates/pnpr/src/server.rs index 7f7d997369..1addca7ca0 100644 --- a/pnpr/crates/pnpr/src/server.rs +++ b/pnpr/crates/pnpr/src/server.rs @@ -13,7 +13,8 @@ use crate::{ streaming, upstream::{ CacheValidators, FetchOutcome, FetchedPackument, PackumentFetch, Upstream, - abbreviate_packument, extract_version_manifest, rewrite_tarball_urls, tarball_basename, + ValidatorsByUplink, abbreviate_packument, extract_version_manifest, rewrite_tarball_urls, + tarball_basename, }, }; use axum::{ @@ -884,8 +885,12 @@ async fn serve_tarball( return error_response(&err); } - let upstream = resolve_upstream(state, &name); - let should_read_cache = upstream.as_ref().is_none_or(|upstream| upstream.caches()); + let upstreams = resolve_upstreams(state, &name); + // Read the cache if any uplink in the chain mirrors tarballs (or there's + // no upstream left at all — a leftover mirror). Reading is harmless: the + // bytes are verified against the version's `dist.integrity` before being + // served, whichever uplink originally wrote them. + let should_read_cache = upstreams.is_empty() || upstreams.iter().any(|u| u.caches()); if should_read_cache { match state.inner.storage.open_cached_tarball(&name, &filename).await { Ok(Some((file, len))) => { @@ -918,14 +923,20 @@ async fn serve_tarball( } } - let Some(upstream) = upstream else { + if upstreams.is_empty() { return not_found(); - }; + } - let response = match upstream.fetch_tarball_response(&name, &filename).await { - Ok(FetchOutcome::Ok(response)) => response, - Ok(FetchOutcome::NotFound) => return not_found(), - Err(err) => return error_response(&err), + // Walk the uplink fallback chain in declared order: the first uplink to + // return the tarball wins; a `NotFound` falls through to the next; a + // transport error is remembered and the next uplink tried. Integrity is + // enforced on the streamed bytes below regardless of which uplink served + // them, so trying several is safe. + let (upstream, response) = match fetch_tarball_with_fallback(&upstreams, &name, &filename).await + { + TarballFetch::Ok(upstream, response) => (upstream, response), + TarballFetch::NotFound => return not_found(), + TarballFetch::Err(err) => return error_response(&err), }; let write = match state.inner.storage.open_cached_tarball_tmp(&name, &filename).await { @@ -2594,14 +2605,68 @@ fn wants_abbreviated(headers: &HeaderMap) -> bool { .is_some_and(|accept| accept.contains(ABBREVIATED_CONTENT_TYPE)) } -/// Resolve which prebuilt [`Upstream`] should serve `package`, by -/// walking the verdaccio-style `packages` rules in declared order and -/// looking up the resolved uplink name in [`AppInner::upstreams`]. -/// Returns `None` when no rule with a `proxy:` field matches the -/// package, leaving the request to fall through to a not-found. -fn resolve_upstream<'a>(state: &'a AppState, package: &PackageName) -> Option<&'a Upstream> { - let (uplink_name, _) = state.inner.config.resolve_uplink(package.as_str())?; - state.inner.upstreams.get(uplink_name) +/// Resolve the ordered [`Upstream`] fallback chain that should serve +/// `package`, by walking the verdaccio-style `packages` rules in declared +/// order ([`Config::resolve_uplinks`]) and looking each resolved uplink +/// name up in [`AppInner::upstreams`], preserving order. Returns an empty +/// vec when no rule with a `proxy:` field matches, leaving the request to +/// fall through to a not-found. +fn resolve_upstreams<'a>(state: &'a AppState, package: &PackageName) -> Vec<&'a Upstream> { + state + .inner + .config + .resolve_uplinks(package.as_str()) + .into_iter() + .filter_map(|(uplink_name, _)| state.inner.upstreams.get(uplink_name)) + .collect() +} + +/// Outcome of walking the uplink fallback chain for a tarball. +enum TarballFetch<'a> { + /// `upstream` returned the tarball; the caller streams `response` and + /// keys the cache-write decision off `upstream.caches()`. + Ok(&'a Upstream, reqwest::Response), + /// Every uplink answered `404` — the tarball isn't anywhere in the chain. + NotFound, + /// No uplink served it and at least one errored. Surfaced rather than a + /// `404` so a transient failure isn't reported as "gone". + Err(RegistryError), +} + +/// Try each uplink in `upstreams` in order until one returns the tarball. +/// A `NotFound` falls through to the next uplink (a private uplink later in +/// the chain may host it); an *availability* error (transport, circuit, 5xx) +/// is remembered and the next uplink tried. An authoritative upstream error +/// (401/403/other hard 4xx) stops the walk immediately — see +/// [`RegistryError::allows_uplink_fallthrough`] — so a later mirror can't +/// mask the primary's rejection. After the chain is exhausted, an error (if +/// any was seen) takes precedence over `NotFound`, so a tarball that a +/// momentarily failing uplink really hosts isn't masked as a hard 404. +async fn fetch_tarball_with_fallback<'a>( + upstreams: &[&'a Upstream], + name: &PackageName, + filename: &str, +) -> TarballFetch<'a> { + let mut last_err = None; + for upstream in upstreams { + match upstream.fetch_tarball_response(name, filename).await { + Ok(FetchOutcome::Ok(response)) => return TarballFetch::Ok(upstream, response), + Ok(FetchOutcome::NotFound) => continue, + Err(err) => { + tracing::warn!(?err, package = %name.as_str(), %filename, uplink = upstream.name(), "upstream tarball fetch failed"); + if !err.allows_uplink_fallthrough() { + // Hard upstream rejection — surface it rather than letting + // a later uplink answer for a tarball this one refused. + return TarballFetch::Err(err); + } + last_err = Some(err); + } + } + } + match last_err { + Some(err) => TarballFetch::Err(err), + None => TarballFetch::NotFound, + } } /// Result of loading the packument for a package — either bytes (raw, @@ -2630,7 +2695,8 @@ async fn load_packument_bytes(state: &AppState, name: &PackageName) -> Packument } } - let Some(upstream) = resolve_upstream(state, name) else { + let upstreams = resolve_upstreams(state, name); + if upstreams.is_empty() { // Nothing published and no upstream to proxy. The only thing // left is a leftover cache entry (e.g. a `proxy:` rule was // removed after the package was mirrored). @@ -2644,100 +2710,164 @@ async fn load_packument_bytes(state: &AppState, name: &PackageName) -> Packument Ok(None) => PackumentLoad::NotFound, Err(err) => PackumentLoad::Err(err), }; - }; + } // Freshness window for the proxy cache: a cached packument younger // than `ttl` is served straight from disk; older than `ttl` it's - // "stale" and revalidated against the upstream below. Lower = newer + // "stale" and revalidated against an upstream below. Lower = newer // versions surface sooner but more upstream traffic; higher = the // reverse. The conditional GET on the stale path keeps a high `ttl` // cheap (a `304` refreshes the entry without re-downloading it). // - // The uplink's per-uplink `maxage` (verdaccio) wins when set; - // otherwise the global `packument_ttl` (the `--packument-ttl-secs` - // flag) applies. - let ttl = upstream.maxage().unwrap_or(state.inner.config.packument_ttl); + // The TTL is a per-package cache property decided before any uplink is + // contacted, so it comes from the primary (first) uplink's `maxage` + // (verdaccio's knob) when it sets one; otherwise the global + // `packument_ttl` (the `--packument-ttl-secs` flag) applies. Only the + // primary governs freshness — a secondary's `maxage` must not control the + // shared cache it rarely fills. + let ttl = upstreams + .first() + .and_then(|upstream| upstream.maxage()) + .unwrap_or(state.inner.config.packument_ttl); // A fresh entry serves immediately (and moves its bytes out — a // packument can be multiple MB). A stale entry yields only its - // validators; its body stays on disk until a `304`/error path below - // actually needs it, so the common stale→`200` refresh never reads it. + // per-uplink validators; its body stays on disk until a `304`/error + // path below actually needs it, so the common stale→`200` refresh never + // reads it. let validators = match state.inner.storage.read_cached_packument_entry(name, ttl).await { Ok(Some(CachedPackument::Fresh(bytes))) => { record_cache_status("hit"); return PackumentLoad::Ok(bytes); } Ok(Some(CachedPackument::Stale(validators))) => validators, - Ok(None) => CacheValidators::default(), + Ok(None) => ValidatorsByUplink::default(), Err(err) => { tracing::warn!(?err, package = %name.as_str(), "cache read failed"); - CacheValidators::default() + ValidatorsByUplink::default() } }; - // Revalidate conditionally when we hold a stale copy: the upstream - // can answer `304` and save us re-downloading an unchanged packument. - match upstream.fetch_packument(name, &validators).await { - Ok(PackumentFetch::Modified(fetched)) => { - store_fetched_packument(state, name, fetched).await - } - // `304` confirmed our stale copy is current: read it now (deferred - // until here), re-write it to bump the cache mtime so it's fresh - // again until the next TTL window, and serve it. - Ok(PackumentFetch::NotModified) => { - match state.inner.storage.read_cached_packument(name).await { - Ok(Some(bytes)) => { - if let Err(err) = - state.inner.storage.write_cached_packument(name, &bytes, &validators).await - { - tracing::warn!(?err, package = %name.as_str(), "packument cache refresh failed"); - } - record_cache_status("revalidated"); - PackumentLoad::Ok(bytes) - } - // The body vanished between the freshness check and this read - // (cache wiped concurrently). The upstream just confirmed the - // package exists, so re-fetch it unconditionally and self-heal - // rather than 404-ing a present package. - Ok(None) => match upstream.fetch_packument(name, &CacheValidators::default()).await - { - Ok(PackumentFetch::Modified(fetched)) => { - store_fetched_packument(state, name, fetched).await - } - Ok(_) => PackumentLoad::NotFound, - Err(err) => PackumentLoad::Err(err), - }, - Err(err) => PackumentLoad::Err(err), + // Walk the uplink fallback chain in declared order. The first uplink to + // return a body (`Modified`) or confirm our cache (`NotModified`) wins; + // a `NotFound` falls through to the next (a private uplink later in the + // chain may host the package); an *availability* error (transport, + // circuit, 5xx) is remembered and the next uplink tried. An authoritative + // upstream error (401/403/other hard 4xx) stops the walk immediately — + // see [`RegistryError::allows_uplink_fallthrough`] — so a later public + // mirror can never mask the primary's rejection of a scoped package. Each + // uplink is sent only *its own* cached validators — an `ETag` is + // origin-specific, so replaying one uplink's against another could + // spuriously `304` and serve stale data. + let mut last_err = None; + for upstream in &upstreams { + // Revalidate conditionally when we hold this uplink's validators: + // the upstream can answer `304` and save re-downloading an unchanged + // packument. + let uplink_validators = validators.get(upstream.name()); + match upstream.fetch_packument(name, &uplink_validators).await { + Ok(PackumentFetch::Modified(fetched)) => { + return store_fetched_packument(state, name, upstream.name(), fetched).await; } - } - Ok(PackumentFetch::NotFound) => PackumentLoad::NotFound, - Err(err) => { - tracing::warn!(?err, package = %name.as_str(), "upstream packument fetch failed"); - match state.inner.storage.read_cached_packument(name).await { - Ok(Some(bytes)) => { - record_cache_status("stale"); - PackumentLoad::Ok(bytes) + Ok(PackumentFetch::NotModified) => { + return serve_revalidated(state, name, upstream, &uplink_validators).await; + } + Ok(PackumentFetch::NotFound) => continue, + Err(err) => { + tracing::warn!(?err, package = %name.as_str(), uplink = upstream.name(), "upstream packument fetch failed"); + if !err.allows_uplink_fallthrough() { + // Hard upstream rejection (auth/throttle/other 4xx). Surface + // it immediately: don't try later uplinks (which could + // answer for a package this one authoritatively refused) and + // don't fall back to a stale cached body either — serving + // cache here would mask an authoritative denial and keep + // handing out content the upstream is now refusing. + return PackumentLoad::Err(err); } - // No cache to fall back on: surface the upstream failure. - Ok(None) => PackumentLoad::Err(err), - // The cache itself is unreadable: surface that I/O error - // rather than the upstream one — it's the more actionable - // failure when both go wrong. - Err(cache_err) => PackumentLoad::Err(cache_err), + last_err = Some(err); } } } + + // Chain exhausted. If any uplink errored, fall back to a stale cached + // body — a transient failure must never be reported as an authoritative + // 404 the client would cache as "gone". If every uplink answered a clean + // `NotFound`, the package genuinely isn't anywhere in the chain. + match last_err { + Some(err) => serve_stale_or_error(state, name, err).await, + None => PackumentLoad::NotFound, + } } -/// Persist a freshly fetched packument to the proxy cache and return it, -/// tagging the access record as a `miss`. A cache-write failure is logged -/// but not fatal — the fetched bytes are still served. +/// Handle a `304 Not Modified` from `upstream`: the cached body is still +/// current, so read it (deferred until now), re-write it to bump the cache +/// mtime and re-record this uplink's validators, and serve it +/// (`revalidated`). If the body vanished between the freshness check and +/// this read (cache wiped concurrently), re-fetch it unconditionally from +/// the same uplink and self-heal rather than 404-ing a present package. +async fn serve_revalidated( + state: &AppState, + name: &PackageName, + upstream: &Upstream, + validators: &CacheValidators, +) -> PackumentLoad { + match state.inner.storage.read_cached_packument(name).await { + Ok(Some(bytes)) => { + if let Err(err) = state + .inner + .storage + .write_cached_packument(name, &bytes, upstream.name(), validators) + .await + { + tracing::warn!(?err, package = %name.as_str(), "packument cache refresh failed"); + } + record_cache_status("revalidated"); + PackumentLoad::Ok(bytes) + } + Ok(None) => match upstream.fetch_packument(name, &CacheValidators::default()).await { + Ok(PackumentFetch::Modified(fetched)) => { + store_fetched_packument(state, name, upstream.name(), fetched).await + } + Ok(_) => PackumentLoad::NotFound, + Err(err) => PackumentLoad::Err(err), + }, + Err(err) => PackumentLoad::Err(err), + } +} + +/// Every uplink in the chain failed. Fall back to a stale cached body if one +/// exists (tagging the access `stale`); otherwise surface the upstream +/// error — unless the cache read itself failed, in which case that I/O error +/// is the more actionable one to surface. +async fn serve_stale_or_error( + state: &AppState, + name: &PackageName, + err: RegistryError, +) -> PackumentLoad { + match state.inner.storage.read_cached_packument(name).await { + Ok(Some(bytes)) => { + record_cache_status("stale"); + PackumentLoad::Ok(bytes) + } + Ok(None) => PackumentLoad::Err(err), + Err(cache_err) => PackumentLoad::Err(cache_err), + } +} + +/// Persist a freshly fetched packument to the proxy cache (recording +/// `uplink_name`'s validators in the package's per-uplink validator map) +/// and return it, tagging the access record as a `miss`. A cache-write +/// failure is logged but not fatal — the fetched bytes are still served. async fn store_fetched_packument( state: &AppState, name: &PackageName, + uplink_name: &str, fetched: FetchedPackument, ) -> PackumentLoad { - if let Err(err) = - state.inner.storage.write_cached_packument(name, &fetched.bytes, &fetched.validators).await + if let Err(err) = state + .inner + .storage + .write_cached_packument(name, &fetched.bytes, uplink_name, &fetched.validators) + .await { tracing::warn!(?err, package = %name.as_str(), "packument cache write failed"); } @@ -2754,8 +2884,8 @@ async fn store_fetched_packument( /// * `revalidated` — entry was stale; the upstream answered `304 Not /// Modified`, so the cached body was reused. /// * `miss` — fetched a fresh body from the upstream. -/// * `stale` — upstream was unreachable; a stale cached body was served -/// as a fallback. +/// * `stale` — every uplink in the chain errored; a stale cached body was +/// served as a fallback. /// * `orphaned` — a leftover mirror served with no upstream left to /// revalidate against (its `proxy:` rule was removed after the package /// was mirrored). Served regardless of age, so distinct from `hit`. diff --git a/pnpr/crates/pnpr/src/storage.rs b/pnpr/crates/pnpr/src/storage.rs index 04bbf1fc1c..9b3193396d 100644 --- a/pnpr/crates/pnpr/src/storage.rs +++ b/pnpr/crates/pnpr/src/storage.rs @@ -4,7 +4,7 @@ use crate::{ package_name::PackageName, s3::S3Store, streaming, - upstream::CacheValidators, + upstream::{CacheValidators, ValidatorsByUplink}, }; use axum::body::Body; use serde::{Deserialize, Serialize}; @@ -151,14 +151,15 @@ impl Drop for TarballWrite { /// /// * `Fresh` — within the TTL; the body is read and ready to serve. /// * `Stale` — past the TTL; only the small conditional-GET validators -/// are loaded. The body is left on disk and pulled on demand by the -/// caller (via [`Storage::read_cached_packument`]) only if the upstream -/// answers `304` or is unreachable — the common stale→`200` refresh -/// discards the old body, so it's never read. +/// (one set per uplink in the package's fallback chain) are loaded. The +/// body is left on disk and pulled on demand by the caller (via +/// [`Storage::read_cached_packument`]) only if the upstream answers `304` +/// or is unreachable — the common stale→`200` refresh discards the old +/// body, so it's never read. #[derive(Debug)] pub enum CachedPackument { Fresh(Vec), - Stale(CacheValidators), + Stale(ValidatorsByUplink), } #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -386,17 +387,29 @@ impl Storage { self.cached.read_packument_any_age(name).await } - /// Write a cached upstream packument and its validators, refreshing - /// the entry's freshness. Called both on a fresh upstream body and - /// on a `304` revalidation (re-written with the unchanged bytes to - /// bump the cache mtime). + /// Write a cached upstream packument, recording **only** `uplink`'s + /// validators (the origin of these bytes) and refreshing the entry's + /// freshness. Called both on a fresh upstream body and on a `304` + /// revalidation (re-written with the unchanged bytes to bump the mtime). + /// + /// The validator map is *replaced*, not merged: the cache holds a single + /// shared packument body, so it must carry validators for exactly the + /// uplink that fetched that body. If validators from other uplinks + /// survived here, a later `304` from one of them would revalidate *these* + /// bytes — which that uplink never served — and we'd keep serving another + /// origin's body under its confirmation. Scoping validators to the body's + /// origin guarantees a conditional GET only ever goes to that origin, so a + /// `304` can only confirm the body actually on disk. pub async fn write_cached_packument( &self, name: &PackageName, bytes: &[u8], + uplink: &str, validators: &CacheValidators, ) -> Result<()> { - self.cached.write_packument_with_meta(name, bytes, validators).await + let mut map = ValidatorsByUplink::default(); + map.set(uplink, validators.clone()); + self.cached.write_packument_with_meta(name, bytes, &map).await } /// Create and open a per-request temp file for a proxied tarball. @@ -497,13 +510,14 @@ impl Store { } } - /// Best-effort read of the validator sidecar. A missing, unreadable, - /// or malformed sidecar yields empty validators so the next refresh - /// falls back to an unconditional GET rather than failing. - async fn read_validators(&self, name: &PackageName) -> CacheValidators { + /// Best-effort read of the validator sidecar. A missing, unreadable, or + /// malformed sidecar — including an older single-object sidecar written + /// before the per-uplink map shape — yields an empty map so the next + /// refresh falls back to an unconditional GET rather than failing. + async fn read_validators(&self, name: &PackageName) -> ValidatorsByUplink { match fs::read(self.packument_meta_path(name)).await { Ok(bytes) => serde_json::from_slice(&bytes).unwrap_or_default(), - Err(_) => CacheValidators::default(), + Err(_) => ValidatorsByUplink::default(), } } @@ -530,7 +544,7 @@ impl Store { &self, name: &PackageName, bytes: &[u8], - validators: &CacheValidators, + validators: &ValidatorsByUplink, ) -> Result<()> { write_atomic(&self.packument_path(name), bytes).await?; let meta_path = self.packument_meta_path(name); diff --git a/pnpr/crates/pnpr/src/storage/tests.rs b/pnpr/crates/pnpr/src/storage/tests.rs index 1f5cbd4d30..c81cf4c8d8 100644 --- a/pnpr/crates/pnpr/src/storage/tests.rs +++ b/pnpr/crates/pnpr/src/storage/tests.rs @@ -25,12 +25,15 @@ fn tarball_sidecar_path(tmp: &TempDir, name: &str, filename: &str) -> PathBuf { tmp.path().join("cache").join(name).join(format!("{filename}{TARBALL_INTEGRITY_SUFFIX}")) } -/// Read an entry back as `Stale` so its validators can be inspected. The -/// write is aged past a 1ms TTL with a short sleep. +/// The uplink name the cache tests key their validators under. +const UPLINK: &str = "npmjs"; + +/// Read an entry back as `Stale` and return `UPLINK`'s validators so they +/// can be inspected. The write is aged past a 1ms TTL with a short sleep. async fn read_stale_validators(storage: &Storage, name: &PackageName) -> CacheValidators { tokio::time::sleep(Duration::from_millis(20)).await; match storage.read_cached_packument_entry(name, Duration::from_millis(1)).await.unwrap() { - Some(CachedPackument::Stale(validators)) => validators, + Some(CachedPackument::Stale(validators)) => validators.get(UPLINK), other => panic!("expected a stale entry, got {other:?}"), } } @@ -42,7 +45,10 @@ async fn fresh_entry_returns_its_body() { let name = pkg("foo"); let body = br#"{"name":"foo"}"#; - storage.write_cached_packument(&name, body, &validators(Some(r#""abc""#), None)).await.unwrap(); + storage + .write_cached_packument(&name, body, UPLINK, &validators(Some(r#""abc""#), None)) + .await + .unwrap(); match storage.read_cached_packument_entry(&name, Duration::from_mins(1)).await.unwrap() { Some(CachedPackument::Fresh(bytes)) => assert_eq!(bytes, body), @@ -60,6 +66,7 @@ async fn stale_entry_returns_its_validators() { .write_cached_packument( &name, b"{}", + UPLINK, &validators(Some(r#""abc""#), Some("Wed, 21 Oct 2015 07:28:00 GMT")), ) .await @@ -85,17 +92,62 @@ async fn empty_validators_remove_a_previously_written_sidecar() { let storage = storage_in(&tmp); let name = pkg("foo"); - storage.write_cached_packument(&name, b"{}", &validators(Some(r#""v1""#), None)).await.unwrap(); + storage + .write_cached_packument(&name, b"{}", UPLINK, &validators(Some(r#""v1""#), None)) + .await + .unwrap(); assert!(sidecar_path(&tmp, "foo").exists(), "validators write a sidecar"); // A later refresh whose upstream sends no validators must drop the // stale sidecar so the next read can't replay an outdated ETag. - storage.write_cached_packument(&name, b"{}", &CacheValidators::default()).await.unwrap(); + storage + .write_cached_packument(&name, b"{}", UPLINK, &CacheValidators::default()) + .await + .unwrap(); assert!(!sidecar_path(&tmp, "foo").exists(), "empty validators remove the sidecar"); assert!(read_stale_validators(&storage, &name).await.is_empty()); } +#[tokio::test] +async fn writing_a_new_body_scopes_validators_to_its_origin_uplink() { + let tmp = TempDir::new().unwrap(); + let storage = storage_in(&tmp); + let name = pkg("foo"); + + // The primary fills the cache, then a secondary replaces the shared body. + storage + .write_cached_packument( + &name, + br#"{"v":"a"}"#, + "primary", + &validators(Some(r#""a""#), None), + ) + .await + .unwrap(); + storage + .write_cached_packument( + &name, + br#"{"v":"b"}"#, + "secondary", + &validators(Some(r#""b""#), None), + ) + .await + .unwrap(); + + tokio::time::sleep(Duration::from_millis(20)).await; + let map = + match storage.read_cached_packument_entry(&name, Duration::from_millis(1)).await.unwrap() { + Some(CachedPackument::Stale(map)) => map, + other => panic!("expected a stale entry, got {other:?}"), + }; + // Only the body's origin keeps validators: the primary's are dropped, so a + // later refresh sends the primary an unconditional GET and a 304 can only + // come from the secondary — the uplink that actually wrote the body. + assert_eq!(map.get("secondary").etag.as_deref(), Some(r#""b""#)); + assert!(map.get("primary").is_empty()); +} + #[tokio::test] async fn writing_without_validators_is_a_noop_when_no_sidecar_exists() { let tmp = TempDir::new().unwrap(); @@ -104,7 +156,10 @@ async fn writing_without_validators_is_a_noop_when_no_sidecar_exists() { // First write already carries no validators: removing the (absent) // sidecar must be a benign no-op, not an error. - storage.write_cached_packument(&name, b"{}", &CacheValidators::default()).await.unwrap(); + storage + .write_cached_packument(&name, b"{}", UPLINK, &CacheValidators::default()) + .await + .unwrap(); assert!(!sidecar_path(&tmp, "foo").exists()); assert!(read_stale_validators(&storage, &name).await.is_empty()); @@ -115,7 +170,10 @@ async fn malformed_sidecar_reads_as_empty_validators() { let tmp = TempDir::new().unwrap(); let storage = storage_in(&tmp); let name = pkg("foo"); - storage.write_cached_packument(&name, b"{}", &validators(Some(r#""v1""#), None)).await.unwrap(); + storage + .write_cached_packument(&name, b"{}", UPLINK, &validators(Some(r#""v1""#), None)) + .await + .unwrap(); // A damaged sidecar must degrade to empty validators (forcing an // unconditional refresh) rather than failing the read. @@ -130,7 +188,7 @@ async fn read_cached_packument_returns_bytes_regardless_of_age() { let storage = storage_in(&tmp); let name = pkg("foo"); storage - .write_cached_packument(&name, br#"{"v":1}"#, &CacheValidators::default()) + .write_cached_packument(&name, br#"{"v":1}"#, UPLINK, &CacheValidators::default()) .await .unwrap(); diff --git a/pnpr/crates/pnpr/src/upstream.rs b/pnpr/crates/pnpr/src/upstream.rs index f091a461d9..38844cca57 100644 --- a/pnpr/crates/pnpr/src/upstream.rs +++ b/pnpr/crates/pnpr/src/upstream.rs @@ -4,6 +4,7 @@ use crate::{ package_name::PackageName, }; use chrono::{DateTime, Timelike, Utc}; +use indexmap::IndexMap; use pacquet_network::ThrottledClient; use reqwest::{ StatusCode, @@ -184,6 +185,53 @@ impl CacheValidators { } } +/// The conditional-GET validators a package's cached packument carries, +/// keyed by uplink name. A `proxy:` rule can list several uplinks as a +/// fallback chain, and each upstream's `ETag`/`Last-Modified` is its own — +/// replaying one origin's validators against another risks a spurious +/// `304` (another origin's body served under that confirmation). A refresh +/// therefore sends each uplink only [`Self::get`]'s entry for it. +/// +/// In practice the cache holds a single shared packument body, so +/// [`crate::storage::Storage::write_cached_packument`] keeps validators for +/// exactly the uplink that fetched that body — the map carries at most one +/// entry (the body's origin), and every other uplink resolves to empty +/// validators (an unconditional GET). Modelling it as a map keeps the +/// `get`/`set` plumbing origin-agnostic and tolerates a stray multi-entry +/// sidecar from a future change without ever sending the wrong origin's +/// validators. +/// +/// Persisted as the packument's validator sidecar (see [`crate::storage`]), +/// a JSON object `{ "": { "etag": ..., "last_modified": ... } }`. +/// An older single-object sidecar (a bare [`CacheValidators`]) fails to +/// deserialize into this shape; the storage layer degrades that to an empty +/// map, costing one unconditional refetch after an upgrade. +#[derive(Debug, Default, Clone, Serialize, Deserialize)] +pub struct ValidatorsByUplink(IndexMap); + +impl ValidatorsByUplink { + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } + + /// The validators recorded for `uplink`, or empty defaults when there + /// are none — the signal for an unconditional GET to that uplink. + pub fn get(&self, uplink: &str) -> CacheValidators { + self.0.get(uplink).cloned().unwrap_or_default() + } + + /// Record (replacing) one uplink's validators. Empty validators clear + /// the entry, so a later read can't replay an `ETag` the upstream no + /// longer sends. + pub fn set(&mut self, uplink: &str, validators: CacheValidators) { + if validators.is_empty() { + self.0.shift_remove(uplink); + } else { + self.0.insert(uplink.to_string(), validators); + } + } +} + /// A packument fetched (or revalidated) against an upstream. #[derive(Debug)] pub struct FetchedPackument { @@ -221,6 +269,13 @@ impl Upstream { } } + /// The configured uplink name (the YAML `uplinks:` key). Used to key + /// this uplink's entry in the cache's per-uplink validator map + /// ([`ValidatorsByUplink`]). + pub fn name(&self) -> &str { + &self.name + } + /// Per-uplink packument freshness window (`maxage`), or `None` to /// defer to the global [`crate::config::Config::packument_ttl`]. pub fn maxage(&self) -> Option { diff --git a/pnpr/crates/pnpr/tests/server.rs b/pnpr/crates/pnpr/tests/server.rs index 4d18110de5..a2380e385e 100644 --- a/pnpr/crates/pnpr/tests/server.rs +++ b/pnpr/crates/pnpr/tests/server.rs @@ -4,7 +4,7 @@ use axum::{ }; use flate2::read::GzDecoder; use futures_util::stream; -use pnpr::{AuthState, Config, router, router_with_auth}; +use pnpr::{AccessSpec, AuthState, Config, PackageAccess, router, router_with_auth}; use serde_json::{Value, json}; use ssri::{Algorithm, IntegrityOpts}; use std::{ @@ -35,6 +35,24 @@ fn config_for(upstream: &str, storage: PathBuf) -> Config { config } +/// A proxy config whose `**` rule lists two uplinks — `npmjs` (primary) +/// then `private` (secondary) — as an ordered fallback chain, so the +/// registry tries `primary` first and falls through to `secondary`. +fn config_for_two(primary: &str, secondary: &str, storage: PathBuf) -> Config { + let mut config = config_for(primary, storage); + let mut secondary_uplink = config.uplinks.get("npmjs").expect("default `npmjs` uplink").clone(); + secondary_uplink.url = secondary.to_string(); + config.uplinks.insert("private".to_string(), secondary_uplink); + config.packages.insert( + "**".to_string(), + PackageAccess { + proxy: Some(AccessSpec::Many(vec!["npmjs".to_string(), "private".to_string()])), + ..Default::default() + }, + ); + config +} + async fn body_bytes(body: Body) -> Vec { to_bytes(body, usize::MAX).await.expect("read body").to_vec() } @@ -640,6 +658,242 @@ async fn tarball_is_proxied_and_cached() { mock.assert_async().await; } +/// Builds a packument whose single `1.0.0` version points its tarball at +/// `upstream_url`. Used by the multi-uplink tests, which need to control the +/// upstream the packument is served from independently of the helper that +/// also installs a mock. +fn packument_json(upstream_url: &str) -> Value { + json!({ + "name": "foo", + "dist-tags": { "latest": "1.0.0" }, + "versions": { + "1.0.0": { + "name": "foo", + "version": "1.0.0", + "dist": { + "tarball": format!("{upstream_url}/foo/-/foo-1.0.0.tgz"), + "integrity": "sha512-deadbeef", + }, + }, + }, + }) +} + +#[tokio::test] +async fn packument_falls_back_to_secondary_uplink_on_primary_error() { + let mut primary = mockito::Server::new_async().await; + let mut secondary = mockito::Server::new_async().await; + // Primary errors (5xx) once; the registry must fall through to the + // secondary rather than surfacing the error. + let primary_mock = primary.mock("GET", "/foo").with_status(500).expect(1).create_async().await; + let secondary_mock = secondary + .mock("GET", "/foo") + .with_status(200) + .with_header("content-type", "application/json") + .with_body(packument_json(&secondary.url()).to_string()) + .expect(1) + .create_async() + .await; + + let tmp = TempDir::new().unwrap(); + let app = router(config_for_two(&primary.url(), &secondary.url(), tmp.path().to_path_buf())); + let response = app.oneshot(Request::get("/foo").body(Body::empty()).unwrap()).await.unwrap(); + assert_eq!(response.status(), StatusCode::OK); + let body: Value = serde_json::from_slice(&body_bytes(response.into_body()).await).unwrap(); + assert_eq!(body["versions"]["1.0.0"]["version"], "1.0.0"); + + primary_mock.assert_async().await; + secondary_mock.assert_async().await; +} + +#[tokio::test] +async fn packument_falls_back_to_secondary_uplink_on_primary_404() { + let mut primary = mockito::Server::new_async().await; + let mut secondary = mockito::Server::new_async().await; + // A 404 from the primary is not authoritative across the chain: a + // private uplink later in the list may still host the package. + let primary_mock = primary.mock("GET", "/foo").with_status(404).expect(1).create_async().await; + let secondary_mock = secondary + .mock("GET", "/foo") + .with_status(200) + .with_header("content-type", "application/json") + .with_body(packument_json(&secondary.url()).to_string()) + .expect(1) + .create_async() + .await; + + let tmp = TempDir::new().unwrap(); + let app = router(config_for_two(&primary.url(), &secondary.url(), tmp.path().to_path_buf())); + let response = app.oneshot(Request::get("/foo").body(Body::empty()).unwrap()).await.unwrap(); + assert_eq!(response.status(), StatusCode::OK); + let body: Value = serde_json::from_slice(&body_bytes(response.into_body()).await).unwrap(); + assert_eq!(body["versions"]["1.0.0"]["version"], "1.0.0"); + + primary_mock.assert_async().await; + secondary_mock.assert_async().await; +} + +#[tokio::test] +async fn packument_does_not_fall_through_on_primary_hard_4xx() { + let mut primary = mockito::Server::new_async().await; + let mut secondary = mockito::Server::new_async().await; + // A hard authoritative rejection (403) from the primary must not be + // masked by a later uplink: the secondary is never contacted, and the + // rejection surfaces as a gateway error rather than the secondary's body. + let primary_mock = primary.mock("GET", "/foo").with_status(403).expect(1).create_async().await; + let secondary_mock = secondary + .mock("GET", "/foo") + .with_status(200) + .with_header("content-type", "application/json") + .with_body(packument_json(&secondary.url()).to_string()) + .expect(0) + .create_async() + .await; + + let tmp = TempDir::new().unwrap(); + let app = router(config_for_two(&primary.url(), &secondary.url(), tmp.path().to_path_buf())); + let response = app.oneshot(Request::get("/foo").body(Body::empty()).unwrap()).await.unwrap(); + assert_eq!(response.status(), StatusCode::BAD_GATEWAY); + + primary_mock.assert_async().await; + // `expect(0)` asserts the secondary was never reached. + secondary_mock.assert_async().await; +} + +#[tokio::test] +async fn packument_hard_4xx_does_not_serve_stale_cache() { + // Warm the cache from a live primary, then make the primary return a hard + // 403: the authoritative rejection must surface as a gateway error, not be + // masked by the stale cached body. + let mut warm_primary = mockito::Server::new_async().await; + let warm_mock = warm_primary + .mock("GET", "/foo") + .with_status(200) + .with_header("content-type", "application/json") + .with_body(packument_json(&warm_primary.url()).to_string()) + .expect(1) + .create_async() + .await; + + let tmp = TempDir::new().unwrap(); + let storage = tmp.path().to_path_buf(); + + let secondary = mockito::Server::new_async().await; + let warm = router(config_for_two(&warm_primary.url(), &secondary.url(), storage.clone())); + let first = warm.oneshot(Request::get("/foo").body(Body::empty()).unwrap()).await.unwrap(); + assert_eq!(first.status(), StatusCode::OK); + warm_mock.assert_async().await; + + // Primary now rejects with 403; the secondary must never be consulted and + // the stale cache must not be served. + tokio::time::sleep(Duration::from_millis(20)).await; + let mut denying_primary = mockito::Server::new_async().await; + let deny_mock = + denying_primary.mock("GET", "/foo").with_status(403).expect(1).create_async().await; + let mut secondary = mockito::Server::new_async().await; + let secondary_mock = secondary + .mock("GET", "/foo") + .with_status(200) + .with_header("content-type", "application/json") + .with_body(packument_json(&secondary.url()).to_string()) + .expect(0) + .create_async() + .await; + + let mut config = config_for_two(&denying_primary.url(), &secondary.url(), storage); + config.packument_ttl = Duration::from_millis(1); + let response = + router(config).oneshot(Request::get("/foo").body(Body::empty()).unwrap()).await.unwrap(); + assert_eq!(response.status(), StatusCode::BAD_GATEWAY); + + deny_mock.assert_async().await; + secondary_mock.assert_async().await; +} + +#[tokio::test] +async fn packument_is_not_found_when_all_uplinks_404() { + let mut primary = mockito::Server::new_async().await; + let mut secondary = mockito::Server::new_async().await; + let primary_mock = primary.mock("GET", "/foo").with_status(404).expect(1).create_async().await; + let secondary_mock = + secondary.mock("GET", "/foo").with_status(404).expect(1).create_async().await; + + let tmp = TempDir::new().unwrap(); + let app = router(config_for_two(&primary.url(), &secondary.url(), tmp.path().to_path_buf())); + let response = app.oneshot(Request::get("/foo").body(Body::empty()).unwrap()).await.unwrap(); + assert_eq!(response.status(), StatusCode::NOT_FOUND); + + primary_mock.assert_async().await; + secondary_mock.assert_async().await; +} + +#[tokio::test] +async fn tarball_falls_back_to_secondary_uplink() { + let mut primary = mockito::Server::new_async().await; + let mut secondary = mockito::Server::new_async().await; + let bytes = b"fallback-tarball-bytes"; + // The primary serves the packument (so version + integrity resolve) but + // 404s the tarball; the secondary holds the actual bytes. + let packument_mock = mock_packument_for_tarball(&mut primary, "foo", "1.0.0", bytes).await; + let primary_tarball = + primary.mock("GET", "/foo/-/foo-1.0.0.tgz").with_status(404).expect(1).create_async().await; + let secondary_tarball = secondary + .mock("GET", "/foo/-/foo-1.0.0.tgz") + .with_status(200) + .with_header("content-type", "application/octet-stream") + .with_body(bytes) + .expect(1) + .create_async() + .await; + + let tmp = TempDir::new().unwrap(); + let app = router(config_for_two(&primary.url(), &secondary.url(), tmp.path().to_path_buf())); + let response = app + .oneshot(Request::get("/foo/-/foo-1.0.0.tgz").body(Body::empty()).unwrap()) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::OK); + assert_eq!(body_bytes(response.into_body()).await, bytes); + + packument_mock.assert_async().await; + primary_tarball.assert_async().await; + secondary_tarball.assert_async().await; +} + +#[tokio::test] +async fn packument_serves_stale_cache_when_all_uplinks_fail() { + let mut primary = mockito::Server::new_async().await; + let secondary = mockito::Server::new_async().await; + let packument_mock = primary + .mock("GET", "/foo") + .with_status(200) + .with_header("content-type", "application/json") + .with_body(packument_json(&primary.url()).to_string()) + .expect(1) + .create_async() + .await; + + let tmp = TempDir::new().unwrap(); + let storage = tmp.path().to_path_buf(); + + // Populate the cache from the live uplinks. + let warm = router(config_for_two(&primary.url(), &secondary.url(), storage.clone())); + let first = warm.oneshot(Request::get("/foo").body(Body::empty()).unwrap()).await.unwrap(); + assert_eq!(first.status(), StatusCode::OK); + packument_mock.assert_async().await; + + // Both uplinks now unreachable and the cache is past its TTL: the stale + // body must still be served rather than erroring. + tokio::time::sleep(Duration::from_millis(20)).await; + let mut dead = config_for_two("http://127.0.0.1:1", "http://127.0.0.1:1", storage); + dead.packument_ttl = Duration::from_millis(1); + let stale = + router(dead).oneshot(Request::get("/foo").body(Body::empty()).unwrap()).await.unwrap(); + assert_eq!(stale.status(), StatusCode::OK); + let body: Value = serde_json::from_slice(&body_bytes(stale.into_body()).await).unwrap(); + assert_eq!(body["versions"]["1.0.0"]["version"], "1.0.0"); +} + #[tokio::test] async fn osv_refuses_vulnerable_tarball_before_upstream_fetch() { let mut upstream = mockito::Server::new_async().await;