mirror of
https://github.com/pnpm/pnpm.git
synced 2026-06-28 01:45:30 -04:00
fix: route benchmark registry tarballs through proxy (#12257)
This commit is contained in:
@@ -25,7 +25,7 @@
|
||||
|
||||
use std::{
|
||||
io::{Read as _, Write as _},
|
||||
net::{Shutdown, SocketAddr, TcpListener, TcpStream},
|
||||
net::{Ipv4Addr, Shutdown, SocketAddr, TcpListener, TcpStream},
|
||||
sync::{
|
||||
Arc,
|
||||
atomic::{AtomicBool, Ordering},
|
||||
@@ -59,7 +59,16 @@ impl LatencyProxy {
|
||||
/// direction. Returns the local address callers should connect to
|
||||
/// instead of `upstream`.
|
||||
pub fn spawn(upstream: SocketAddr, profile: LinkProfile) -> std::io::Result<LatencyProxy> {
|
||||
let listener = TcpListener::bind(("127.0.0.1", 0))?;
|
||||
Self::spawn_on(SocketAddr::from((Ipv4Addr::LOCALHOST, 0)), upstream, profile)
|
||||
}
|
||||
|
||||
/// Front `upstream` with a proxy bound to `listen`.
|
||||
pub fn spawn_on(
|
||||
listen: SocketAddr,
|
||||
upstream: SocketAddr,
|
||||
profile: LinkProfile,
|
||||
) -> std::io::Result<LatencyProxy> {
|
||||
let listener = TcpListener::bind(listen)?;
|
||||
let addr = listener.local_addr()?;
|
||||
listener.set_nonblocking(true)?;
|
||||
|
||||
@@ -72,6 +81,18 @@ impl LatencyProxy {
|
||||
}
|
||||
}
|
||||
|
||||
/// Convert a megabits-per-second figure into a bytes-per-second cap for
|
||||
/// [`LinkProfile`], or `None` for a non-positive / non-finite value (no
|
||||
/// cap). 1 Mbit/s = 1_000_000 bits/s = 125_000 bytes/s. A positive rate
|
||||
/// never collapses to `Some(0)`: a 0 byte/s cap would stall the proxy
|
||||
/// (the pacing math divides by the rate), so it's floored at 1 byte/s.
|
||||
pub fn mbps_to_bytes_per_sec(mbps: f64) -> Option<u64> {
|
||||
if !mbps.is_finite() || mbps <= 0.0 {
|
||||
return None;
|
||||
}
|
||||
Some(((mbps * 125_000.0).round() as u64).max(1))
|
||||
}
|
||||
|
||||
impl Drop for LatencyProxy {
|
||||
fn drop(&mut self) {
|
||||
self.stop.store(true, Ordering::SeqCst);
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use super::{LatencyProxy, LinkProfile};
|
||||
use super::{LatencyProxy, LinkProfile, mbps_to_bytes_per_sec};
|
||||
use std::{
|
||||
io::{Read as _, Write as _},
|
||||
net::{TcpListener, TcpStream},
|
||||
@@ -6,6 +6,41 @@ use std::{
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
#[test]
|
||||
fn binds_to_requested_listen_addr() {
|
||||
let upstream = TcpListener::bind(("127.0.0.1", 0)).expect("bind upstream");
|
||||
let upstream_addr = upstream.local_addr().expect("upstream addr");
|
||||
thread::spawn(move || {
|
||||
let (mut socket, _) = upstream.accept().expect("accept");
|
||||
let mut buf = [0u8; 64];
|
||||
let read = socket.read(&mut buf).expect("read request");
|
||||
assert_eq!(&buf[..read], b"ping");
|
||||
socket.write_all(b"pong").expect("write reply");
|
||||
});
|
||||
|
||||
let reserved = TcpListener::bind(("127.0.0.1", 0)).expect("reserve listen port");
|
||||
let listen = reserved.local_addr().expect("listen addr");
|
||||
drop(reserved);
|
||||
|
||||
let profile = LinkProfile { one_way: Duration::ZERO, rate_limit: None };
|
||||
let proxy = LatencyProxy::spawn_on(listen, upstream_addr, profile).expect("spawn proxy");
|
||||
assert_eq!(proxy.addr, listen);
|
||||
|
||||
let mut client = TcpStream::connect(proxy.addr).expect("connect to proxy");
|
||||
client.write_all(b"ping").expect("send request");
|
||||
let mut reply = [0u8; 64];
|
||||
let read = client.read(&mut reply).expect("read reply");
|
||||
assert_eq!(&reply[..read], b"pong");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn converts_mbps_to_bytes_per_second() {
|
||||
assert_eq!(mbps_to_bytes_per_sec(0.0), None);
|
||||
assert_eq!(mbps_to_bytes_per_sec(f64::NAN), None);
|
||||
assert_eq!(mbps_to_bytes_per_sec(8.0), Some(1_000_000));
|
||||
assert_eq!(mbps_to_bytes_per_sec(f64::MIN_POSITIVE), Some(1));
|
||||
}
|
||||
|
||||
/// A request → response exchange through the proxy pays the one-way
|
||||
/// delay in each direction, so its round trip is ≈ `2 × one_way`.
|
||||
#[test]
|
||||
|
||||
@@ -9,6 +9,11 @@ mod work_env;
|
||||
mod workspace_manifest;
|
||||
|
||||
use cli_args::{RegistryMode, TargetKind};
|
||||
use latency_proxy::{LatencyProxy, LinkProfile, mbps_to_bytes_per_sec};
|
||||
use std::{
|
||||
net::{Ipv4Addr, SocketAddr},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
@@ -46,6 +51,22 @@ async fn main() {
|
||||
}
|
||||
RegistryMode::Npm => "https://registry.npmjs.org/".to_string(),
|
||||
};
|
||||
let registry_rate_limit = mbps_to_bytes_per_sec(registry_bandwidth_mbps);
|
||||
let proxy_spawned_registry = !build_only
|
||||
&& matches!(registry_mode, RegistryMode::Verdaccio)
|
||||
&& (registry_latency_ms > 0 || registry_rate_limit.is_some());
|
||||
let spawned_registry_port = if proxy_spawned_registry {
|
||||
pacquet_registry_mock::pick_unused_port()
|
||||
.expect("pick an unused port for the registry upstream")
|
||||
} else {
|
||||
registry_port
|
||||
};
|
||||
let registry_cache_populator = if proxy_spawned_registry {
|
||||
format!("http://localhost:{spawned_registry_port}/")
|
||||
} else {
|
||||
registry.clone()
|
||||
};
|
||||
let registry_public_url = registry.trim_end_matches('/').to_string();
|
||||
|
||||
let verdaccio = if build_only {
|
||||
None
|
||||
@@ -57,11 +78,12 @@ async fn main() {
|
||||
.pipe(verify::executor("just install"));
|
||||
pacquet_registry_mock::MockInstanceOptions {
|
||||
client: &Default::default(),
|
||||
port: registry_port,
|
||||
port: spawned_registry_port,
|
||||
public_url: proxy_spawned_registry.then_some(registry_public_url.as_str()),
|
||||
stdout: work_env.join("verdaccio.stdout.log").pipe(Some).as_deref(),
|
||||
stderr: work_env.join("verdaccio.stderr.log").pipe(Some).as_deref(),
|
||||
max_retries: 10,
|
||||
retry_delay: tokio::time::Duration::from_millis(500),
|
||||
retry_delay: Duration::from_millis(500),
|
||||
}
|
||||
.spawn_if_necessary()
|
||||
.await
|
||||
@@ -73,6 +95,26 @@ async fn main() {
|
||||
RegistryMode::Npm => None,
|
||||
}
|
||||
};
|
||||
let registry_proxy = proxy_spawned_registry.then(|| {
|
||||
let upstream = SocketAddr::from((Ipv4Addr::LOCALHOST, spawned_registry_port));
|
||||
let listen = SocketAddr::from((Ipv4Addr::LOCALHOST, registry_port));
|
||||
let profile = LinkProfile {
|
||||
one_way: Duration::from_millis(registry_latency_ms) / 2,
|
||||
rate_limit: registry_rate_limit,
|
||||
};
|
||||
let proxy =
|
||||
LatencyProxy::spawn_on(listen, upstream, profile).expect("spawn registry proxy");
|
||||
eprintln!(
|
||||
"Fronting the registry with {}ms round-trip latency + {} download cap (proxy at {})",
|
||||
registry_latency_ms,
|
||||
match registry_bandwidth_mbps {
|
||||
mbps if mbps > 0.0 => format!("{mbps} Mbit/s"),
|
||||
_ => "no".to_string(),
|
||||
},
|
||||
proxy.addr,
|
||||
);
|
||||
proxy
|
||||
});
|
||||
|
||||
let has_pacquet_target = targets.iter().any(|target| target.kind == TargetKind::Pacquet);
|
||||
let has_pnpm_target = targets.iter().any(|target| target.kind == TargetKind::Pnpm);
|
||||
@@ -115,6 +157,7 @@ async fn main() {
|
||||
with_pnpm,
|
||||
targets,
|
||||
registry,
|
||||
registry_cache_populator,
|
||||
registry_mode,
|
||||
repository,
|
||||
pnpm_repository,
|
||||
@@ -132,5 +175,6 @@ async fn main() {
|
||||
} else {
|
||||
env.run();
|
||||
}
|
||||
drop(registry_proxy);
|
||||
drop(verdaccio); // terminate verdaccio if exists
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@ use crate::{
|
||||
BenchmarkScenario, Cleanup, HyperfineOptions, RegistryMode, TargetKind, TargetSpec,
|
||||
},
|
||||
fixtures::{LOCKFILE, PACKAGE_JSON},
|
||||
latency_proxy::{LatencyProxy, LinkProfile},
|
||||
latency_proxy::{LatencyProxy, LinkProfile, mbps_to_bytes_per_sec},
|
||||
verify::executor,
|
||||
workspace_manifest::MinimalWorkspaceManifest,
|
||||
};
|
||||
@@ -29,7 +29,10 @@ pub struct WorkEnv {
|
||||
pub root: PathBuf,
|
||||
pub with_pnpm: bool,
|
||||
pub targets: Vec<TargetSpec>,
|
||||
/// Registry URL used by benchmarked clients.
|
||||
pub registry: String,
|
||||
/// Registry URL used only by the pre-benchmark cache populator.
|
||||
pub registry_cache_populator: String,
|
||||
pub registry_mode: RegistryMode,
|
||||
pub repository: PathBuf,
|
||||
pub pnpm_repository: Option<PathBuf>,
|
||||
@@ -554,13 +557,11 @@ impl WorkEnv {
|
||||
}
|
||||
|
||||
pub fn run(&self) {
|
||||
// Front the registry with the emulated link, when requested. The
|
||||
// guard lives for the whole run so installs (in `benchmark`) cross
|
||||
// it; the URL is baked into every target's config during `init`.
|
||||
// Every client that touches the registry — direct pacquet/pnpm,
|
||||
// the pnpr server resolving, and the pnpr client fetching tarballs
|
||||
// — goes through it, so the registry-mock is uniformly as remote as
|
||||
// the real npm registry (see [`Self::registry_for`]).
|
||||
// Virtual mode points at an already-running registry, so this
|
||||
// method can only wrap it with a random local proxy and bake that
|
||||
// URL into the benchmark configs. Verdaccio mode is handled in
|
||||
// `main`: the proxy must own the public registry port before
|
||||
// registry-mock starts so packuments advertise proxied tarball URLs.
|
||||
let registry_proxy = self.start_registry_proxy();
|
||||
let client_registry = registry_proxy
|
||||
.as_ref()
|
||||
@@ -594,13 +595,14 @@ impl WorkEnv {
|
||||
}
|
||||
}
|
||||
|
||||
/// Start a proxy in front of the registry that emulates a real link
|
||||
/// (latency + bandwidth cap) for every client, or `None` when neither
|
||||
/// is requested or the registry is already remote (`npm` mode).
|
||||
/// Start a proxy in front of an external virtual registry that emulates
|
||||
/// a real link (latency + bandwidth cap) for every client, or `None`
|
||||
/// when neither is requested. Spawned Verdaccio registries are proxied
|
||||
/// in `main` so their advertised tarball URLs use the proxied port.
|
||||
fn start_registry_proxy(&self) -> Option<LatencyProxy> {
|
||||
let rate_limit = mbps_to_bytes_per_sec(self.registry_bandwidth_mbps);
|
||||
if (self.registry_latency_ms == 0 && rate_limit.is_none())
|
||||
|| matches!(self.registry_mode, RegistryMode::Npm)
|
||||
|| matches!(self.registry_mode, RegistryMode::Npm | RegistryMode::Verdaccio)
|
||||
{
|
||||
return None;
|
||||
}
|
||||
@@ -626,11 +628,10 @@ impl WorkEnv {
|
||||
/// target — direct *and* pnpr — uses `client_registry` (the emulated
|
||||
/// link when throttling is on; the raw registry otherwise), because a
|
||||
/// request to the registry-mock should cost the same regardless of who
|
||||
/// makes it. Only the proxy-cache populator keeps the raw (fast) link:
|
||||
/// it warms the registry-mock's on-disk cache before timing starts, so
|
||||
/// its cost isn't measured and there's no reason to slow it down.
|
||||
/// makes it. The proxy-cache populator may use a separate registry URL
|
||||
/// for untimed cache priming.
|
||||
fn registry_for<'a>(&'a self, id: BenchId, client_registry: &'a str) -> &'a str {
|
||||
if id.is_proxy_cache_populator() { &self.registry } else { client_registry }
|
||||
if id.is_proxy_cache_populator() { &self.registry_cache_populator } else { client_registry }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -653,18 +654,6 @@ fn dir_contains_file(dir: &Path) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
/// Convert a megabits-per-second figure into a bytes-per-second cap for
|
||||
/// [`LinkProfile`], or `None` for a non-positive / non-finite value (no
|
||||
/// cap). 1 Mbit/s = 1_000_000 bits/s = 125_000 bytes/s. A positive rate
|
||||
/// never collapses to `Some(0)`: a 0 byte/s cap would stall the proxy
|
||||
/// (the pacing math divides by the rate), so it's floored at 1 byte/s.
|
||||
fn mbps_to_bytes_per_sec(mbps: f64) -> Option<u64> {
|
||||
if !mbps.is_finite() || mbps <= 0.0 {
|
||||
return None;
|
||||
}
|
||||
Some(((mbps * 125_000.0).round() as u64).max(1))
|
||||
}
|
||||
|
||||
/// A pnpr resolver server spawned for one `pnpr@<rev>`
|
||||
/// target. Killed on drop so it never outlives the benchmark run.
|
||||
struct PnprServer {
|
||||
|
||||
@@ -30,6 +30,7 @@ async fn launch() {
|
||||
let options = MockInstanceOptions {
|
||||
client: &Client::new(),
|
||||
port: pick_unused_port().expect("pick an unused port"),
|
||||
public_url: None,
|
||||
stdout: Some(&stdout),
|
||||
stderr: Some(&stderr),
|
||||
max_retries: 20,
|
||||
|
||||
@@ -34,6 +34,7 @@ impl Drop for MockInstance {
|
||||
pub struct MockInstanceOptions<'a> {
|
||||
pub client: &'a Client,
|
||||
pub port: u16,
|
||||
pub public_url: Option<&'a str>,
|
||||
pub stdout: Option<&'a Path>,
|
||||
pub stderr: Option<&'a Path>,
|
||||
pub max_retries: usize,
|
||||
@@ -71,7 +72,7 @@ impl<'a> MockInstanceOptions<'a> {
|
||||
}
|
||||
|
||||
pub(crate) async fn spawn(self) -> MockInstance {
|
||||
let MockInstanceOptions { port, stdout, stderr, .. } = self;
|
||||
let MockInstanceOptions { port, public_url, stdout, stderr, .. } = self;
|
||||
|
||||
let stdout = stdout.map_or_else(Stdio::null, |stdout| {
|
||||
File::create(stdout).expect("create file for stdout").into()
|
||||
@@ -84,7 +85,7 @@ impl<'a> MockInstanceOptions<'a> {
|
||||
// `pnpr_command`. pnpr runs in proxy mode
|
||||
// against npmjs.org so off-fixture packages fall through to
|
||||
// npm; see `pnpr_command` for the rationale.
|
||||
let process = pnpr_command(port)
|
||||
let process = pnpr_command(port, public_url)
|
||||
.stdin(Stdio::null())
|
||||
.stdout(stdout)
|
||||
.stderr(stderr)
|
||||
@@ -131,6 +132,7 @@ impl AutoMockInstance {
|
||||
let anchor = RegistryAnchor::load_or_init(MockInstanceOptions {
|
||||
client: &client,
|
||||
port: pick_unused_port().expect("pick an unused port"),
|
||||
public_url: None,
|
||||
stdout: None,
|
||||
stderr: None,
|
||||
max_retries: 20,
|
||||
|
||||
@@ -58,11 +58,13 @@ fn pnpr_binary() -> PathBuf {
|
||||
///
|
||||
/// `pnpr` is a workspace crate; run
|
||||
/// `cargo build -p pnpr` once before invoking the mock if
|
||||
/// it isn't already built. `--public-url` is pinned to
|
||||
/// `http://localhost:<port>` so the tarball URLs the registry
|
||||
/// rewrites match the URL pacquet's tests expect via
|
||||
/// `port_to_url`.
|
||||
pub fn pnpr_command(port: u16) -> Command {
|
||||
/// it isn't already built. `--public-url` defaults to
|
||||
/// `http://localhost:<port>` so the tarball URLs the registry rewrites
|
||||
/// match the URL pacquet's tests expect via `port_to_url`. The
|
||||
/// integrated benchmark can override it when a proxy fronts the registry
|
||||
/// port: packuments served by pnpr must advertise the proxy URL, or
|
||||
/// tarball downloads bypass the emulated registry link.
|
||||
pub fn pnpr_command(port: u16, public_url: Option<&str>) -> Command {
|
||||
let bin = pnpr_binary();
|
||||
assert!(
|
||||
bin.is_file(),
|
||||
@@ -78,6 +80,14 @@ pub fn pnpr_command(port: u16) -> Command {
|
||||
if seeded > 0 {
|
||||
eprintln!("info: seeded {seeded} fixture file(s) into runtime storage");
|
||||
}
|
||||
let default_public_url;
|
||||
let public_url = match public_url {
|
||||
Some(public_url) => public_url.trim_end_matches('/'),
|
||||
None => {
|
||||
default_public_url = port_to_url(port);
|
||||
default_public_url.trim_end_matches('/')
|
||||
}
|
||||
};
|
||||
let mut cmd = Command::new(bin);
|
||||
// `pnpr` defaults to its bundled verdaccio-shaped config
|
||||
// (npmjs uplink + `**` proxy rule), which matches what the mock
|
||||
@@ -90,6 +100,6 @@ pub fn pnpr_command(port: u16) -> Command {
|
||||
.arg("--listen")
|
||||
.arg(format!("127.0.0.1:{port}"))
|
||||
.arg("--public-url")
|
||||
.arg(port_to_url(port).trim_end_matches('/'));
|
||||
.arg(public_url);
|
||||
cmd
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user