diff --git a/pacquet/tasks/integrated-benchmark/src/latency_proxy.rs b/pacquet/tasks/integrated-benchmark/src/latency_proxy.rs index 65cf7d0e61..dbcc77420d 100644 --- a/pacquet/tasks/integrated-benchmark/src/latency_proxy.rs +++ b/pacquet/tasks/integrated-benchmark/src/latency_proxy.rs @@ -25,7 +25,7 @@ use std::{ io::{Read as _, Write as _}, - net::{Shutdown, SocketAddr, TcpListener, TcpStream}, + net::{Ipv4Addr, Shutdown, SocketAddr, TcpListener, TcpStream}, sync::{ Arc, atomic::{AtomicBool, Ordering}, @@ -59,7 +59,16 @@ impl LatencyProxy { /// direction. Returns the local address callers should connect to /// instead of `upstream`. pub fn spawn(upstream: SocketAddr, profile: LinkProfile) -> std::io::Result { - let listener = TcpListener::bind(("127.0.0.1", 0))?; + Self::spawn_on(SocketAddr::from((Ipv4Addr::LOCALHOST, 0)), upstream, profile) + } + + /// Front `upstream` with a proxy bound to `listen`. + pub fn spawn_on( + listen: SocketAddr, + upstream: SocketAddr, + profile: LinkProfile, + ) -> std::io::Result { + let listener = TcpListener::bind(listen)?; let addr = listener.local_addr()?; listener.set_nonblocking(true)?; @@ -72,6 +81,18 @@ impl LatencyProxy { } } +/// Convert a megabits-per-second figure into a bytes-per-second cap for +/// [`LinkProfile`], or `None` for a non-positive / non-finite value (no +/// cap). 1 Mbit/s = 1_000_000 bits/s = 125_000 bytes/s. A positive rate +/// never collapses to `Some(0)`: a 0 byte/s cap would stall the proxy +/// (the pacing math divides by the rate), so it's floored at 1 byte/s. +pub fn mbps_to_bytes_per_sec(mbps: f64) -> Option { + if !mbps.is_finite() || mbps <= 0.0 { + return None; + } + Some(((mbps * 125_000.0).round() as u64).max(1)) +} + impl Drop for LatencyProxy { fn drop(&mut self) { self.stop.store(true, Ordering::SeqCst); diff --git a/pacquet/tasks/integrated-benchmark/src/latency_proxy/tests.rs b/pacquet/tasks/integrated-benchmark/src/latency_proxy/tests.rs index d74208dc8c..150a038026 100644 --- a/pacquet/tasks/integrated-benchmark/src/latency_proxy/tests.rs +++ b/pacquet/tasks/integrated-benchmark/src/latency_proxy/tests.rs @@ -1,4 +1,4 @@ -use super::{LatencyProxy, LinkProfile}; +use super::{LatencyProxy, LinkProfile, mbps_to_bytes_per_sec}; use std::{ io::{Read as _, Write as _}, net::{TcpListener, TcpStream}, @@ -6,6 +6,41 @@ use std::{ time::{Duration, Instant}, }; +#[test] +fn binds_to_requested_listen_addr() { + let upstream = TcpListener::bind(("127.0.0.1", 0)).expect("bind upstream"); + let upstream_addr = upstream.local_addr().expect("upstream addr"); + thread::spawn(move || { + let (mut socket, _) = upstream.accept().expect("accept"); + let mut buf = [0u8; 64]; + let read = socket.read(&mut buf).expect("read request"); + assert_eq!(&buf[..read], b"ping"); + socket.write_all(b"pong").expect("write reply"); + }); + + let reserved = TcpListener::bind(("127.0.0.1", 0)).expect("reserve listen port"); + let listen = reserved.local_addr().expect("listen addr"); + drop(reserved); + + let profile = LinkProfile { one_way: Duration::ZERO, rate_limit: None }; + let proxy = LatencyProxy::spawn_on(listen, upstream_addr, profile).expect("spawn proxy"); + assert_eq!(proxy.addr, listen); + + let mut client = TcpStream::connect(proxy.addr).expect("connect to proxy"); + client.write_all(b"ping").expect("send request"); + let mut reply = [0u8; 64]; + let read = client.read(&mut reply).expect("read reply"); + assert_eq!(&reply[..read], b"pong"); +} + +#[test] +fn converts_mbps_to_bytes_per_second() { + assert_eq!(mbps_to_bytes_per_sec(0.0), None); + assert_eq!(mbps_to_bytes_per_sec(f64::NAN), None); + assert_eq!(mbps_to_bytes_per_sec(8.0), Some(1_000_000)); + assert_eq!(mbps_to_bytes_per_sec(f64::MIN_POSITIVE), Some(1)); +} + /// A request → response exchange through the proxy pays the one-way /// delay in each direction, so its round trip is ≈ `2 × one_way`. #[test] diff --git a/pacquet/tasks/integrated-benchmark/src/main.rs b/pacquet/tasks/integrated-benchmark/src/main.rs index ae1a27c2b6..63c8287f5f 100644 --- a/pacquet/tasks/integrated-benchmark/src/main.rs +++ b/pacquet/tasks/integrated-benchmark/src/main.rs @@ -9,6 +9,11 @@ mod work_env; mod workspace_manifest; use cli_args::{RegistryMode, TargetKind}; +use latency_proxy::{LatencyProxy, LinkProfile, mbps_to_bytes_per_sec}; +use std::{ + net::{Ipv4Addr, SocketAddr}, + time::Duration, +}; #[tokio::main] async fn main() { @@ -46,6 +51,22 @@ async fn main() { } RegistryMode::Npm => "https://registry.npmjs.org/".to_string(), }; + let registry_rate_limit = mbps_to_bytes_per_sec(registry_bandwidth_mbps); + let proxy_spawned_registry = !build_only + && matches!(registry_mode, RegistryMode::Verdaccio) + && (registry_latency_ms > 0 || registry_rate_limit.is_some()); + let spawned_registry_port = if proxy_spawned_registry { + pacquet_registry_mock::pick_unused_port() + .expect("pick an unused port for the registry upstream") + } else { + registry_port + }; + let registry_cache_populator = if proxy_spawned_registry { + format!("http://localhost:{spawned_registry_port}/") + } else { + registry.clone() + }; + let registry_public_url = registry.trim_end_matches('/').to_string(); let verdaccio = if build_only { None @@ -57,11 +78,12 @@ async fn main() { .pipe(verify::executor("just install")); pacquet_registry_mock::MockInstanceOptions { client: &Default::default(), - port: registry_port, + port: spawned_registry_port, + public_url: proxy_spawned_registry.then_some(registry_public_url.as_str()), stdout: work_env.join("verdaccio.stdout.log").pipe(Some).as_deref(), stderr: work_env.join("verdaccio.stderr.log").pipe(Some).as_deref(), max_retries: 10, - retry_delay: tokio::time::Duration::from_millis(500), + retry_delay: Duration::from_millis(500), } .spawn_if_necessary() .await @@ -73,6 +95,26 @@ async fn main() { RegistryMode::Npm => None, } }; + let registry_proxy = proxy_spawned_registry.then(|| { + let upstream = SocketAddr::from((Ipv4Addr::LOCALHOST, spawned_registry_port)); + let listen = SocketAddr::from((Ipv4Addr::LOCALHOST, registry_port)); + let profile = LinkProfile { + one_way: Duration::from_millis(registry_latency_ms) / 2, + rate_limit: registry_rate_limit, + }; + let proxy = + LatencyProxy::spawn_on(listen, upstream, profile).expect("spawn registry proxy"); + eprintln!( + "Fronting the registry with {}ms round-trip latency + {} download cap (proxy at {})", + registry_latency_ms, + match registry_bandwidth_mbps { + mbps if mbps > 0.0 => format!("{mbps} Mbit/s"), + _ => "no".to_string(), + }, + proxy.addr, + ); + proxy + }); let has_pacquet_target = targets.iter().any(|target| target.kind == TargetKind::Pacquet); let has_pnpm_target = targets.iter().any(|target| target.kind == TargetKind::Pnpm); @@ -115,6 +157,7 @@ async fn main() { with_pnpm, targets, registry, + registry_cache_populator, registry_mode, repository, pnpm_repository, @@ -132,5 +175,6 @@ async fn main() { } else { env.run(); } + drop(registry_proxy); drop(verdaccio); // terminate verdaccio if exists } diff --git a/pacquet/tasks/integrated-benchmark/src/work_env.rs b/pacquet/tasks/integrated-benchmark/src/work_env.rs index 54fdad9a14..ae35efb738 100644 --- a/pacquet/tasks/integrated-benchmark/src/work_env.rs +++ b/pacquet/tasks/integrated-benchmark/src/work_env.rs @@ -3,7 +3,7 @@ use crate::{ BenchmarkScenario, Cleanup, HyperfineOptions, RegistryMode, TargetKind, TargetSpec, }, fixtures::{LOCKFILE, PACKAGE_JSON}, - latency_proxy::{LatencyProxy, LinkProfile}, + latency_proxy::{LatencyProxy, LinkProfile, mbps_to_bytes_per_sec}, verify::executor, workspace_manifest::MinimalWorkspaceManifest, }; @@ -29,7 +29,10 @@ pub struct WorkEnv { pub root: PathBuf, pub with_pnpm: bool, pub targets: Vec, + /// Registry URL used by benchmarked clients. pub registry: String, + /// Registry URL used only by the pre-benchmark cache populator. + pub registry_cache_populator: String, pub registry_mode: RegistryMode, pub repository: PathBuf, pub pnpm_repository: Option, @@ -554,13 +557,11 @@ impl WorkEnv { } pub fn run(&self) { - // Front the registry with the emulated link, when requested. The - // guard lives for the whole run so installs (in `benchmark`) cross - // it; the URL is baked into every target's config during `init`. - // Every client that touches the registry — direct pacquet/pnpm, - // the pnpr server resolving, and the pnpr client fetching tarballs - // — goes through it, so the registry-mock is uniformly as remote as - // the real npm registry (see [`Self::registry_for`]). + // Virtual mode points at an already-running registry, so this + // method can only wrap it with a random local proxy and bake that + // URL into the benchmark configs. Verdaccio mode is handled in + // `main`: the proxy must own the public registry port before + // registry-mock starts so packuments advertise proxied tarball URLs. let registry_proxy = self.start_registry_proxy(); let client_registry = registry_proxy .as_ref() @@ -594,13 +595,14 @@ impl WorkEnv { } } - /// Start a proxy in front of the registry that emulates a real link - /// (latency + bandwidth cap) for every client, or `None` when neither - /// is requested or the registry is already remote (`npm` mode). + /// Start a proxy in front of an external virtual registry that emulates + /// a real link (latency + bandwidth cap) for every client, or `None` + /// when neither is requested. Spawned Verdaccio registries are proxied + /// in `main` so their advertised tarball URLs use the proxied port. fn start_registry_proxy(&self) -> Option { let rate_limit = mbps_to_bytes_per_sec(self.registry_bandwidth_mbps); if (self.registry_latency_ms == 0 && rate_limit.is_none()) - || matches!(self.registry_mode, RegistryMode::Npm) + || matches!(self.registry_mode, RegistryMode::Npm | RegistryMode::Verdaccio) { return None; } @@ -626,11 +628,10 @@ impl WorkEnv { /// target — direct *and* pnpr — uses `client_registry` (the emulated /// link when throttling is on; the raw registry otherwise), because a /// request to the registry-mock should cost the same regardless of who - /// makes it. Only the proxy-cache populator keeps the raw (fast) link: - /// it warms the registry-mock's on-disk cache before timing starts, so - /// its cost isn't measured and there's no reason to slow it down. + /// makes it. The proxy-cache populator may use a separate registry URL + /// for untimed cache priming. fn registry_for<'a>(&'a self, id: BenchId, client_registry: &'a str) -> &'a str { - if id.is_proxy_cache_populator() { &self.registry } else { client_registry } + if id.is_proxy_cache_populator() { &self.registry_cache_populator } else { client_registry } } } @@ -653,18 +654,6 @@ fn dir_contains_file(dir: &Path) -> bool { false } -/// Convert a megabits-per-second figure into a bytes-per-second cap for -/// [`LinkProfile`], or `None` for a non-positive / non-finite value (no -/// cap). 1 Mbit/s = 1_000_000 bits/s = 125_000 bytes/s. A positive rate -/// never collapses to `Some(0)`: a 0 byte/s cap would stall the proxy -/// (the pacing math divides by the rate), so it's floored at 1 byte/s. -fn mbps_to_bytes_per_sec(mbps: f64) -> Option { - if !mbps.is_finite() || mbps <= 0.0 { - return None; - } - Some(((mbps * 125_000.0).round() as u64).max(1)) -} - /// A pnpr resolver server spawned for one `pnpr@` /// target. Killed on drop so it never outlives the benchmark run. struct PnprServer { diff --git a/pacquet/tasks/registry-mock/src/main.rs b/pacquet/tasks/registry-mock/src/main.rs index 4975d4ba17..cd4b0faf9c 100644 --- a/pacquet/tasks/registry-mock/src/main.rs +++ b/pacquet/tasks/registry-mock/src/main.rs @@ -30,6 +30,7 @@ async fn launch() { let options = MockInstanceOptions { client: &Client::new(), port: pick_unused_port().expect("pick an unused port"), + public_url: None, stdout: Some(&stdout), stderr: Some(&stderr), max_retries: 20, diff --git a/pacquet/tasks/registry-mock/src/mock_instance.rs b/pacquet/tasks/registry-mock/src/mock_instance.rs index 77ffe880e7..582a6a75ee 100644 --- a/pacquet/tasks/registry-mock/src/mock_instance.rs +++ b/pacquet/tasks/registry-mock/src/mock_instance.rs @@ -34,6 +34,7 @@ impl Drop for MockInstance { pub struct MockInstanceOptions<'a> { pub client: &'a Client, pub port: u16, + pub public_url: Option<&'a str>, pub stdout: Option<&'a Path>, pub stderr: Option<&'a Path>, pub max_retries: usize, @@ -71,7 +72,7 @@ impl<'a> MockInstanceOptions<'a> { } pub(crate) async fn spawn(self) -> MockInstance { - let MockInstanceOptions { port, stdout, stderr, .. } = self; + let MockInstanceOptions { port, public_url, stdout, stderr, .. } = self; let stdout = stdout.map_or_else(Stdio::null, |stdout| { File::create(stdout).expect("create file for stdout").into() @@ -84,7 +85,7 @@ impl<'a> MockInstanceOptions<'a> { // `pnpr_command`. pnpr runs in proxy mode // against npmjs.org so off-fixture packages fall through to // npm; see `pnpr_command` for the rationale. - let process = pnpr_command(port) + let process = pnpr_command(port, public_url) .stdin(Stdio::null()) .stdout(stdout) .stderr(stderr) @@ -131,6 +132,7 @@ impl AutoMockInstance { let anchor = RegistryAnchor::load_or_init(MockInstanceOptions { client: &client, port: pick_unused_port().expect("pick an unused port"), + public_url: None, stdout: None, stderr: None, max_retries: 20, diff --git a/pacquet/tasks/registry-mock/src/pnpr_command.rs b/pacquet/tasks/registry-mock/src/pnpr_command.rs index 3f3f181a14..ecb62155ca 100644 --- a/pacquet/tasks/registry-mock/src/pnpr_command.rs +++ b/pacquet/tasks/registry-mock/src/pnpr_command.rs @@ -58,11 +58,13 @@ fn pnpr_binary() -> PathBuf { /// /// `pnpr` is a workspace crate; run /// `cargo build -p pnpr` once before invoking the mock if -/// it isn't already built. `--public-url` is pinned to -/// `http://localhost:` so the tarball URLs the registry -/// rewrites match the URL pacquet's tests expect via -/// `port_to_url`. -pub fn pnpr_command(port: u16) -> Command { +/// it isn't already built. `--public-url` defaults to +/// `http://localhost:` so the tarball URLs the registry rewrites +/// match the URL pacquet's tests expect via `port_to_url`. The +/// integrated benchmark can override it when a proxy fronts the registry +/// port: packuments served by pnpr must advertise the proxy URL, or +/// tarball downloads bypass the emulated registry link. +pub fn pnpr_command(port: u16, public_url: Option<&str>) -> Command { let bin = pnpr_binary(); assert!( bin.is_file(), @@ -78,6 +80,14 @@ pub fn pnpr_command(port: u16) -> Command { if seeded > 0 { eprintln!("info: seeded {seeded} fixture file(s) into runtime storage"); } + let default_public_url; + let public_url = match public_url { + Some(public_url) => public_url.trim_end_matches('/'), + None => { + default_public_url = port_to_url(port); + default_public_url.trim_end_matches('/') + } + }; let mut cmd = Command::new(bin); // `pnpr` defaults to its bundled verdaccio-shaped config // (npmjs uplink + `**` proxy rule), which matches what the mock @@ -90,6 +100,6 @@ pub fn pnpr_command(port: u16) -> Command { .arg("--listen") .arg(format!("127.0.0.1:{port}")) .arg("--public-url") - .arg(port_to_url(port).trim_end_matches('/')); + .arg(public_url); cmd }