diff --git a/Cargo.lock b/Cargo.lock index ebe37a9fcb..77894998da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -281,6 +281,15 @@ dependencies = [ "syn", ] +[[package]] +name = "atoi" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f28d99ec8bfea296261ca1af174f24225171fea9664ba9003cbebee704810528" +dependencies = [ + "num-traits", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -533,6 +542,9 @@ name = "bitflags" version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4512299f36f043ab09a583e57bceb5a5aab7a73db1805848e8fef3c9e8c78b3" +dependencies = [ + "serde_core", +] [[package]] name = "bitvec" @@ -869,7 +881,7 @@ version = "3.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "faf9468729b8cbcea668e36183cb69d317348c2e08e994829fb56ebfdfbaac34" dependencies = [ - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -905,6 +917,15 @@ version = "0.4.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cc14f565cf027a105f7a44ccf9e5b424348421a1d8952a8fc9d499d313107789" +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "console" version = "0.16.3" @@ -997,6 +1018,21 @@ dependencies = [ "libc", ] +[[package]] +name = "crc" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5eb8a2a1cd12ab0d987a5d5e825195d372001a4094a0376319d5a0ad71c1ba0d" +dependencies = [ + "crc-catalog", +] + +[[package]] +name = "crc-catalog" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "217698eaf96b4a3f0bc4f3662aaa55bdf913cd54d7204591faa790070c6d0853" + [[package]] name = "crc24" version = "0.1.6" @@ -1082,6 +1118,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-queue" +version = "0.3.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -1381,6 +1426,12 @@ dependencies = [ "syn", ] +[[package]] +name = "dotenvy" +version = "0.15.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" + [[package]] name = "dsa" version = "0.6.3" @@ -1460,6 +1511,9 @@ name = "either" version = "1.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" +dependencies = [ + "serde", +] [[package]] name = "elliptic-curve" @@ -1523,7 +1577,29 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.61.2", +] + +[[package]] +name = "etcetera" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "136d1b5283a1ab77bd9257427ffd09d8667ced0570b6f938942bc7568ed5b943" +dependencies = [ + "cfg-if", + "home", + "windows-sys 0.48.0", +] + +[[package]] +name = "event-listener" +version = "5.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13b66accf52311f30a0db42147dadea9850cb48cd070028831ae5f5d4b856ab" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", ] [[package]] @@ -1676,6 +1752,17 @@ dependencies = [ "futures-util", ] +[[package]] +name = "futures-intrusive" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d930c203dd0b6ff06e0201a4a2fe9149b43c684fd4420555b26d21b1a02956f" +dependencies = [ + "futures-core", + "lock_api", + "parking_lot", +] + [[package]] name = "futures-io" version = "0.3.32" @@ -1911,6 +1998,8 @@ version = "0.15.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" dependencies = [ + "allocator-api2", + "equivalent", "foldhash 0.1.5", ] @@ -1941,6 +2030,15 @@ dependencies = [ "hashbrown 0.14.5", ] +[[package]] +name = "hashlink" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1" +dependencies = [ + "hashbrown 0.15.5", +] + [[package]] name = "hashlink" version = "0.11.0" @@ -2500,7 +2598,7 @@ checksum = "3640c1c38b8e4e43584d8df18be5fc6b0aa314ce6ebf51b53313d4306cca8e46" dependencies = [ "hermit-abi", "libc", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -2626,7 +2724,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "160f2eade097f30263b548aae5deb12ad349c909baa710fa24b92c9090b2e006" dependencies = [ "scopeguard", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -3145,7 +3243,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -4522,6 +4620,12 @@ dependencies = [ "unicode-width 0.2.2", ] +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + [[package]] name = "parking_lot" version = "0.12.5" @@ -4826,6 +4930,7 @@ dependencies = [ "serde-saphyr", "serde_json", "sha2", + "sqlx", "ssri", "tempfile", "tokio", @@ -4833,6 +4938,7 @@ dependencies = [ "tower-http 0.6.11", "tracing", "tracing-subscriber", + "url", "wax", "zip", ] @@ -5523,7 +5629,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.12.1", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -5581,7 +5687,7 @@ dependencies = [ "security-framework", "security-framework-sys", "webpki-root-certs", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -5944,6 +6050,9 @@ name = "smallvec" version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" +dependencies = [ + "serde", +] [[package]] name = "smart-default" @@ -6031,6 +6140,170 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "sqlx" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fefb893899429669dcdd979aff487bd78f4064e5e7907e4269081e0ef7d97dc" +dependencies = [ + "sqlx-core", + "sqlx-macros", + "sqlx-mysql", + "sqlx-postgres", +] + +[[package]] +name = "sqlx-core" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee6798b1838b6a0f69c007c133b8df5866302197e404e8b6ee8ed3e3a5e68dc6" +dependencies = [ + "base64 0.22.1", + "bytes", + "crc", + "crossbeam-queue", + "either", + "event-listener", + "futures-core", + "futures-intrusive", + "futures-io", + "futures-util", + "hashbrown 0.15.5", + "hashlink 0.10.0", + "indexmap 2.14.0", + "log", + "memchr", + "once_cell", + "percent-encoding", + "rustls", + "serde", + "serde_json", + "sha2", + "smallvec", + "thiserror 2.0.18", + "tokio", + "tokio-stream", + "tracing", + "url", + "webpki-roots 0.26.11", +] + +[[package]] +name = "sqlx-macros" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2d452988ccaacfbf5e0bdbc348fb91d7c8af5bee192173ac3636b5fb6e6715d" +dependencies = [ + "proc-macro2", + "quote", + "sqlx-core", + "sqlx-macros-core", + "syn", +] + +[[package]] +name = "sqlx-macros-core" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19a9c1841124ac5a61741f96e1d9e2ec77424bf323962dd894bdb93f37d5219b" +dependencies = [ + "dotenvy", + "either", + "heck", + "hex", + "once_cell", + "proc-macro2", + "quote", + "serde", + "serde_json", + "sha2", + "sqlx-core", + "sqlx-mysql", + "sqlx-postgres", + "syn", + "tokio", + "url", +] + +[[package]] +name = "sqlx-mysql" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa003f0038df784eb8fecbbac13affe3da23b45194bd57dba231c8f48199c526" +dependencies = [ + "atoi", + "base64 0.22.1", + "bitflags 2.11.1", + "byteorder", + "bytes", + "crc", + "digest", + "dotenvy", + "either", + "futures-channel", + "futures-core", + "futures-io", + "futures-util", + "generic-array", + "hex", + "hkdf", + "hmac", + "itoa", + "log", + "md-5", + "memchr", + "once_cell", + "percent-encoding", + "rand 0.8.6", + "rsa", + "serde", + "sha1", + "sha2", + "smallvec", + "sqlx-core", + "stringprep", + "thiserror 2.0.18", + "tracing", + "whoami", +] + +[[package]] +name = "sqlx-postgres" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db58fcd5a53cf07c184b154801ff91347e4c30d17a3562a635ff028ad5deda46" +dependencies = [ + "atoi", + "base64 0.22.1", + "bitflags 2.11.1", + "byteorder", + "crc", + "dotenvy", + "etcetera", + "futures-channel", + "futures-core", + "futures-util", + "hex", + "hkdf", + "hmac", + "home", + "itoa", + "log", + "md-5", + "memchr", + "once_cell", + "rand 0.8.6", + "serde", + "serde_json", + "sha2", + "smallvec", + "sqlx-core", + "stringprep", + "thiserror 2.0.18", + "tracing", + "whoami", +] + [[package]] name = "ssri" version = "9.2.0" @@ -6060,6 +6333,17 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b2231b7c3057d5e4ad0156fb3dc807d900806020c5ffa3ee6ff2c8c76fb8520" +[[package]] +name = "stringprep" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b4df3d392d81bd458a8a621b8bffbd2302a12ffe288a9d931670948749463b1" +dependencies = [ + "unicode-bidi", + "unicode-normalization", + "unicode-properties", +] + [[package]] name = "strsim" version = "0.11.1" @@ -6265,7 +6549,7 @@ dependencies = [ "getrandom 0.4.2", "once_cell", "rustix 1.1.4", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -6275,7 +6559,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "230a1b821ccbd75b185820a1f1ff7b14d21da1e442e22c0863ea5f08771a8874" dependencies = [ "rustix 1.1.4", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -6789,6 +7073,12 @@ dependencies = [ "version_check", ] +[[package]] +name = "unicode-bidi" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c1cb5db39152898a79168971543b1cb5020dff7fe43c8dc468b0885f5e29df5" + [[package]] name = "unicode-ident" version = "1.0.24" @@ -6801,6 +7091,21 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b09c83c3c29d37506a3e260c08c03743a6bb66a9cd432c6934ab501a190571f" +[[package]] +name = "unicode-normalization" +version = "0.1.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5fd4f6878c9cb28d874b009da9e8d183b5abc80117c40bbd187a1fde336be6e8" +dependencies = [ + "tinyvec", +] + +[[package]] +name = "unicode-properties" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7df058c713841ad818f1dc5d3fd88063241cc61f49f5fbea4b951e8cf5a8d71d" + [[package]] name = "unicode-segmentation" version = "1.13.2" @@ -6947,6 +7252,12 @@ dependencies = [ "wit-bindgen 0.51.0", ] +[[package]] +name = "wasite" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" + [[package]] name = "wasm-bindgen" version = "0.2.118" @@ -7106,6 +7417,24 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "webpki-roots" +version = "0.26.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "521bc38abb08001b01866da9f51eb7c5d647a19260e00054a8c7fd5f9e57f7a9" +dependencies = [ + "webpki-roots 1.0.8", +] + +[[package]] +name = "webpki-roots" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf85cb06032201fa7c6f829d7db5a7e5aa45bcc0655327713065f6f0576731bf" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "which" version = "4.4.2" @@ -7127,6 +7456,16 @@ dependencies = [ "libc", ] +[[package]] +name = "whoami" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d4a4db5077702ca3015d3d02d74974948aba2ad9e12ab7df718ee64ccd7e97d" +dependencies = [ + "libredox", + "wasite", +] + [[package]] name = "widestring" version = "1.2.1" @@ -7155,7 +7494,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -7276,13 +7615,22 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets 0.48.5", +] + [[package]] name = "windows-sys" version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows-targets", + "windows-targets 0.52.6", ] [[package]] @@ -7291,7 +7639,7 @@ version = "0.59.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" dependencies = [ - "windows-targets", + "windows-targets 0.52.6", ] [[package]] @@ -7303,20 +7651,35 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows-targets" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" +dependencies = [ + "windows_aarch64_gnullvm 0.48.5", + "windows_aarch64_msvc 0.48.5", + "windows_i686_gnu 0.48.5", + "windows_i686_msvc 0.48.5", + "windows_x86_64_gnu 0.48.5", + "windows_x86_64_gnullvm 0.48.5", + "windows_x86_64_msvc 0.48.5", +] + [[package]] name = "windows-targets" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", + "windows_aarch64_gnullvm 0.52.6", + "windows_aarch64_msvc 0.52.6", + "windows_i686_gnu 0.52.6", "windows_i686_gnullvm", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "windows_i686_msvc 0.52.6", + "windows_x86_64_gnu 0.52.6", + "windows_x86_64_gnullvm 0.52.6", + "windows_x86_64_msvc 0.52.6", ] [[package]] @@ -7328,18 +7691,36 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" + [[package]] name = "windows_aarch64_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" + [[package]] name = "windows_aarch64_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" +[[package]] +name = "windows_i686_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" + [[package]] name = "windows_i686_gnu" version = "0.52.6" @@ -7352,24 +7733,48 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" +[[package]] +name = "windows_i686_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" + [[package]] name = "windows_i686_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" + [[package]] name = "windows_x86_64_gnu" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" + [[package]] name = "windows_x86_64_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" + [[package]] name = "windows_x86_64_msvc" version = "0.52.6" diff --git a/Cargo.toml b/Cargo.toml index 44b1530a3b..fe33ff0a9e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -139,6 +139,7 @@ same-file = { version = "1.0.6" } serde = { version = "1.0.228", features = ["derive"] } serde_json = { version = "1.0.150", features = ["preserve_order", "raw_value"] } serde-saphyr = { version = "0.0.27" } +sqlx = { version = "0.8.6", default-features = false, features = ["runtime-tokio-rustls"] } # 0.11 removes the LowerHex impl on Output; revisit after upstream/consumers catch up sha2 = { version = "0.10.9" } smart-default = { version = "0.7.1" } diff --git a/pnpr/crates/pnpr/Cargo.toml b/pnpr/crates/pnpr/Cargo.toml index 2bc490001d..9a9d23531b 100644 --- a/pnpr/crates/pnpr/Cargo.toml +++ b/pnpr/crates/pnpr/Cargo.toml @@ -18,6 +18,12 @@ path = "src/lib.rs" name = "pnpr" path = "src/main.rs" +[features] +default = ["backend-libsql"] +backend-libsql = ["dep:libsql"] +backend-mysql = ["dep:sqlx", "sqlx/mysql"] +backend-postgres = ["dep:sqlx", "sqlx/postgres"] + [dependencies] pacquet-config = { workspace = true } pacquet-config-dir = { workspace = true } @@ -46,7 +52,7 @@ getrandom = { workspace = true } home = { workspace = true } indexmap = { workspace = true } libc = { workspace = true } -libsql = { workspace = true } +libsql = { workspace = true, optional = true } miette = { workspace = true } node-semver = { workspace = true } object_store = { workspace = true } @@ -56,12 +62,14 @@ serde = { workspace = true } serde-saphyr = { workspace = true } serde_json = { workspace = true } sha2 = { workspace = true } +sqlx = { workspace = true, optional = true } ssri = { workspace = true } tempfile = { workspace = true } tokio = { workspace = true } tower-http = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true, features = ["json"] } +url = { workspace = true } wax = { workspace = true } zip = { workspace = true } diff --git a/pnpr/crates/pnpr/config.yaml b/pnpr/crates/pnpr/config.yaml index 6617cb506e..0bdb271707 100644 --- a/pnpr/crates/pnpr/config.yaml +++ b/pnpr/crates/pnpr/config.yaml @@ -23,20 +23,23 @@ storage: ./storage # secretAccessKey: ${PNPR_S3_SECRET_ACCESS_KEY} # forcePathStyle: false # allowHttp: false -# Back the auth record stores (users + tokens) with a networked SQLite -# database (libsql / Turso) instead of the local htpasswd file + SQLite -# token db. Lets several stateless pnpr replicas share one consistent set -# of accounts. When omitted, auth stays on local disk (see `auth:` -# below). `authToken` is optional for an unauthenticated local `sqld`. -# Set `replicaPath` to keep a local embedded replica so the auth hot -# path (token lookups) reads locally instead of over the network; -# `syncIntervalSecs` bounds how stale a read can be (e.g. revocation lag). +# Back the auth record stores (users + tokens) with one shared SQL +# database instead of the local htpasswd file + SQLite token db. Lets +# several stateless pnpr replicas share one consistent set of accounts. +# Select exactly one backend below. `backend-libsql` is enabled by +# default; PostgreSQL and MySQL builds need the matching Cargo feature. #backend: # libsql: # url: ${PNPR_LIBSQL_URL} # authToken: ${PNPR_LIBSQL_TOKEN} # replicaPath: ./auth-replica.db # syncIntervalSecs: 60 +# postgres: +# url: ${PNPR_POSTGRES_URL} +# maxConnections: 16 +# mysql: +# url: ${PNPR_MYSQL_URL} +# maxConnections: 16 secret: pnpm-registry-mock-secret-key-32 auth: diff --git a/pnpr/crates/pnpr/src/auth.rs b/pnpr/crates/pnpr/src/auth.rs index 1a3de40696..dbf62b5247 100644 --- a/pnpr/crates/pnpr/src/auth.rs +++ b/pnpr/crates/pnpr/src/auth.rs @@ -6,7 +6,7 @@ //! * [`UserBackend`] — username → bcrypt-hashed password. //! * [`TokenBackend`] — SHA-256 token hash → token record. //! -//! Three implementations exist, picked at startup by +//! Implementations are picked at startup by //! [`AuthState::load`]: //! //! * [`UserStore`] / [`TokenStore`] — the local default. Users are an @@ -15,11 +15,12 @@ //! every write, so reads (the hot path for `enforce_access`) never //! touch disk. With no file configured both fall back to a pure //! in-memory map (the `@pnpm/registry-mock` shape). -//! * [`LibsqlAuth`] — a networked-SQLite (libsql / Turso) backend that -//! stores both records in a shared database, so several stateless -//! pnpr replicas observe a consistent set of users and tokens. The -//! on-disk htpasswd format doesn't network, so users live in a -//! `users` table here; the `tokens` table matches the local schema. +//! * `backend.libsql`, `backend.postgres`, and `backend.mysql` — +//! shared SQL databases that store both records in one place, so +//! several stateless pnpr replicas observe a consistent set of users +//! and tokens. The on-disk htpasswd format doesn't network, so users +//! live in a `users` table here; the `tokens` table matches the local +//! schema. //! //! The raw token is only ever returned to the caller once on `issue`; //! only its SHA-256 hash hits storage, so a leak of the database @@ -31,9 +32,14 @@ use crate::{ }; use async_trait::async_trait; use base64::{Engine, engine::general_purpose::STANDARD as BASE64}; +#[cfg(feature = "backend-libsql")] use libsql_backend::LibsqlAuth; use rusqlite::Connection; use sha2::{Digest, Sha256}; +#[cfg(feature = "backend-mysql")] +use sqlx_backend::mysql::MysqlAuth; +#[cfg(feature = "backend-postgres")] +use sqlx_backend::postgres::PostgresAuth; use std::{ collections::HashMap, fmt::Write as _, @@ -45,7 +51,63 @@ use std::{ time::{SystemTime, UNIX_EPOCH}, }; +#[cfg(feature = "backend-libsql")] mod libsql_backend; +#[cfg(any(feature = "backend-postgres", feature = "backend-mysql"))] +mod sqlx_backend; + +pub(crate) const MAX_USERNAME_CHARS: usize = 255; + +pub(crate) fn validate_username(username: &str) -> Result<()> { + if username.is_empty() { + return Err(RegistryError::BadRequest { reason: "username must not be empty".to_string() }); + } + + let mut chars = 0; + let mut starts_with_whitespace = false; + let mut ends_with_whitespace = false; + let mut contains_colon = false; + let mut contains_control = false; + for ch in username.chars() { + chars += 1; + if chars > MAX_USERNAME_CHARS { + return Err(RegistryError::BadRequest { + reason: format!("username must be at most {MAX_USERNAME_CHARS} characters"), + }); + } + if chars == 1 { + starts_with_whitespace = ch.is_whitespace(); + } + ends_with_whitespace = ch.is_whitespace(); + contains_colon |= ch == ':'; + contains_control |= ch.is_control(); + } + + if starts_with_whitespace || ends_with_whitespace { + return Err(RegistryError::BadRequest { + reason: "username must not start or end with whitespace".to_string(), + }); + } + if contains_colon { + return Err(RegistryError::BadRequest { + reason: "username must not contain ':'".to_string(), + }); + } + if contains_control { + return Err(RegistryError::BadRequest { + reason: "username must not contain control characters".to_string(), + }); + } + Ok(()) +} + +pub(crate) fn token_timestamp_from_sql(timestamp: i64) -> u64 { + timestamp.max(0) as u64 +} + +pub(crate) fn token_timestamp_to_sql(timestamp: u64) -> i64 { + i64::try_from(timestamp).unwrap_or(i64::MAX) +} /// Bundle of the user store and the token store, each a trait object /// so the rest of the server doesn't have to know whether auth is @@ -73,20 +135,61 @@ impl AuthState { Self { users: Arc::new(UserStore::in_memory()), tokens: Arc::new(TokenStore::in_memory()) } } - /// Build the auth state from the resolved config. The networked - /// [`BackendConfig::Libsql`] backs both stores with one shared - /// database; otherwise each local store is in-memory when its file - /// path is unset and file-backed otherwise. The fallible step (open - /// the htpasswd / `SQLite` file, or connect to the networked DB and - /// ensure its schema) runs here so a malformed file or an - /// unreachable database surfaces as a startup error before the - /// socket is bound. + /// Build the auth state from the resolved config. A configured SQL + /// backend backs both stores with one shared database; otherwise + /// each local store is in-memory when its file path is unset and + /// file-backed otherwise. The fallible step (open the htpasswd / + /// `SQLite` file, or connect to the configured DB and ensure its + /// schema) runs here so a malformed file or an unreachable database + /// surfaces as a startup error before the socket is bound. pub async fn load(auth: &AuthConfig, backend: &BackendConfig) -> Result { - if let BackendConfig::Libsql(settings) = backend { - let shared = Arc::new(LibsqlAuth::connect(settings, auth.htpasswd.max_users).await?); - let users: Arc = Arc::clone(&shared) as Arc; - let tokens: Arc = shared; - return Ok(Self { users, tokens }); + match backend { + BackendConfig::Local => {} + BackendConfig::Libsql(settings) => { + #[cfg(feature = "backend-libsql")] + { + let shared = + Arc::new(LibsqlAuth::connect(settings, auth.htpasswd.max_users).await?); + let users: Arc = Arc::clone(&shared) as Arc; + let tokens: Arc = shared; + return Ok(Self { users, tokens }); + } + #[cfg(not(feature = "backend-libsql"))] + { + let _ = settings; + return Err(backend_not_enabled("libsql", "backend-libsql")); + } + } + BackendConfig::Postgres(settings) => { + #[cfg(feature = "backend-postgres")] + { + let shared = + Arc::new(PostgresAuth::connect(settings, auth.htpasswd.max_users).await?); + let users: Arc = Arc::clone(&shared) as Arc; + let tokens: Arc = shared; + return Ok(Self { users, tokens }); + } + #[cfg(not(feature = "backend-postgres"))] + { + let _ = settings; + return Err(backend_not_enabled("postgres", "backend-postgres")); + } + } + BackendConfig::Mysql(settings) => { + #[cfg(feature = "backend-mysql")] + { + let shared = + Arc::new(MysqlAuth::connect(settings, auth.htpasswd.max_users).await?); + let users: Arc = Arc::clone(&shared) as Arc; + let tokens: Arc = shared; + return Ok(Self { users, tokens }); + } + #[cfg(not(feature = "backend-mysql"))] + { + let _ = settings; + return Err(backend_not_enabled("mysql", "backend-mysql")); + } + } } let users: Arc = match auth.htpasswd.file.clone() { Some(path) => Arc::new(UserStore::open(path, auth.htpasswd.max_users)?), @@ -100,17 +203,31 @@ impl AuthState { } } +#[cfg(any( + not(feature = "backend-libsql"), + not(feature = "backend-postgres"), + not(feature = "backend-mysql") +))] +fn backend_not_enabled(name: &str, feature: &str) -> RegistryError { + RegistryError::InvalidConfig { + reason: format!("backend.{name} is configured but pnpr was built without `{feature}`"), + } +} + /// Username + password record store. The hot read is /// [`Self::verify`] (the Basic-auth path of [`identify`]); the write /// is [`Self::add_or_login`] (npm `adduser` / `login`). #[async_trait] pub trait UserBackend: Send + Sync { - /// Add a new user or verify a returning one. See - /// [`UpsertOutcome`] for the success cases; a wrong password for - /// an existing user is [`RegistryError::Unauthenticated`], and a - /// new user past the registration cap is - /// [`RegistryError::RegistrationDisabled`] / `TooManyUsers`. - async fn add_or_login(&self, username: &str, password: &str) -> Result; + /// Add a new user or verify a returning one. On success, returns + /// the outcome plus the canonical stored username to bind follow-up + /// token issuance to the same identity that Basic auth would + /// resolve. A wrong password for an existing user is + /// [`RegistryError::Unauthenticated`], and a new user past the + /// registration cap is [`RegistryError::RegistrationDisabled`] / + /// `TooManyUsers`. + async fn add_or_login(&self, username: &str, password: &str) + -> Result<(UpsertOutcome, String)>; /// Verify a username+password pair. `Ok(Some(username))` on a match, /// `Ok(None)` when the user is unknown or the password is wrong, and @@ -229,7 +346,11 @@ impl UserBackend for UserStore { /// * Known username, password wrong → `Unauthenticated`. /// * Unknown username, registration disabled or capped → /// `RegistrationDisabled` / `TooManyUsers`. - async fn add_or_login(&self, username: &str, password: &str) -> Result { + async fn add_or_login( + &self, + username: &str, + password: &str, + ) -> Result<(UpsertOutcome, String)> { let existing_hash = { let users = self.users.lock().expect("UserStore mutex poisoned"); users.get(username).cloned() @@ -238,6 +359,8 @@ impl UserBackend for UserStore { return verify_returning_user(username, password, stored).await; } + validate_username(username)?; + // Brand-new user — check the registration cap before doing // the (expensive) bcrypt hash. match self.max_users { @@ -277,7 +400,7 @@ impl UserBackend for UserStore { match next_step { NextStep::Persist(snapshot) => { self.persist(snapshot).await?; - Ok(UpsertOutcome::Created) + Ok((UpsertOutcome::Created, username.to_string())) } NextStep::VerifyExisting(stored) => { verify_returning_user(username, password, stored).await @@ -392,12 +515,25 @@ impl TokenBackend for TokenStore { } if let Some(path) = self.persist.clone() { let hash_for_db = token_hash.clone(); - tokio::task::spawn_blocking(move || -> Result<()> { + let result = tokio::task::spawn_blocking(move || -> Result<()> { let conn = Connection::open(&path)?; - upsert_token(&conn, &hash_for_db, &record)?; + insert_token(&conn, &hash_for_db, &record)?; Ok(()) }) - .await??; + .await; + match result { + Ok(Ok(())) => {} + Ok(Err(err)) => { + let mut inner = self.inner.lock().expect("TokenStore mutex poisoned"); + inner.tokens.remove(&token_hash); + return Err(err); + } + Err(err) => { + let mut inner = self.inner.lock().expect("TokenStore mutex poisoned"); + inner.tokens.remove(&token_hash); + return Err(err.into()); + } + } } Ok(raw) } @@ -632,9 +768,9 @@ async fn verify_returning_user( username: &str, password: &str, stored: String, -) -> Result { +) -> Result<(UpsertOutcome, String)> { if verify_bcrypt(password.to_string(), stored).await? { - Ok(UpsertOutcome::LoggedIn) + Ok((UpsertOutcome::LoggedIn, username.to_string())) } else { Err(RegistryError::Unauthenticated { resource: format!("user {username:?}") }) } @@ -644,28 +780,34 @@ async fn verify_returning_user( // SQLite-backed token store // --------------------------------------------------------------- -/// `tokens` table DDL — shared verbatim by the local [`TokenStore`] -/// and the networked [`LibsqlAuth`] so the two backends store the -/// same shape and a database can be moved between them. +/// `tokens` table DDL — shared by every SQL-backed auth store so the +/// backends store the same shape and records can be moved between them. const TOKENS_TABLE_SQL: &str = "CREATE TABLE IF NOT EXISTS tokens ( - token_hash TEXT PRIMARY KEY, - username TEXT NOT NULL, - created_at INTEGER NOT NULL, - last_used_at INTEGER NOT NULL, - readonly INTEGER NOT NULL DEFAULT 0, - cidr_whitelist TEXT NOT NULL DEFAULT '[]' + token_hash CHAR(64) PRIMARY KEY, + username VARCHAR(255) NOT NULL, + created_at BIGINT NOT NULL, + last_used_at BIGINT NOT NULL, + readonly SMALLINT NOT NULL DEFAULT 0, + cidr_whitelist VARCHAR(4096) NOT NULL DEFAULT '[]' )"; const TOKENS_INDEX_SQL: &str = "CREATE INDEX IF NOT EXISTS tokens_username ON tokens(username)"; -/// `users` table DDL — only the networked backend needs it, since the -/// local backend keeps users in an htpasswd file. One bcrypt hash per -/// username, the same `$2y$...` string the htpasswd file would hold. +/// `users` table DDL — only shared-database backends need it, since +/// the local backend keeps users in an htpasswd file. One bcrypt hash +/// per username, the same `$2y$...` string the htpasswd file would hold. +#[cfg(any(feature = "backend-libsql", feature = "backend-postgres", feature = "backend-mysql"))] const USERS_TABLE_SQL: &str = "CREATE TABLE IF NOT EXISTS users ( - username TEXT PRIMARY KEY, + username VARCHAR(255) PRIMARY KEY, bcrypt_hash TEXT NOT NULL )"; +#[cfg(any(feature = "backend-libsql", feature = "backend-postgres", feature = "backend-mysql"))] +const AUTH_COUNTERS_TABLE_SQL: &str = "CREATE TABLE IF NOT EXISTS auth_counters ( + name VARCHAR(64) PRIMARY KEY, + value BIGINT NOT NULL +)"; + fn init_tokens_schema(conn: &Connection) -> Result<()> { conn.execute(TOKENS_TABLE_SQL, [])?; conn.execute(TOKENS_INDEX_SQL, [])?; @@ -691,8 +833,8 @@ fn load_all_tokens(conn: &Connection) -> Result> { hash, TokenRecord { username, - created_at: created_at as u64, - last_used_at: last_used_at as u64, + created_at: token_timestamp_from_sql(created_at), + last_used_at: token_timestamp_from_sql(last_used_at), readonly: readonly != 0, cidr_whitelist, }, @@ -706,22 +848,17 @@ fn delete_token(conn: &Connection, token_hash: &str) -> Result<()> { Ok(()) } -fn upsert_token(conn: &Connection, token_hash: &str, record: &TokenRecord) -> Result<()> { +fn insert_token(conn: &Connection, token_hash: &str, record: &TokenRecord) -> Result<()> { let cidr_json = serde_json::to_string(&record.cidr_whitelist) .expect("Vec always serializes to JSON"); conn.execute( "INSERT INTO tokens (token_hash, username, created_at, last_used_at, readonly, cidr_whitelist) - VALUES (?1, ?2, ?3, ?4, ?5, ?6) - ON CONFLICT(token_hash) DO UPDATE SET - username = excluded.username, - last_used_at = excluded.last_used_at, - readonly = excluded.readonly, - cidr_whitelist = excluded.cidr_whitelist", + VALUES (?1, ?2, ?3, ?4, ?5, ?6)", rusqlite::params![ token_hash, record.username, - record.created_at as i64, - record.last_used_at as i64, + token_timestamp_to_sql(record.created_at), + token_timestamp_to_sql(record.last_used_at), i64::from(record.readonly), cidr_json, ], diff --git a/pnpr/crates/pnpr/src/auth/libsql_backend.rs b/pnpr/crates/pnpr/src/auth/libsql_backend.rs index 641189e62d..d806bac659 100644 --- a/pnpr/crates/pnpr/src/auth/libsql_backend.rs +++ b/pnpr/crates/pnpr/src/auth/libsql_backend.rs @@ -20,14 +20,15 @@ use super::{ DEFAULT_BCRYPT_COST, TokenBackend, TokenRecord, UpsertOutcome, UserBackend, fresh_secret, - hash_bcrypt, mint_token, sha256_hex, unix_seconds, verify_bcrypt, verify_returning_user, + hash_bcrypt, mint_token, sha256_hex, token_timestamp_from_sql, unix_seconds, validate_username, + verify_bcrypt, verify_returning_user, }; use crate::{ config::{LibsqlSettings, MaxUsers}, error::{RegistryError, Result}, }; use async_trait::async_trait; -use libsql::{Builder, Connection, Database, Row, params}; +use libsql::{Builder, Connection, Database, Error as LibsqlError, Row, params}; use std::{ sync::atomic::{AtomicU64, Ordering}, time::Duration, @@ -114,7 +115,9 @@ impl LibsqlAuth { /// registration cap, never on the hot path. async fn user_count(&self) -> Result { let mut rows = self.conn.query("SELECT COUNT(*) FROM users", ()).await?; - let row = rows.next().await?.expect("COUNT(*) returns exactly one row"); + let Some(row) = rows.next().await? else { + return Err(missing_count_row()); + }; let count: i64 = row.get(0)?; Ok(count.max(0) as u64) } @@ -122,11 +125,17 @@ impl LibsqlAuth { #[async_trait] impl UserBackend for LibsqlAuth { - async fn add_or_login(&self, username: &str, password: &str) -> Result { + async fn add_or_login( + &self, + username: &str, + password: &str, + ) -> Result<(UpsertOutcome, String)> { if let Some(stored) = self.stored_hash(username).await? { return verify_returning_user(username, password, stored).await; } + validate_username(username)?; + // Brand-new user. The cheap pre-check avoids the (expensive) hash // when the cap is already full; the insert below re-checks the // cap atomically so it holds even under a concurrent burst. @@ -139,62 +148,85 @@ impl UserBackend for LibsqlAuth { } let hash = hash_bcrypt(password.to_string(), DEFAULT_BCRYPT_COST).await?; - // Count-and-insert in one statement so the cap is strict, not - // best-effort: the `WHERE (SELECT COUNT(*) ...) < max` guard is - // evaluated atomically with the insert, so concurrent registrants - // (even on other replicas, since writes serialize on the primary) - // can't race past it. `DO NOTHING` absorbs a same-username race. - // A zero row-count means either the cap won or another writer - // inserted this username first; we disambiguate below. - let inserted = match self.max_users { - MaxUsers::Limited(max) => { - self.conn - .execute( - "INSERT INTO users (username, bcrypt_hash) - SELECT ?1, ?2 WHERE (SELECT COUNT(*) FROM users) < ?3 - ON CONFLICT(username) DO NOTHING", - params![username, hash, max as i64], - ) - .await? - } - _ => { - self.conn - .execute( - "INSERT INTO users (username, bcrypt_hash) VALUES (?1, ?2) - ON CONFLICT(username) DO NOTHING", - params![username, hash], - ) - .await? - } - }; - if inserted == 0 { - if let Some(stored) = self.stored_hash(username).await? { - // A concurrent writer registered this username first. - return verify_returning_user(username, password, stored).await; - } - // Nothing inserted and the user still doesn't exist, so the - // only thing that blocked the insert is the cap guard. - if let MaxUsers::Limited(max) = self.max_users { - return Err(RegistryError::TooManyUsers { max }); - } - // Unbounded yet neither inserted nor present: a concurrent - // delete raced the insert. Surface a transient failure rather - // than silently report success. - return Err(RegistryError::Unauthenticated { resource: format!("user {username:?}") }); + if matches!(self.max_users, MaxUsers::Unlimited) { + let inserted = self + .conn + .execute( + "INSERT INTO users (username, bcrypt_hash) VALUES (?1, ?2)", + params![username, hash], + ) + .await; + return match inserted { + Ok(_) => Ok((UpsertOutcome::Created, username.to_string())), + Err(err) if is_unique_violation(&err) => { + if let Some(stored) = self.stored_hash(username).await? { + return verify_returning_user(username, password, stored).await; + } + Err(RegistryError::Unauthenticated { resource: format!("user {username:?}") }) + } + Err(err) => Err(err.into()), + }; + } + + let mut can_retry_after_reconcile = true; + loop { + let tx = self.conn.transaction().await?; + if let MaxUsers::Limited(max) = self.max_users { + let sql_max = i64::try_from(max).map_err(|_| RegistryError::InvalidConfig { + reason: "backend.libsql auth max_users must fit a signed BIGINT".to_string(), + })?; + let updated = tx + .execute( + "UPDATE auth_counters SET value = value + 1 + WHERE name = ?1 AND value < ?2", + params!["users", sql_max], + ) + .await?; + if updated == 0 { + tx.rollback().await?; + if can_retry_after_reconcile { + can_retry_after_reconcile = false; + if reconcile_user_counter_overcount(&self.conn).await? { + continue; + } + } + if let Some(stored) = self.stored_hash(username).await? { + return verify_returning_user(username, password, stored).await; + } + return Err(RegistryError::TooManyUsers { max }); + } + } + let inserted = tx + .execute( + "INSERT INTO users (username, bcrypt_hash) VALUES (?1, ?2)", + params![username, hash], + ) + .await; + match inserted { + Ok(_) => { + tx.commit().await?; + return Ok((UpsertOutcome::Created, username.to_string())); + } + Err(err) if is_unique_violation(&err) => { + tx.rollback().await?; + if let Some(stored) = self.stored_hash(username).await? { + return verify_returning_user(username, password, stored).await; + } + return Err(RegistryError::Unauthenticated { + resource: format!("user {username:?}"), + }); + } + Err(err) => return Err(err.into()), + } } - Ok(UpsertOutcome::Created) } async fn verify(&self, username: &str, password: &str) -> Result> { let Some(stored) = self.stored_hash(username).await? else { return Ok(None); }; - // A database error already propagated above; a bcrypt error here - // is treated as a non-match, not a store outage. - Ok(verify_bcrypt(password.to_string(), stored) - .await - .unwrap_or(false) - .then(|| username.to_string())) + let valid = verify_bcrypt(password.to_string(), stored).await?; + Ok(valid.then(|| username.to_string())) } } @@ -208,9 +240,8 @@ impl TokenBackend for LibsqlAuth { self.conn .execute( "INSERT INTO tokens - (token_hash, username, created_at, last_used_at, readonly, cidr_whitelist) - VALUES (?1, ?2, ?3, ?3, 0, '[]') - ON CONFLICT(token_hash) DO UPDATE SET last_used_at = excluded.last_used_at", + (token_hash, username, created_at, last_used_at, readonly, cidr_whitelist) + VALUES (?1, ?2, ?3, ?3, 0, '[]')", params![token_hash, username, now], ) .await?; @@ -261,9 +292,85 @@ async fn init_schema(conn: &Connection) -> Result<()> { conn.execute(super::USERS_TABLE_SQL, ()).await?; conn.execute(super::TOKENS_TABLE_SQL, ()).await?; conn.execute(super::TOKENS_INDEX_SQL, ()).await?; + conn.execute(super::AUTH_COUNTERS_TABLE_SQL, ()).await?; + ensure_user_counter(conn).await +} + +async fn ensure_user_counter(conn: &Connection) -> Result<()> { + let mut rows = conn.query("SELECT COUNT(*) FROM users", ()).await?; + let Some(row) = rows.next().await? else { + return Err(missing_count_row()); + }; + let count: i64 = row.get(0)?; + let tx = conn.transaction().await?; + let inserted = tx + .execute("INSERT INTO auth_counters (name, value) VALUES (?1, ?2)", params!["users", count]) + .await; + match inserted { + Ok(_) => {} + Err(err) if is_unique_violation(&err) => {} + Err(err) => { + tx.rollback().await?; + return Err(err.into()); + } + } + tx.execute( + "UPDATE auth_counters + SET value = CASE WHEN value < ?2 THEN ?2 ELSE value END + WHERE name = ?1", + params!["users", count], + ) + .await?; + tx.commit().await?; Ok(()) } +async fn reconcile_user_counter_overcount(conn: &Connection) -> Result { + let tx = conn.transaction().await?; + let mut counter_rows = + tx.query("SELECT value FROM auth_counters WHERE name = ?1", params!["users"]).await?; + let Some(counter_row) = counter_rows.next().await? else { + drop(counter_rows); + tx.commit().await?; + return Ok(false); + }; + let counter: i64 = counter_row.get(0)?; + drop(counter_rows); + let mut count_rows = tx.query("SELECT COUNT(*) FROM users", ()).await?; + let Some(count_row) = count_rows.next().await? else { + return Err(missing_count_row()); + }; + let count: i64 = count_row.get(0)?; + drop(count_rows); + if counter <= count { + tx.commit().await?; + return Ok(false); + } + tx.execute( + "UPDATE auth_counters SET value = ?2 WHERE name = ?1", + params!["users", count.max(0)], + ) + .await?; + tx.commit().await?; + Ok(true) +} + +fn is_unique_violation(err: &LibsqlError) -> bool { + match err { + LibsqlError::SqliteFailure(code, message) => { + *code == 19 || *code == 2067 || message.contains("UNIQUE constraint failed") + } + LibsqlError::RemoteSqliteFailure(_, code, message) => { + *code == 19 || *code == 2067 || message.contains("UNIQUE constraint failed") + } + _ => false, + } +} + +fn missing_count_row() -> RegistryError { + RegistryError::Internal { reason: "auth database COUNT(*) returned no rows".to_string() } +} + /// Decode a row selecting [`TOKEN_COLUMNS`] into its `(token_hash, /// record)` pair. fn row_to_keyed_record(row: &Row) -> Result<(String, TokenRecord)> { @@ -278,8 +385,8 @@ fn row_to_keyed_record(row: &Row) -> Result<(String, TokenRecord)> { token_hash, TokenRecord { username, - created_at: created_at as u64, - last_used_at: last_used_at as u64, + created_at: token_timestamp_from_sql(created_at), + last_used_at: token_timestamp_from_sql(last_used_at), readonly: readonly != 0, cidr_whitelist, }, diff --git a/pnpr/crates/pnpr/src/auth/libsql_backend/tests.rs b/pnpr/crates/pnpr/src/auth/libsql_backend/tests.rs index 5259172d60..493e32f06a 100644 --- a/pnpr/crates/pnpr/src/auth/libsql_backend/tests.rs +++ b/pnpr/crates/pnpr/src/auth/libsql_backend/tests.rs @@ -12,11 +12,11 @@ async fn add_or_login_creates_then_logs_in() { let backend = local_backend(MaxUsers::Unlimited).await; assert!(matches!( backend.add_or_login("alice", "secret").await.unwrap(), - UpsertOutcome::Created, + (UpsertOutcome::Created, _), )); assert!(matches!( backend.add_or_login("alice", "secret").await.unwrap(), - UpsertOutcome::LoggedIn, + (UpsertOutcome::LoggedIn, _), )); assert_eq!(backend.verify("alice", "secret").await.unwrap().as_deref(), Some("alice")); assert!(backend.verify("alice", "wrong").await.unwrap().is_none()); @@ -31,6 +31,53 @@ async fn add_or_login_rejects_existing_user_with_wrong_password() { assert_eq!(err.status_code(), axum::http::StatusCode::UNAUTHORIZED); } +#[tokio::test] +async fn add_or_login_rejects_invalid_username_before_insert() { + let backend = local_backend(MaxUsers::Unlimited).await; + let err = backend.add_or_login("alice ", "secret").await.unwrap_err(); + assert_eq!(err.status_code(), axum::http::StatusCode::BAD_REQUEST); + + let mut rows = backend.conn.query("SELECT COUNT(*) FROM users", ()).await.unwrap(); + let total: i64 = rows.next().await.unwrap().unwrap().get(0).unwrap(); + assert_eq!(total, 0, "invalid username must not be inserted"); +} + +#[tokio::test] +async fn add_or_login_allows_existing_legacy_username() { + let backend = local_backend(MaxUsers::Unlimited).await; + let hash = bcrypt::hash("secret", 4).unwrap(); + backend + .conn + .execute( + "INSERT INTO users (username, bcrypt_hash) VALUES (?1, ?2)", + params!["alice ", hash], + ) + .await + .unwrap(); + + let outcome = backend.add_or_login("alice ", "secret").await.unwrap(); + + assert!(matches!(outcome, (UpsertOutcome::LoggedIn, _))); + assert_eq!(outcome.1, "alice "); +} + +#[tokio::test] +async fn verify_propagates_corrupt_hash_errors() { + let backend = local_backend(MaxUsers::Unlimited).await; + backend + .conn + .execute( + "INSERT INTO users (username, bcrypt_hash) VALUES (?1, ?2)", + params!["alice", "not-a-bcrypt-hash"], + ) + .await + .unwrap(); + + let err = backend.verify("alice", "secret").await.unwrap_err(); + + assert!(matches!(err, RegistryError::Bcrypt(_)), "got {err:?}"); +} + #[tokio::test] async fn max_users_caps_registration() { let backend = local_backend(MaxUsers::Limited(1)).await; @@ -57,7 +104,7 @@ async fn registration_cap_is_strict_under_concurrency() { } let mut created = 0; for handle in handles { - if matches!(handle.await.unwrap(), Ok(UpsertOutcome::Created)) { + if matches!(handle.await.unwrap(), Ok((UpsertOutcome::Created, _))) { created += 1; } } @@ -68,6 +115,53 @@ async fn registration_cap_is_strict_under_concurrency() { assert_eq!(total, 1, "the cap must be strictly enforced, never exceeded"); } +#[tokio::test] +async fn ensure_user_counter_reconciles_a_stale_counter() { + let backend = local_backend(MaxUsers::Unlimited).await; + backend + .conn + .execute( + "INSERT INTO users (username, bcrypt_hash) VALUES (?1, ?2)", + params!["alice", "not-used-by-this-test"], + ) + .await + .unwrap(); + backend + .conn + .execute("UPDATE auth_counters SET value = 0 WHERE name = ?1", params!["users"]) + .await + .unwrap(); + + ensure_user_counter(&backend.conn).await.unwrap(); + + let mut rows = backend + .conn + .query("SELECT value FROM auth_counters WHERE name = ?1", params!["users"]) + .await + .unwrap(); + let value: i64 = rows.next().await.unwrap().unwrap().get(0).unwrap(); + assert_eq!(value, 1, "startup reconciliation must lift stale counters to the user count"); +} + +#[tokio::test] +async fn registration_cap_self_heals_an_overcounted_counter() { + let backend = local_backend(MaxUsers::Limited(1)).await; + backend.add_or_login("alice", "x").await.unwrap(); + backend.conn.execute("DELETE FROM users WHERE username = ?1", params!["alice"]).await.unwrap(); + + assert!( + matches!(backend.add_or_login("bob", "x").await.unwrap(), (UpsertOutcome::Created, _),), + ); + + let mut rows = backend + .conn + .query("SELECT value FROM auth_counters WHERE name = ?1", params!["users"]) + .await + .unwrap(); + let value: i64 = rows.next().await.unwrap().unwrap().get(0).unwrap(); + assert_eq!(value, 1, "counter should match the newly created user"); +} + #[tokio::test] async fn tokens_round_trip_and_revoke() { let backend = local_backend(MaxUsers::Unlimited).await; diff --git a/pnpr/crates/pnpr/src/auth/sqlx_backend.rs b/pnpr/crates/pnpr/src/auth/sqlx_backend.rs new file mode 100644 index 0000000000..7d00a2daf7 --- /dev/null +++ b/pnpr/crates/pnpr/src/auth/sqlx_backend.rs @@ -0,0 +1,1307 @@ +//! Shared SQL auth backend for relational databases supported through +//! feature-gated `sqlx` drivers. + +use super::{ + DEFAULT_BCRYPT_COST, MAX_USERNAME_CHARS, TokenBackend, TokenRecord, UpsertOutcome, UserBackend, + fresh_secret, hash_bcrypt, mint_token, sha256_hex, unix_seconds, validate_username, + verify_bcrypt, verify_returning_user, +}; +use crate::{ + config::MaxUsers, + error::{RegistryError, Result}, +}; +use async_trait::async_trait; +use std::{ + future::Future, + sync::atomic::{AtomicU64, Ordering}, + time::Duration, +}; + +#[derive(Debug)] +pub(in crate::auth) struct SqlAuth { + db: Db, + secret: [u8; 32], + counter: AtomicU64, + next_cap_reconcile_at: AtomicU64, + max_users: MaxUsers, + timeout: Duration, +} + +const CAP_RECONCILE_INTERVAL_SECS: u64 = 60; + +impl SqlAuth { + fn new(db: Db, max_users: MaxUsers, timeout: Duration) -> Self { + Self { + db, + secret: fresh_secret(), + counter: AtomicU64::new(0), + next_cap_reconcile_at: AtomicU64::new(0), + max_users, + timeout, + } + } + + async fn reconcile_capped_counter_once_per_interval(&self) -> Result + where + Db: AuthSqlBackend, + { + let now = unix_seconds(); + let next = self.next_cap_reconcile_at.load(Ordering::Relaxed); + if now < next { + return Ok(false); + } + let updated_next = now.saturating_add(CAP_RECONCILE_INTERVAL_SECS); + if self + .next_cap_reconcile_at + .compare_exchange(next, updated_next, Ordering::Relaxed, Ordering::Relaxed) + .is_err() + { + return Ok(false); + } + self.db.reconcile_user_counter_overcount().await + } +} + +/// Only use this around read-only request-path work or startup setup. Request-path +/// writes rely on the backend's statement timeout and must await the database +/// result so callers do not observe a timeout with an unknown commit state. +async fn with_auth_timeout( + timeout: Duration, + future: impl Future>, +) -> Result +where + RegistryError: From, +{ + match tokio::time::timeout(timeout, future).await { + Ok(result) => result.map_err(RegistryError::from), + Err(_) => Err(RegistryError::AuthDatabaseTimeout), + } +} + +#[async_trait] +trait AuthSqlBackend: Send + Sync { + async fn stored_user(&self, username: &str) -> Result>; + async fn user_count(&self) -> Result; + async fn reconcile_user_counter_overcount(&self) -> Result; + async fn insert_user( + &self, + username: &str, + bcrypt_hash: &str, + max_users: MaxUsers, + ) -> Result; + async fn insert_token(&self, token_hash: &str, record: &TokenRecord) -> Result<()>; + async fn lookup_token(&self, token_hash: &str) -> Result>; + async fn find_token(&self, token_hash: &str) -> Result>; + async fn list_tokens(&self, username: &str) -> Result>; + async fn delete_token(&self, token_hash: &str) -> Result<()>; +} + +#[derive(Clone)] +struct StoredUser { + username: String, + bcrypt_hash: String, +} + +enum InsertUser { + Created, + Existing(StoredUser), + CapReached, +} + +#[async_trait] +impl UserBackend for SqlAuth +where + Db: AuthSqlBackend, +{ + async fn add_or_login( + &self, + username: &str, + password: &str, + ) -> Result<(UpsertOutcome, String)> { + validate_username_bounds(username)?; + + if let Some(stored) = with_auth_timeout(self.timeout, self.db.stored_user(username)).await? + { + return verify_returning_user(&stored.username, password, stored.bcrypt_hash).await; + } + + validate_username(username)?; + + match self.max_users { + MaxUsers::Disabled => return Err(RegistryError::RegistrationDisabled), + MaxUsers::Limited(max) => { + if with_auth_timeout(self.timeout, self.db.user_count()).await? >= max { + let reconciled_below_cap = + self.reconcile_capped_counter_once_per_interval().await? + && with_auth_timeout(self.timeout, self.db.user_count()).await? < max; + if !reconciled_below_cap { + return Err(RegistryError::TooManyUsers { max }); + } + } + } + MaxUsers::Unlimited => {} + } + + let hash = hash_bcrypt(password.to_string(), DEFAULT_BCRYPT_COST).await?; + match self.db.insert_user(username, &hash, self.max_users).await? { + InsertUser::Created => Ok((UpsertOutcome::Created, username.to_string())), + InsertUser::Existing(stored) => { + verify_returning_user(&stored.username, password, stored.bcrypt_hash).await + } + InsertUser::CapReached => match self.max_users { + MaxUsers::Limited(max) => Err(RegistryError::TooManyUsers { max }), + MaxUsers::Disabled | MaxUsers::Unlimited => { + Err(RegistryError::Unauthenticated { resource: format!("user {username:?}") }) + } + }, + } + } + + async fn verify(&self, username: &str, password: &str) -> Result> { + if username_has_invalid_bounds(username) { + return Ok(None); + } + + let Some(stored) = with_auth_timeout(self.timeout, self.db.stored_user(username)).await? + else { + return Ok(None); + }; + let valid = verify_bcrypt(password.to_string(), stored.bcrypt_hash).await?; + Ok(valid.then_some(stored.username)) + } +} + +fn validate_username_bounds(username: &str) -> Result<()> { + if username.is_empty() { + return Err(RegistryError::BadRequest { reason: "username must not be empty".to_string() }); + } + if username_too_long(username) { + return Err(RegistryError::BadRequest { + reason: format!("username must be at most {MAX_USERNAME_CHARS} characters"), + }); + } + Ok(()) +} + +fn username_has_invalid_bounds(username: &str) -> bool { + username.is_empty() || username_too_long(username) +} + +fn username_too_long(username: &str) -> bool { + username.chars().nth(MAX_USERNAME_CHARS).is_some() +} + +#[async_trait] +impl TokenBackend for SqlAuth +where + Db: AuthSqlBackend, +{ + async fn issue(&self, username: &str) -> Result { + let nonce = self.counter.fetch_add(1, Ordering::Relaxed); + let raw = mint_token(&self.secret, nonce, username); + let token_hash = sha256_hex(raw.as_bytes()); + let now = unix_seconds(); + let record = TokenRecord { + username: username.to_string(), + created_at: now, + last_used_at: now, + readonly: false, + cidr_whitelist: Vec::new(), + }; + self.db.insert_token(&token_hash, &record).await?; + Ok(raw) + } + + async fn lookup(&self, raw: &str) -> Result> { + let token_hash = sha256_hex(raw.as_bytes()); + with_auth_timeout(self.timeout, self.db.lookup_token(&token_hash)).await + } + + async fn find_by_key(&self, key: &str) -> Result> { + with_auth_timeout(self.timeout, self.db.find_token(key)).await + } + + async fn list_for_user(&self, username: &str) -> Result> { + with_auth_timeout(self.timeout, self.db.list_tokens(username)).await + } + + async fn revoke_by_key(&self, key: &str) -> Result> { + let Some(record) = with_auth_timeout(self.timeout, self.db.find_token(key)).await? else { + return Ok(None); + }; + self.db.delete_token(key).await?; + Ok(Some(record)) + } +} + +#[cfg(feature = "backend-postgres")] +pub(super) mod postgres { + use super::super::{TokenRecord, token_timestamp_from_sql, token_timestamp_to_sql}; + use super::{ + AuthSqlBackend, InsertUser, SqlAuth, StoredUser, invalid_pool_size, sql_max_users, + timeout_millis, with_auth_timeout, + }; + use crate::{ + config::{MaxUsers, SqlBackendSettings}, + error::{RegistryError, Result}, + }; + use async_trait::async_trait; + use sqlx::{PgPool, Row, postgres::PgPoolOptions}; + use std::time::Duration; + + #[derive(Debug)] + pub(in crate::auth) struct PostgresDatabase { + pool: PgPool, + } + + pub(in crate::auth) type PostgresAuth = SqlAuth; + + impl SqlAuth { + pub(in crate::auth) async fn connect( + settings: &SqlBackendSettings, + max_users: MaxUsers, + ) -> Result { + let startup_options = postgres_pool_options( + settings, + settings.startup_timeout, + settings.startup_timeout, + )?; + let startup_pool = + with_auth_timeout(settings.startup_timeout, startup_options.connect(&settings.url)) + .await?; + let startup_db = PostgresDatabase { pool: startup_pool }; + with_auth_timeout(settings.startup_timeout, startup_db.init_schema()).await?; + startup_db.pool.close().await; + + let pool = postgres_pool_options(settings, settings.timeout, settings.timeout)? + .connect_lazy(&settings.url)?; + let db = PostgresDatabase { pool }; + Ok(SqlAuth::new(db, max_users, settings.timeout)) + } + } + + fn postgres_pool_options( + settings: &SqlBackendSettings, + session_timeout: Duration, + acquire_timeout: Duration, + ) -> Result { + let mut options = PgPoolOptions::new(); + if let Some(max_connections) = settings.max_connections { + if max_connections == 0 { + return Err(invalid_pool_size("postgres")); + } + options = options.max_connections(max_connections); + } + let statement_timeout_sql = + format!("SET statement_timeout = {}", timeout_millis(session_timeout)); + options = options.after_connect(move |conn, _meta| { + let statement_timeout_sql = statement_timeout_sql.clone(); + Box::pin(async move { + sqlx::query(&statement_timeout_sql).execute(conn).await?; + Ok(()) + }) + }); + Ok(options.acquire_timeout(acquire_timeout)) + } + + #[async_trait] + impl AuthSqlBackend for PostgresDatabase { + async fn stored_user(&self, username: &str) -> Result> { + let row = sqlx::query("SELECT username, bcrypt_hash FROM users WHERE username = $1") + .bind(username) + .fetch_optional(&self.pool) + .await?; + row.map(|row| -> std::result::Result { + Ok(StoredUser { username: row.try_get(0)?, bcrypt_hash: row.try_get(1)? }) + }) + .transpose() + .map_err(RegistryError::from) + } + + async fn user_count(&self) -> Result { + let Some(count) = self.user_counter().await? else { + self.ensure_user_counter().await?; + return Ok(self.user_counter().await?.unwrap_or(0).max(0) as u64); + }; + Ok(count.max(0) as u64) + } + + async fn reconcile_user_counter_overcount(&self) -> Result { + self.reconcile_user_counter_overcount_impl().await + } + + async fn insert_user( + &self, + username: &str, + bcrypt_hash: &str, + max_users: MaxUsers, + ) -> Result { + let mut can_retry_after_reconcile = matches!(max_users, MaxUsers::Limited(_)); + loop { + let mut tx = self.pool.begin().await?; + match max_users { + MaxUsers::Limited(max) => { + let max = sql_max_users(max, "postgres")?; + let updated = sqlx::query( + "UPDATE auth_counters SET value = value + 1 + WHERE name = $1 AND value < $2", + ) + .bind("users") + .bind(max) + .execute(&mut *tx) + .await?; + if updated.rows_affected() == 0 { + tx.rollback().await?; + if can_retry_after_reconcile { + can_retry_after_reconcile = false; + if self.reconcile_user_counter_overcount_impl().await? { + continue; + } + } + return self.existing_or_cap_reached(username).await; + } + } + MaxUsers::Unlimited => { + sqlx::query("UPDATE auth_counters SET value = value + 1 WHERE name = $1") + .bind("users") + .execute(&mut *tx) + .await?; + } + MaxUsers::Disabled => {} + } + let inserted = + sqlx::query("INSERT INTO users (username, bcrypt_hash) VALUES ($1, $2)") + .bind(username) + .bind(bcrypt_hash) + .execute(&mut *tx) + .await; + match inserted { + Ok(_) => { + tx.commit().await?; + return Ok(InsertUser::Created); + } + Err(err) if is_unique_violation(&err) => { + tx.rollback().await?; + return self.existing_or_cap_reached(username).await; + } + Err(err) => return Err(err.into()), + } + } + } + + async fn insert_token(&self, token_hash: &str, record: &TokenRecord) -> Result<()> { + let cidr_json = serde_json::to_string(&record.cidr_whitelist) + .expect("Vec always serializes to JSON"); + sqlx::query( + "INSERT INTO tokens + (token_hash, username, created_at, last_used_at, readonly, cidr_whitelist) + VALUES ($1, $2, $3, $4, $5, $6)", + ) + .bind(token_hash) + .bind(&record.username) + .bind(token_timestamp_to_sql(record.created_at)) + .bind(token_timestamp_to_sql(record.last_used_at)) + .bind(i16::from(record.readonly)) + .bind(cidr_json) + .execute(&self.pool) + .await?; + Ok(()) + } + + async fn lookup_token(&self, token_hash: &str) -> Result> { + let row = sqlx::query("SELECT username FROM tokens WHERE token_hash = $1") + .bind(token_hash) + .fetch_optional(&self.pool) + .await?; + row.map(|row| row.try_get(0)).transpose().map_err(RegistryError::from) + } + + async fn find_token(&self, token_hash: &str) -> Result> { + let row = sqlx::query( + "SELECT username, created_at, last_used_at, readonly, cidr_whitelist + FROM tokens WHERE token_hash = $1", + ) + .bind(token_hash) + .fetch_optional(&self.pool) + .await?; + row.map(|row| token_record_from_row(&row)).transpose() + } + + async fn list_tokens(&self, username: &str) -> Result> { + let rows = sqlx::query( + "SELECT token_hash, username, created_at, last_used_at, readonly, cidr_whitelist + FROM tokens WHERE username = $1", + ) + .bind(username) + .fetch_all(&self.pool) + .await?; + rows.into_iter().map(|row| keyed_token_record_from_row(&row)).collect() + } + + async fn delete_token(&self, token_hash: &str) -> Result<()> { + sqlx::query("DELETE FROM tokens WHERE token_hash = $1") + .bind(token_hash) + .execute(&self.pool) + .await?; + Ok(()) + } + } + + impl PostgresDatabase { + async fn init_schema(&self) -> Result<()> { + sqlx::query(super::super::USERS_TABLE_SQL).execute(&self.pool).await?; + sqlx::query(super::super::TOKENS_TABLE_SQL).execute(&self.pool).await?; + sqlx::query(super::super::TOKENS_INDEX_SQL).execute(&self.pool).await?; + sqlx::query(super::super::AUTH_COUNTERS_TABLE_SQL).execute(&self.pool).await?; + self.ensure_user_counter().await + } + + async fn ensure_user_counter(&self) -> Result<()> { + let count = self.actual_user_count().await?; + if self.set_user_counter_floor(count).await? > 0 { + return Ok(()); + } + let inserted = sqlx::query("INSERT INTO auth_counters (name, value) VALUES ($1, $2)") + .bind("users") + .bind(count) + .execute(&self.pool) + .await; + match inserted { + Ok(_) => Ok(()), + Err(err) if is_unique_violation(&err) => { + self.set_user_counter_floor(count).await?; + Ok(()) + } + Err(err) => Err(err.into()), + } + } + + async fn actual_user_count(&self) -> Result { + let count: i64 = + sqlx::query_scalar("SELECT COUNT(*) FROM users").fetch_one(&self.pool).await?; + Ok(count.max(0)) + } + + async fn user_counter(&self) -> Result> { + let count: Option = + sqlx::query_scalar("SELECT value FROM auth_counters WHERE name = $1") + .bind("users") + .fetch_optional(&self.pool) + .await?; + Ok(count) + } + + async fn set_user_counter_floor(&self, count: i64) -> Result { + let updated = sqlx::query( + "UPDATE auth_counters + SET value = CASE WHEN value < $2 THEN $2 ELSE value END + WHERE name = $1", + ) + .bind("users") + .bind(count) + .execute(&self.pool) + .await?; + Ok(updated.rows_affected()) + } + + async fn reconcile_user_counter_overcount_impl(&self) -> Result { + let mut tx = self.pool.begin().await?; + let Some(counter): Option = + sqlx::query_scalar("SELECT value FROM auth_counters WHERE name = $1 FOR UPDATE") + .bind("users") + .fetch_optional(&mut *tx) + .await? + else { + tx.commit().await?; + return Ok(false); + }; + let count: i64 = + sqlx::query_scalar("SELECT COUNT(*) FROM users").fetch_one(&mut *tx).await?; + if counter <= count { + tx.commit().await?; + return Ok(false); + } + sqlx::query("UPDATE auth_counters SET value = $2 WHERE name = $1") + .bind("users") + .bind(count.max(0)) + .execute(&mut *tx) + .await?; + tx.commit().await?; + Ok(true) + } + + async fn existing_or_cap_reached(&self, username: &str) -> Result { + match self.stored_user(username).await? { + Some(stored) => Ok(InsertUser::Existing(stored)), + None => Ok(InsertUser::CapReached), + } + } + } + + fn keyed_token_record_from_row(row: &sqlx::postgres::PgRow) -> Result<(String, TokenRecord)> { + Ok((row.try_get(0)?, token_record_from_offset(row, 1)?)) + } + + fn token_record_from_row(row: &sqlx::postgres::PgRow) -> Result { + token_record_from_offset(row, 0) + } + + fn token_record_from_offset(row: &sqlx::postgres::PgRow, offset: usize) -> Result { + let cidr_json: String = row.try_get(offset + 4)?; + let cidr_whitelist: Vec = serde_json::from_str(&cidr_json).unwrap_or_default(); + let readonly: i16 = row.try_get(offset + 3)?; + Ok(TokenRecord { + username: row.try_get(offset)?, + created_at: token_timestamp_from_sql(row.try_get(offset + 1)?), + last_used_at: token_timestamp_from_sql(row.try_get(offset + 2)?), + readonly: readonly != 0, + cidr_whitelist, + }) + } + + fn is_unique_violation(err: &sqlx::Error) -> bool { + err.as_database_error() + .and_then(sqlx::error::DatabaseError::code) + .is_some_and(|code| code.as_ref() == "23505") + } +} + +#[cfg(feature = "backend-mysql")] +pub(super) mod mysql { + use super::super::{TokenRecord, token_timestamp_from_sql, token_timestamp_to_sql}; + use super::{ + AuthSqlBackend, InsertUser, SqlAuth, StoredUser, invalid_pool_size, sql_max_users, + timeout_millis, timeout_seconds, with_auth_timeout, + }; + use crate::{ + config::{MaxUsers, SqlBackendSettings}, + error::{RegistryError, Result}, + }; + use async_trait::async_trait; + use sqlx::{MySqlPool, Row, mysql::MySqlPoolOptions}; + use std::time::Duration; + + #[derive(Debug)] + pub(in crate::auth) struct MysqlDatabase { + pool: MySqlPool, + } + + pub(in crate::auth) type MysqlAuth = SqlAuth; + + impl SqlAuth { + pub(in crate::auth) async fn connect( + settings: &SqlBackendSettings, + max_users: MaxUsers, + ) -> Result { + let startup_options = + mysql_pool_options(settings, settings.startup_timeout, settings.startup_timeout)?; + let startup_pool = + with_auth_timeout(settings.startup_timeout, startup_options.connect(&settings.url)) + .await?; + let startup_db = MysqlDatabase { pool: startup_pool }; + with_auth_timeout(settings.startup_timeout, startup_db.init_schema()).await?; + startup_db.pool.close().await; + + let pool = mysql_pool_options(settings, settings.timeout, settings.timeout)? + .connect_lazy(&settings.url)?; + let db = MysqlDatabase { pool }; + Ok(SqlAuth::new(db, max_users, settings.timeout)) + } + } + + fn mysql_pool_options( + settings: &SqlBackendSettings, + session_timeout: Duration, + acquire_timeout: Duration, + ) -> Result { + let mut options = MySqlPoolOptions::new(); + if let Some(max_connections) = settings.max_connections { + if max_connections == 0 { + return Err(invalid_pool_size("mysql")); + } + options = options.max_connections(max_connections); + } + let statement_timeout_sql = + format!("SET SESSION max_execution_time = {}", timeout_millis(session_timeout)); + let row_lock_timeout_sql = + format!("SET SESSION innodb_lock_wait_timeout = {}", timeout_seconds(session_timeout)); + let metadata_lock_timeout_sql = + format!("SET SESSION lock_wait_timeout = {}", timeout_seconds(session_timeout)); + options = options.after_connect(move |conn, _meta| { + let statement_timeout_sql = statement_timeout_sql.clone(); + let row_lock_timeout_sql = row_lock_timeout_sql.clone(); + let metadata_lock_timeout_sql = metadata_lock_timeout_sql.clone(); + Box::pin(async move { + sqlx::query(&statement_timeout_sql).execute(&mut *conn).await?; + sqlx::query(&row_lock_timeout_sql).execute(&mut *conn).await?; + sqlx::query(&metadata_lock_timeout_sql).execute(conn).await?; + Ok(()) + }) + }); + Ok(options.acquire_timeout(acquire_timeout)) + } + + #[async_trait] + impl AuthSqlBackend for MysqlDatabase { + async fn stored_user(&self, username: &str) -> Result> { + let row = sqlx::query("SELECT username, bcrypt_hash FROM users WHERE username = ?") + .bind(username) + .fetch_optional(&self.pool) + .await?; + row.map(|row| -> std::result::Result { + Ok(StoredUser { username: row.try_get(0)?, bcrypt_hash: row.try_get(1)? }) + }) + .transpose() + .map_err(RegistryError::from) + } + + async fn user_count(&self) -> Result { + let Some(count) = self.user_counter().await? else { + self.ensure_user_counter().await?; + return Ok(self.user_counter().await?.unwrap_or(0).max(0) as u64); + }; + Ok(count.max(0) as u64) + } + + async fn reconcile_user_counter_overcount(&self) -> Result { + self.reconcile_user_counter_overcount_impl().await + } + + async fn insert_user( + &self, + username: &str, + bcrypt_hash: &str, + max_users: MaxUsers, + ) -> Result { + let mut can_retry_after_reconcile = matches!(max_users, MaxUsers::Limited(_)); + loop { + let mut tx = self.pool.begin().await?; + match max_users { + MaxUsers::Limited(max) => { + let max = sql_max_users(max, "mysql")?; + let updated = sqlx::query( + "UPDATE auth_counters SET value = value + 1 + WHERE name = ? AND value < ?", + ) + .bind("users") + .bind(max) + .execute(&mut *tx) + .await?; + if updated.rows_affected() == 0 { + tx.rollback().await?; + if can_retry_after_reconcile { + can_retry_after_reconcile = false; + if self.reconcile_user_counter_overcount_impl().await? { + continue; + } + } + return self.existing_or_cap_reached(username).await; + } + } + MaxUsers::Unlimited => { + sqlx::query("UPDATE auth_counters SET value = value + 1 WHERE name = ?") + .bind("users") + .execute(&mut *tx) + .await?; + } + MaxUsers::Disabled => {} + } + let inserted = + sqlx::query("INSERT INTO users (username, bcrypt_hash) VALUES (?, ?)") + .bind(username) + .bind(bcrypt_hash) + .execute(&mut *tx) + .await; + match inserted { + Ok(_) => { + tx.commit().await?; + return Ok(InsertUser::Created); + } + Err(err) if is_unique_violation(&err) => { + tx.rollback().await?; + return self.existing_or_cap_reached(username).await; + } + Err(err) => return Err(err.into()), + } + } + } + + async fn insert_token(&self, token_hash: &str, record: &TokenRecord) -> Result<()> { + let cidr_json = serde_json::to_string(&record.cidr_whitelist) + .expect("Vec always serializes to JSON"); + sqlx::query( + "INSERT INTO tokens + (token_hash, username, created_at, last_used_at, readonly, cidr_whitelist) + VALUES (?, ?, ?, ?, ?, ?)", + ) + .bind(token_hash) + .bind(&record.username) + .bind(token_timestamp_to_sql(record.created_at)) + .bind(token_timestamp_to_sql(record.last_used_at)) + .bind(i16::from(record.readonly)) + .bind(cidr_json) + .execute(&self.pool) + .await?; + Ok(()) + } + + async fn lookup_token(&self, token_hash: &str) -> Result> { + let row = sqlx::query("SELECT username FROM tokens WHERE token_hash = ?") + .bind(token_hash) + .fetch_optional(&self.pool) + .await?; + row.map(|row| row.try_get(0)).transpose().map_err(RegistryError::from) + } + + async fn find_token(&self, token_hash: &str) -> Result> { + let row = sqlx::query( + "SELECT username, created_at, last_used_at, readonly, cidr_whitelist + FROM tokens WHERE token_hash = ?", + ) + .bind(token_hash) + .fetch_optional(&self.pool) + .await?; + row.map(|row| token_record_from_row(&row)).transpose() + } + + async fn list_tokens(&self, username: &str) -> Result> { + let rows = sqlx::query( + "SELECT token_hash, username, created_at, last_used_at, readonly, cidr_whitelist + FROM tokens WHERE username = ?", + ) + .bind(username) + .fetch_all(&self.pool) + .await?; + rows.into_iter().map(|row| keyed_token_record_from_row(&row)).collect() + } + + async fn delete_token(&self, token_hash: &str) -> Result<()> { + sqlx::query("DELETE FROM tokens WHERE token_hash = ?") + .bind(token_hash) + .execute(&self.pool) + .await?; + Ok(()) + } + } + + impl MysqlDatabase { + async fn init_schema(&self) -> Result<()> { + sqlx::query(super::super::USERS_TABLE_SQL).execute(&self.pool).await?; + sqlx::query(super::super::TOKENS_TABLE_SQL).execute(&self.pool).await?; + create_token_index(&self.pool).await?; + sqlx::query(super::super::AUTH_COUNTERS_TABLE_SQL).execute(&self.pool).await?; + self.ensure_user_counter().await + } + + async fn ensure_user_counter(&self) -> Result<()> { + let count = self.actual_user_count().await?; + if self.user_counter().await?.is_some() { + self.set_user_counter_floor(count).await?; + return Ok(()); + } + let inserted = sqlx::query("INSERT INTO auth_counters (name, value) VALUES (?, ?)") + .bind("users") + .bind(count) + .execute(&self.pool) + .await; + match inserted { + Ok(_) => Ok(()), + Err(err) if is_unique_violation(&err) => { + self.set_user_counter_floor(count).await?; + Ok(()) + } + Err(err) => Err(err.into()), + } + } + + async fn actual_user_count(&self) -> Result { + let count: i64 = + sqlx::query_scalar("SELECT COUNT(*) FROM users").fetch_one(&self.pool).await?; + Ok(count.max(0)) + } + + async fn user_counter(&self) -> Result> { + let count: Option = + sqlx::query_scalar("SELECT value FROM auth_counters WHERE name = ?") + .bind("users") + .fetch_optional(&self.pool) + .await?; + Ok(count) + } + + async fn set_user_counter_floor(&self, count: i64) -> Result { + let updated = sqlx::query( + "UPDATE auth_counters + SET value = CASE WHEN value < ? THEN ? ELSE value END + WHERE name = ?", + ) + .bind(count) + .bind(count) + .bind("users") + .execute(&self.pool) + .await?; + Ok(updated.rows_affected()) + } + + async fn reconcile_user_counter_overcount_impl(&self) -> Result { + let mut tx = self.pool.begin().await?; + let Some(counter): Option = + sqlx::query_scalar("SELECT value FROM auth_counters WHERE name = ? FOR UPDATE") + .bind("users") + .fetch_optional(&mut *tx) + .await? + else { + tx.commit().await?; + return Ok(false); + }; + let count: i64 = + sqlx::query_scalar("SELECT COUNT(*) FROM users").fetch_one(&mut *tx).await?; + if counter <= count { + tx.commit().await?; + return Ok(false); + } + sqlx::query("UPDATE auth_counters SET value = ? WHERE name = ?") + .bind(count.max(0)) + .bind("users") + .execute(&mut *tx) + .await?; + tx.commit().await?; + Ok(true) + } + + async fn existing_or_cap_reached(&self, username: &str) -> Result { + match self.stored_user(username).await? { + Some(stored) => Ok(InsertUser::Existing(stored)), + None => Ok(InsertUser::CapReached), + } + } + } + + async fn create_token_index(pool: &MySqlPool) -> Result<()> { + let result = + sqlx::query("CREATE INDEX tokens_username ON tokens(username)").execute(pool).await; + match result { + Ok(_) => Ok(()), + Err(err) if is_duplicate_index(&err) => Ok(()), + Err(err) => Err(err.into()), + } + } + + fn keyed_token_record_from_row(row: &sqlx::mysql::MySqlRow) -> Result<(String, TokenRecord)> { + Ok((row.try_get(0)?, token_record_from_offset(row, 1)?)) + } + + fn token_record_from_row(row: &sqlx::mysql::MySqlRow) -> Result { + token_record_from_offset(row, 0) + } + + fn token_record_from_offset(row: &sqlx::mysql::MySqlRow, offset: usize) -> Result { + let cidr_json: String = row.try_get(offset + 4)?; + let cidr_whitelist: Vec = serde_json::from_str(&cidr_json).unwrap_or_default(); + let readonly: i16 = row.try_get(offset + 3)?; + Ok(TokenRecord { + username: row.try_get(offset)?, + created_at: token_timestamp_from_sql(row.try_get(offset + 1)?), + last_used_at: token_timestamp_from_sql(row.try_get(offset + 2)?), + readonly: readonly != 0, + cidr_whitelist, + }) + } + + fn is_unique_violation(err: &sqlx::Error) -> bool { + err.as_database_error().is_some_and(|err| { + err.code().is_some_and(|code| code.as_ref() == "23000" || code.as_ref() == "1062") + }) + } + + fn is_duplicate_index(err: &sqlx::Error) -> bool { + err.as_database_error() + .and_then(|err| err.try_downcast_ref::()) + .is_some_and(|err| err.number() == 1061) + } +} + +fn invalid_pool_size(backend: &str) -> RegistryError { + RegistryError::InvalidConfig { + reason: format!("backend.{backend}.maxConnections must be greater than 0"), + } +} + +fn sql_max_users(max: u64, backend: &str) -> Result { + i64::try_from(max).map_err(|_| RegistryError::InvalidConfig { + reason: format!("backend.{backend} auth max_users must fit a signed BIGINT"), + }) +} + +#[cfg(any(feature = "backend-postgres", feature = "backend-mysql"))] +fn timeout_millis(timeout: Duration) -> u128 { + timeout.as_millis().max(1) +} + +#[cfg(feature = "backend-mysql")] +fn timeout_seconds(timeout: Duration) -> u64 { + timeout.as_secs().max(1) +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::Arc; + + struct CanonicalBackend { + user: StoredUser, + } + + struct SlowLookupBackend; + + struct SlowWriteBackend; + + struct CountingLookupBackend { + stored_user_calls: Arc, + } + + struct CappedBackend { + reconcile_calls: Arc, + } + + #[async_trait] + impl AuthSqlBackend for CanonicalBackend { + async fn stored_user(&self, _username: &str) -> Result> { + Ok(Some(self.user.clone())) + } + + async fn user_count(&self) -> Result { + Ok(1) + } + + async fn reconcile_user_counter_overcount(&self) -> Result { + Ok(false) + } + + async fn insert_user( + &self, + _username: &str, + _bcrypt_hash: &str, + _max_users: MaxUsers, + ) -> Result { + Ok(InsertUser::Existing(self.user.clone())) + } + + async fn insert_token(&self, _token_hash: &str, _record: &TokenRecord) -> Result<()> { + Ok(()) + } + + async fn lookup_token(&self, _token_hash: &str) -> Result> { + Ok(None) + } + + async fn find_token(&self, _token_hash: &str) -> Result> { + Ok(None) + } + + async fn list_tokens(&self, _username: &str) -> Result> { + Ok(Vec::new()) + } + + async fn delete_token(&self, _token_hash: &str) -> Result<()> { + Ok(()) + } + } + + #[async_trait] + impl AuthSqlBackend for SlowLookupBackend { + async fn stored_user(&self, _username: &str) -> Result> { + Ok(None) + } + + async fn user_count(&self) -> Result { + Ok(0) + } + + async fn reconcile_user_counter_overcount(&self) -> Result { + Ok(false) + } + + async fn insert_user( + &self, + _username: &str, + _bcrypt_hash: &str, + _max_users: MaxUsers, + ) -> Result { + Ok(InsertUser::CapReached) + } + + async fn insert_token(&self, _token_hash: &str, _record: &TokenRecord) -> Result<()> { + Ok(()) + } + + async fn lookup_token(&self, _token_hash: &str) -> Result> { + tokio::time::sleep(Duration::from_mins(1)).await; + Ok(None) + } + + async fn find_token(&self, _token_hash: &str) -> Result> { + Ok(None) + } + + async fn list_tokens(&self, _username: &str) -> Result> { + Ok(Vec::new()) + } + + async fn delete_token(&self, _token_hash: &str) -> Result<()> { + Ok(()) + } + } + + #[async_trait] + impl AuthSqlBackend for SlowWriteBackend { + async fn stored_user(&self, _username: &str) -> Result> { + Ok(None) + } + + async fn user_count(&self) -> Result { + Ok(0) + } + + async fn reconcile_user_counter_overcount(&self) -> Result { + Ok(false) + } + + async fn insert_user( + &self, + _username: &str, + _bcrypt_hash: &str, + _max_users: MaxUsers, + ) -> Result { + Ok(InsertUser::Created) + } + + async fn insert_token(&self, _token_hash: &str, _record: &TokenRecord) -> Result<()> { + tokio::time::sleep(Duration::from_millis(20)).await; + Ok(()) + } + + async fn lookup_token(&self, _token_hash: &str) -> Result> { + Ok(None) + } + + async fn find_token(&self, _token_hash: &str) -> Result> { + Ok(None) + } + + async fn list_tokens(&self, _username: &str) -> Result> { + Ok(Vec::new()) + } + + async fn delete_token(&self, _token_hash: &str) -> Result<()> { + tokio::time::sleep(Duration::from_millis(20)).await; + Ok(()) + } + } + + #[async_trait] + impl AuthSqlBackend for CountingLookupBackend { + async fn stored_user(&self, _username: &str) -> Result> { + self.stored_user_calls.fetch_add(1, Ordering::SeqCst); + Ok(None) + } + + async fn user_count(&self) -> Result { + Ok(0) + } + + async fn reconcile_user_counter_overcount(&self) -> Result { + Ok(false) + } + + async fn insert_user( + &self, + _username: &str, + _bcrypt_hash: &str, + _max_users: MaxUsers, + ) -> Result { + Ok(InsertUser::Created) + } + + async fn insert_token(&self, _token_hash: &str, _record: &TokenRecord) -> Result<()> { + Ok(()) + } + + async fn lookup_token(&self, _token_hash: &str) -> Result> { + Ok(None) + } + + async fn find_token(&self, _token_hash: &str) -> Result> { + Ok(None) + } + + async fn list_tokens(&self, _username: &str) -> Result> { + Ok(Vec::new()) + } + + async fn delete_token(&self, _token_hash: &str) -> Result<()> { + Ok(()) + } + } + + #[async_trait] + impl AuthSqlBackend for CappedBackend { + async fn stored_user(&self, _username: &str) -> Result> { + Ok(None) + } + + async fn user_count(&self) -> Result { + Ok(1) + } + + async fn reconcile_user_counter_overcount(&self) -> Result { + self.reconcile_calls.fetch_add(1, Ordering::SeqCst); + Ok(false) + } + + async fn insert_user( + &self, + _username: &str, + _bcrypt_hash: &str, + _max_users: MaxUsers, + ) -> Result { + panic!("capped precheck should reject before insert_user") + } + + async fn insert_token(&self, _token_hash: &str, _record: &TokenRecord) -> Result<()> { + Ok(()) + } + + async fn lookup_token(&self, _token_hash: &str) -> Result> { + Ok(None) + } + + async fn find_token(&self, _token_hash: &str) -> Result> { + Ok(None) + } + + async fn list_tokens(&self, _username: &str) -> Result> { + Ok(Vec::new()) + } + + async fn delete_token(&self, _token_hash: &str) -> Result<()> { + Ok(()) + } + } + + #[tokio::test] + async fn verify_returns_the_stored_username() { + let bcrypt_hash = bcrypt::hash("secret", 4).unwrap(); + let auth = SqlAuth::new( + CanonicalBackend { user: StoredUser { username: "Alice".to_string(), bcrypt_hash } }, + MaxUsers::Unlimited, + Duration::from_secs(30), + ); + + assert_eq!(auth.verify("alice", "secret").await.unwrap().as_deref(), Some("Alice")); + } + + #[tokio::test] + async fn verify_propagates_corrupt_hash_errors() { + let auth = SqlAuth::new( + CanonicalBackend { + user: StoredUser { + username: "Alice".to_string(), + bcrypt_hash: "not-a-bcrypt-hash".to_string(), + }, + }, + MaxUsers::Unlimited, + Duration::from_secs(30), + ); + + let err = auth.verify("alice", "secret").await.unwrap_err(); + + assert!(matches!(err, RegistryError::Bcrypt(_)), "got {err:?}"); + } + + #[tokio::test] + async fn verify_skips_unbounded_usernames_without_db_lookup() { + let stored_user_calls = Arc::new(AtomicU64::new(0)); + let auth = SqlAuth::new( + CountingLookupBackend { stored_user_calls: Arc::clone(&stored_user_calls) }, + MaxUsers::Unlimited, + Duration::from_secs(30), + ); + let overlong = "a".repeat(MAX_USERNAME_CHARS + 1); + + assert_eq!(auth.verify("", "secret").await.unwrap(), None); + assert_eq!(auth.verify(&overlong, "secret").await.unwrap(), None); + assert_eq!(stored_user_calls.load(Ordering::SeqCst), 0); + } + + #[tokio::test] + async fn add_or_login_returns_the_stored_username_for_existing_users() { + let bcrypt_hash = bcrypt::hash("secret", 4).unwrap(); + let auth = SqlAuth::new( + CanonicalBackend { user: StoredUser { username: "Alice".to_string(), bcrypt_hash } }, + MaxUsers::Unlimited, + Duration::from_secs(30), + ); + + let outcome = auth.add_or_login(" alice", "secret").await.unwrap(); + + assert!(matches!(outcome, (UpsertOutcome::LoggedIn, _))); + assert_eq!(outcome.1, "Alice"); + } + + #[tokio::test] + async fn add_or_login_rejects_unbounded_usernames_without_db_lookup() { + let stored_user_calls = Arc::new(AtomicU64::new(0)); + let auth = SqlAuth::new( + CountingLookupBackend { stored_user_calls: Arc::clone(&stored_user_calls) }, + MaxUsers::Unlimited, + Duration::from_secs(30), + ); + let overlong = "a".repeat(MAX_USERNAME_CHARS + 1); + + let empty_err = auth.add_or_login("", "secret").await.unwrap_err(); + assert!( + matches!(empty_err, RegistryError::BadRequest { reason } if reason == "username must not be empty") + ); + let overlong_err = auth.add_or_login(&overlong, "secret").await.unwrap_err(); + assert!( + matches!(overlong_err, RegistryError::BadRequest { reason } if reason == format!("username must be at most {MAX_USERNAME_CHARS} characters")) + ); + assert_eq!(stored_user_calls.load(Ordering::SeqCst), 0); + } + + #[tokio::test] + async fn add_or_login_rate_limits_capped_reconciliation() { + let reconcile_calls = Arc::new(AtomicU64::new(0)); + let auth = SqlAuth::new( + CappedBackend { reconcile_calls: Arc::clone(&reconcile_calls) }, + MaxUsers::Limited(1), + Duration::from_secs(30), + ); + + for username in ["alice", "bob", "carol"] { + let err = auth.add_or_login(username, "secret").await.unwrap_err(); + assert!(matches!(err, RegistryError::TooManyUsers { max: 1 })); + } + + assert_eq!(reconcile_calls.load(Ordering::SeqCst), 1); + } + + #[tokio::test] + async fn token_lookup_times_out_when_the_backend_stalls() { + let auth = SqlAuth::new(SlowLookupBackend, MaxUsers::Unlimited, Duration::from_millis(1)); + + let err = auth.lookup("token").await.unwrap_err(); + + assert!(matches!(err, RegistryError::AuthDatabaseTimeout)); + } + + #[tokio::test] + async fn token_issue_waits_for_slow_backend_write() { + let auth = SqlAuth::new(SlowWriteBackend, MaxUsers::Unlimited, Duration::from_millis(1)); + + let token = auth.issue("alice").await.unwrap(); + + assert!(!token.is_empty()); + } +} diff --git a/pnpr/crates/pnpr/src/auth/tests.rs b/pnpr/crates/pnpr/src/auth/tests.rs index bfe174bc0a..444ad343eb 100644 --- a/pnpr/crates/pnpr/src/auth/tests.rs +++ b/pnpr/crates/pnpr/src/auth/tests.rs @@ -1,5 +1,6 @@ use super::{ - TokenBackend, TokenStore, UpsertOutcome, UserBackend, UserStore, identify, parse_htpasswd, + MAX_USERNAME_CHARS, TokenBackend, TokenStore, UpsertOutcome, UserBackend, UserStore, identify, + parse_htpasswd, token_timestamp_from_sql, token_timestamp_to_sql, validate_username, }; use crate::config::MaxUsers; use std::sync::Arc; @@ -19,14 +20,86 @@ fn test_user_store() -> UserStore { } } +#[test] +fn username_length_limit_matches_sql_schema() { + let max = "a".repeat(MAX_USERNAME_CHARS); + let too_long = "a".repeat(MAX_USERNAME_CHARS + 1); + validate_username(&max).unwrap(); + let err = validate_username(&too_long).unwrap_err(); + assert_eq!(err.status_code(), axum::http::StatusCode::BAD_REQUEST); +} + +#[test] +fn token_timestamp_from_sql_clamps_negative_values() { + assert_eq!(token_timestamp_from_sql(-1), 0); + assert_eq!(token_timestamp_from_sql(0), 0); + assert_eq!(token_timestamp_from_sql(42), 42); +} + +#[test] +fn token_timestamp_to_sql_saturates_overflow() { + assert_eq!(token_timestamp_to_sql(42), 42); + assert_eq!(token_timestamp_to_sql(u64::MAX), i64::MAX); +} + +#[test] +fn username_validation_rejects_htpasswd_structural_characters() { + for username in ["", "alice:bob", "alice\nbob", "alice\rbob", "alice\0bob", "alice\u{7f}bob"] { + let err = validate_username(username).unwrap_err(); + assert_eq!( + err.status_code(), + axum::http::StatusCode::BAD_REQUEST, + "expected {username:?} to be rejected", + ); + } +} + +#[test] +fn username_validation_rejects_names_that_trim_differently() { + for username in [" alice", "alice ", " alice "] { + let err = validate_username(username).unwrap_err(); + assert_eq!( + err.status_code(), + axum::http::StatusCode::BAD_REQUEST, + "expected {username:?} to be rejected", + ); + } +} + +#[tokio::test] +async fn user_store_rejects_invalid_username_before_persisting() { + let store = test_user_store(); + let err = store.add_or_login("alice:bob", "secret").await.unwrap_err(); + assert_eq!(err.status_code(), axum::http::StatusCode::BAD_REQUEST); + assert!( + store.users.lock().expect("UserStore mutex poisoned").is_empty(), + "invalid username should be rejected before any persistence", + ); +} + +#[tokio::test] +async fn user_store_allows_existing_legacy_username_to_login() { + let store = test_user_store(); + let legacy = format!("{}:", "a".repeat(MAX_USERNAME_CHARS)); + let hash = bcrypt::hash("secret", TEST_COST).unwrap(); + store.users.lock().expect("UserStore mutex poisoned").insert(legacy.clone(), hash); + + let outcome = store.add_or_login(&legacy, "secret").await.unwrap(); + + assert!(matches!(outcome, (UpsertOutcome::LoggedIn, _))); + assert_eq!(outcome.1, legacy); +} + #[tokio::test] async fn adduser_creates_then_validates() { let store = test_user_store(); let outcome = store.add_or_login("alice", "secret").await.unwrap(); - assert!(matches!(outcome, UpsertOutcome::Created)); + assert!(matches!(outcome, (UpsertOutcome::Created, _))); + assert_eq!(outcome.1, "alice"); let outcome = store.add_or_login("alice", "secret").await.unwrap(); - assert!(matches!(outcome, UpsertOutcome::LoggedIn)); + assert!(matches!(outcome, (UpsertOutcome::LoggedIn, _))); + assert_eq!(outcome.1, "alice"); assert!(store.verify("alice", "secret").await.unwrap().is_some()); assert!(store.verify("alice", "wrong").await.unwrap().is_none()); @@ -71,7 +144,7 @@ async fn adduser_rejects_same_username_concurrent_registration_with_different_pa let result_b = add_b.await.unwrap(); let created = [result_a.as_ref(), result_b.as_ref()] .into_iter() - .filter(|result| matches!(result, Ok(UpsertOutcome::Created))) + .filter(|result| matches!(result, Ok((UpsertOutcome::Created, _)))) .count(); let unauthorized = [&result_a, &result_b] .into_iter() @@ -103,7 +176,7 @@ async fn adduser_persists_across_reopen() { // Cold-load from disk; the hashed entry should still verify. let reopened = UserStore::open_with_cost(path.clone(), MaxUsers::Unlimited, TEST_COST).unwrap(); let outcome = reopened.add_or_login("alice", "secret").await.unwrap(); - assert!(matches!(outcome, UpsertOutcome::LoggedIn)); + assert!(matches!(outcome, (UpsertOutcome::LoggedIn, _))); assert!(reopened.verify("alice", "secret").await.unwrap().is_some()); } @@ -198,6 +271,29 @@ async fn tokens_persist_across_reopen() { ); } +#[tokio::test] +async fn tokens_clamp_negative_persisted_timestamps() { + let tmp = tempfile::tempdir().unwrap(); + let path = tmp.path().join("tokens.db"); + let conn = rusqlite::Connection::open(&path).unwrap(); + super::init_tokens_schema(&conn).unwrap(); + conn.execute( + "INSERT INTO tokens + (token_hash, username, created_at, last_used_at, readonly, cidr_whitelist) + VALUES (?1, ?2, ?3, ?4, ?5, ?6)", + rusqlite::params!["token-hash", "alice", -1_i64, -42_i64, 0_i64, "[]"], + ) + .unwrap(); + drop(conn); + + let store = TokenStore::open(path).unwrap(); + let records = store.list_for_user("alice").await.unwrap(); + + assert_eq!(records.len(), 1); + assert_eq!(records[0].1.created_at, 0); + assert_eq!(records[0].1.last_used_at, 0); +} + #[tokio::test] async fn tokens_db_stores_hash_not_raw() { let tmp = tempfile::tempdir().unwrap(); @@ -216,6 +312,22 @@ async fn tokens_db_stores_hash_not_raw() { assert_eq!(stored.len(), 64, "SHA-256 hex is 64 chars"); } +#[tokio::test] +async fn token_issue_rolls_back_memory_when_sqlite_persistence_fails() { + let tmp = tempfile::tempdir().unwrap(); + let path = tmp.path().join("tokens.db"); + let store = TokenStore::open(path.clone()).unwrap(); + rusqlite::Connection::open(&path).unwrap().execute("DROP TABLE tokens", []).unwrap(); + + let err = store.issue("alice").await.unwrap_err(); + + assert_eq!(err.status_code(), axum::http::StatusCode::INTERNAL_SERVER_ERROR); + assert!( + store.inner.lock().expect("TokenStore mutex poisoned").tokens.is_empty(), + "failed persistence must not leave an in-memory bearer token active", + ); +} + #[tokio::test] async fn identify_recognizes_bearer_and_basic() { let users = test_user_store(); diff --git a/pnpr/crates/pnpr/src/config.rs b/pnpr/crates/pnpr/src/config.rs index 10acd0a5aa..03cd48671a 100644 --- a/pnpr/crates/pnpr/src/config.rs +++ b/pnpr/crates/pnpr/src/config.rs @@ -106,8 +106,8 @@ pub struct Config { pub hosted_store: HostedStoreConfig, /// Which record store backs the auth state (users + tokens). /// Defaults to [`BackendConfig::Local`] — today's htpasswd file - /// plus `SQLite` token database. The YAML `backend.libsql:` block - /// switches both to a shared networked-SQLite database so several + /// plus `SQLite` token database. The YAML `backend:` block can + /// switch both stores to one shared SQL database so several /// stateless pnpr replicas see a consistent set of accounts. pub backend: BackendConfig, /// Optional local OSV database used by the resolver to reject known @@ -135,8 +135,8 @@ pub enum HostedStoreConfig { /// The resolved record-store backend for auth (users + tokens). Unlike /// [`HostedStoreConfig`], this only carries the parsed settings — the -/// fallible step (connecting to the networked database and ensuring its -/// schema) is async, so it runs in `AuthState::load` rather than at +/// fallible step (connecting to the database and ensuring its schema) +/// is async, so it runs in `AuthState::load` rather than at /// config-parse time. #[derive(Debug, Default, Clone)] pub enum BackendConfig { @@ -147,6 +147,10 @@ pub enum BackendConfig { /// Networked `SQLite` (libsql / Turso): both records live in one /// shared database reachable over the network. Libsql(LibsqlSettings), + /// `PostgreSQL`: both records live in one shared database. + Postgres(SqlBackendSettings), + /// `MySQL`-compatible database: both records live in one shared database. + Mysql(SqlBackendSettings), } /// The YAML `backend.libsql:` block. Whole-file `${ENV}` substitution @@ -183,6 +187,31 @@ impl LibsqlSettings { pub const DEFAULT_SYNC_INTERVAL_SECS: u64 = 60; } +/// The YAML `backend.postgres:` and `backend.mysql:` blocks. Whole-file +/// `${ENV}` substitution runs before parsing, so `url` can hold +/// `${...}` refs and keep credentials out of the committed config. +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct SqlBackendSettings { + /// Driver connection URL, e.g. `postgres://user:pass@host/db` or + /// `mysql://user:pass@host/db`. + pub url: String, + /// Maximum connections in the backend pool. Defaults to the + /// driver's pool default when omitted. + pub max_connections: Option, + /// Deadline for request-path auth database operations. + pub timeout: Duration, + /// Deadline for initial auth database connect and schema setup. + pub startup_timeout: Duration, +} + +impl SqlBackendSettings { + /// Default request-path auth database deadline. + pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); + /// Default startup auth database deadline. + pub const DEFAULT_STARTUP_TIMEOUT: Duration = Duration::from_mins(5); +} + /// Auth-related runtime configuration. Built from the YAML /// `auth:` block plus runtime defaults. #[derive(Debug, Default, Clone)] @@ -724,8 +753,8 @@ struct ConfigFile { #[serde(default)] s3: Option, /// pnpr-only block: back the auth record stores (users + tokens) - /// with a networked `SQLite` database. Absent on a stock verdaccio - /// config (silently ignored there). + /// with a shared SQL database. Absent on a stock verdaccio config + /// (silently ignored there). #[serde(default)] backend: Option, /// pnpr-only local OSV database settings. @@ -771,6 +800,24 @@ struct AuthFile { struct BackendFile { #[serde(default)] libsql: Option, + #[serde(default)] + postgres: Option, + #[serde(default)] + postgresql: Option, + #[serde(default)] + mysql: Option, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +struct SqlBackendFile { + url: String, + #[serde(default)] + max_connections: Option, + #[serde(default)] + timeout: Option, + #[serde(default)] + startup_timeout: Option, } #[derive(Debug, Default, Deserialize)] @@ -971,20 +1018,7 @@ impl Config { } None => HostedStoreConfig::Fs, }; - let backend = match file.backend.and_then(|block| block.libsql) { - Some(mut settings) => { - // Resolve a relative `replicaPath` against the config - // file's directory, the same convention `storage` and - // the auth files follow, so `./auth-replica.db` lands - // next to the config rather than in the process CWD. - if let Some(path) = settings.replica_path.take() { - settings.replica_path = - Some(if path.is_absolute() { path } else { base_dir.join(path) }); - } - BackendConfig::Libsql(settings) - } - None => BackendConfig::Local, - }; + let backend = build_backend_config(file.backend, base_dir)?; let public_url = public_url.unwrap_or_else(|| format!("http://{listen}")); let auth = build_auth_config(&file.auth, base_dir); let logs = build_log_config(file.log.as_ref()); @@ -1063,6 +1097,98 @@ fn build_osv_config(file: &OsvFile, base_dir: &Path) -> OsvConfig { } } +fn build_backend_config( + file: Option, + base_dir: &Path, +) -> Result { + let Some(file) = file else { + return Ok(BackendConfig::Local); + }; + let mut selected = Vec::new(); + if let Some(mut settings) = file.libsql { + resolve_libsql_paths(&mut settings, base_dir); + selected.push(("libsql", BackendConfig::Libsql(settings))); + } + if let Some(settings) = file.postgres { + selected.push(( + "postgres", + BackendConfig::Postgres(build_sql_backend_settings("postgres", settings)?), + )); + } + if let Some(settings) = file.postgresql { + selected.push(( + "postgresql", + BackendConfig::Postgres(build_sql_backend_settings("postgresql", settings)?), + )); + } + if let Some(settings) = file.mysql { + selected + .push(("mysql", BackendConfig::Mysql(build_sql_backend_settings("mysql", settings)?))); + } + match selected.len() { + 0 => Err(RegistryError::InvalidConfig { + reason: "backend must select exactly one database backend".to_string(), + }), + 1 => Ok(selected.remove(0).1), + _ => { + let names = selected.into_iter().map(|(name, _)| name).collect::>().join(", "); + Err(RegistryError::InvalidConfig { + reason: format!("backend must select exactly one database backend, got {names}"), + }) + } + } +} + +fn build_sql_backend_settings( + backend: &str, + file: SqlBackendFile, +) -> Result { + let timeout = parse_backend_interval(backend, "timeout", file.timeout.as_ref())? + .unwrap_or(SqlBackendSettings::DEFAULT_TIMEOUT); + if timeout.is_zero() { + return Err(RegistryError::InvalidConfig { + reason: format!("backend.{backend}.timeout must be greater than 0"), + }); + } + let startup_timeout = + parse_backend_interval(backend, "startupTimeout", file.startup_timeout.as_ref())? + .unwrap_or(SqlBackendSettings::DEFAULT_STARTUP_TIMEOUT); + if startup_timeout.is_zero() { + return Err(RegistryError::InvalidConfig { + reason: format!("backend.{backend}.startupTimeout must be greater than 0"), + }); + } + Ok(SqlBackendSettings { + url: file.url, + max_connections: file.max_connections, + timeout, + startup_timeout, + }) +} + +fn parse_backend_interval( + backend: &str, + field: &str, + raw: Option<&Interval>, +) -> Result, RegistryError> { + raw.map(|Interval(value)| { + parse_interval(value).ok_or_else(|| RegistryError::InvalidConfig { + reason: format!("backend.{backend}.{field} has an invalid interval {value:?}"), + }) + }) + .transpose() +} + +fn resolve_libsql_paths(settings: &mut LibsqlSettings, base_dir: &Path) { + // Resolve a relative `replicaPath` against the config file's + // directory, the same convention `storage` and the auth files + // follow, so `./auth-replica.db` lands next to the config rather + // than in the process CWD. + if let Some(path) = settings.replica_path.take() { + settings.replica_path = Some(if path.is_absolute() { path } else { base_dir.join(path) }); + } +} + /// Lift the YAML `log:` object's `format` / `level` onto runtime /// defaults. Missing block = default pretty/info config; missing /// individual fields fall back to their `Default` impls. diff --git a/pnpr/crates/pnpr/src/config/tests.rs b/pnpr/crates/pnpr/src/config/tests.rs index 0ee83efdd3..caaa82253f 100644 --- a/pnpr/crates/pnpr/src/config/tests.rs +++ b/pnpr/crates/pnpr/src/config/tests.rs @@ -10,6 +10,7 @@ use reqwest::header::AUTHORIZATION; use std::{ net::{Ipv4Addr, SocketAddr, SocketAddrV4}, path::{Path, PathBuf}, + time::Duration, }; /// Test [`EnvVar`] provider with a fixed set of variables, so @@ -440,6 +441,35 @@ fn backend_defaults_to_local_without_a_block() { assert!(matches!(config.backend, BackendConfig::Local)); } +#[test] +fn backend_block_rejects_empty_selection() { + let yaml = "storage: /var/lib/pnpr\nbackend: {}\nuplinks: {}\npackages: {}\n"; + let err = Config::from_yaml_str(yaml, Path::new("/etc/pnpr"), listen(), None) + .expect_err("an empty backend block must not fall back to local"); + assert!( + matches!(err, RegistryError::InvalidConfig { ref reason } if reason.contains("exactly one database backend")), + "expected an InvalidConfig naming the backend selection, got {err:?}", + ); +} + +#[test] +fn backend_block_rejects_unknown_only_selection() { + let yaml = "\ +storage: /var/lib/pnpr +backend: + sqlite: + url: sqlite:///var/lib/pnpr/auth.db +uplinks: {} +packages: {} +"; + let err = Config::from_yaml_str(yaml, Path::new("/etc/pnpr"), listen(), None) + .expect_err("an unknown backend key must not fall back to local"); + assert!( + matches!(err, RegistryError::InvalidConfig { ref reason } if reason.contains("exactly one database backend")), + "expected an InvalidConfig naming the backend selection, got {err:?}", + ); +} + #[test] fn libsql_backend_block_selects_the_networked_record_store() { let yaml = "\ @@ -457,7 +487,7 @@ packages: {} assert_eq!(settings.url, "libsql://db.turso.io"); assert_eq!(settings.auth_token.as_deref(), Some("tok-secret")); } - BackendConfig::Local => panic!("expected a libsql backend, got Local"), + other => panic!("expected a libsql backend, got {other:?}"), } } @@ -477,7 +507,7 @@ packages: {} assert!(settings.auth_token.is_none()); assert!(settings.replica_path.is_none(), "no replica by default"); } - BackendConfig::Local => panic!("expected a libsql backend, got Local"), + other => panic!("expected a libsql backend, got {other:?}"), } } @@ -503,7 +533,7 @@ packages: {} ); assert_eq!(settings.sync_interval_secs, Some(15)); } - BackendConfig::Local => panic!("expected a libsql backend, got Local"), + other => panic!("expected a libsql backend, got {other:?}"), } } @@ -524,10 +554,140 @@ packages: {} settings.replica_path.as_deref(), Some(Path::new("/var/lib/pnpr/auth-replica.db")), ), - BackendConfig::Local => panic!("expected a libsql backend, got Local"), + other => panic!("expected a libsql backend, got {other:?}"), } } +#[test] +fn postgres_backend_block_selects_postgres_record_store() { + let yaml = "\ +storage: /var/lib/pnpr +backend: + postgres: + url: postgres://pnpr:secret@db.example/pnpr + maxConnections: 12 + timeout: 5s + startupTimeout: 2m +uplinks: {} +packages: {} +"; + let config = Config::from_yaml_str(yaml, Path::new("/etc/pnpr"), listen(), None).unwrap(); + match config.backend { + BackendConfig::Postgres(settings) => { + assert_eq!(settings.url, "postgres://pnpr:secret@db.example/pnpr"); + assert_eq!(settings.max_connections, Some(12)); + assert_eq!(settings.timeout, Duration::from_secs(5)); + assert_eq!(settings.startup_timeout, Duration::from_mins(2)); + } + other => panic!("expected a postgres backend, got {other:?}"), + } +} + +#[test] +fn postgresql_backend_alias_selects_postgres_record_store() { + let yaml = "\ +storage: /var/lib/pnpr +backend: + postgresql: + url: postgresql://pnpr:secret@db.example/pnpr +uplinks: {} +packages: {} +"; + let config = Config::from_yaml_str(yaml, Path::new("/etc/pnpr"), listen(), None).unwrap(); + match config.backend { + BackendConfig::Postgres(settings) => { + assert_eq!(settings.url, "postgresql://pnpr:secret@db.example/pnpr"); + assert_eq!(settings.max_connections, None); + assert_eq!(settings.timeout, super::SqlBackendSettings::DEFAULT_TIMEOUT); + assert_eq!( + settings.startup_timeout, + super::SqlBackendSettings::DEFAULT_STARTUP_TIMEOUT, + ); + } + other => panic!("expected a postgres backend, got {other:?}"), + } +} + +#[test] +fn mysql_backend_block_selects_mysql_record_store() { + let yaml = "\ +storage: /var/lib/pnpr +backend: + mysql: + url: mysql://pnpr:secret@db.example/pnpr +uplinks: {} +packages: {} +"; + let config = Config::from_yaml_str(yaml, Path::new("/etc/pnpr"), listen(), None).unwrap(); + match config.backend { + BackendConfig::Mysql(settings) => { + assert_eq!(settings.url, "mysql://pnpr:secret@db.example/pnpr"); + assert_eq!(settings.max_connections, None); + assert_eq!(settings.timeout, super::SqlBackendSettings::DEFAULT_TIMEOUT); + assert_eq!( + settings.startup_timeout, + super::SqlBackendSettings::DEFAULT_STARTUP_TIMEOUT, + ); + } + other => panic!("expected a mysql backend, got {other:?}"), + } +} + +#[test] +fn sql_backend_rejects_zero_startup_timeout() { + let yaml = "\ +storage: /var/lib/pnpr +backend: + mysql: + url: mysql://pnpr:secret@db.example/pnpr + startupTimeout: 0 +uplinks: {} +packages: {} +"; + let err = Config::from_yaml_str(yaml, Path::new("/etc/pnpr"), listen(), None) + .expect_err("zero startup timeout must not be accepted"); + assert!( + matches!(err, RegistryError::InvalidConfig { ref reason } if reason.contains("backend.mysql.startupTimeout")), + "expected an InvalidConfig naming backend.mysql.startupTimeout, got {err:?}", + ); +} + +#[test] +fn sql_backend_rejects_zero_timeout() { + let yaml = "\ +storage: /var/lib/pnpr +backend: + postgres: + url: postgres://pnpr:secret@db.example/pnpr + timeout: 0 +uplinks: {} +packages: {} +"; + let err = Config::from_yaml_str(yaml, Path::new("/etc/pnpr"), listen(), None) + .expect_err("zero timeout must not be accepted"); + assert!( + matches!(err, RegistryError::InvalidConfig { ref reason } if reason.contains("backend.postgres.timeout")), + "expected an InvalidConfig naming backend.postgres.timeout, got {err:?}", + ); +} + +#[test] +fn backend_block_rejects_multiple_database_backends() { + let yaml = "\ +storage: /var/lib/pnpr +backend: + libsql: + url: libsql://db.turso.io + postgres: + url: postgres://pnpr:secret@db.example/pnpr +uplinks: {} +packages: {} +"; + let err = Config::from_yaml_str(yaml, Path::new("/etc/pnpr"), listen(), None) + .expect_err("a backend block must not select two databases"); + assert!(matches!(err, RegistryError::InvalidConfig { .. })); +} + #[test] fn from_yaml_str_ignores_unknown_sections() { // Sections we don't implement (`auth`, `web`, `plugins`, etc.) diff --git a/pnpr/crates/pnpr/src/error.rs b/pnpr/crates/pnpr/src/error.rs index 6db680bcd2..69be436d11 100644 --- a/pnpr/crates/pnpr/src/error.rs +++ b/pnpr/crates/pnpr/src/error.rs @@ -113,6 +113,13 @@ pub enum RegistryError { #[from(skip)] TooManyUsers { max: u64 }, + #[display("Internal error: {reason}")] + #[from(skip)] + Internal { + #[error(not(source))] + reason: String, + }, + /// The htpasswd file on disk couldn't be parsed at startup. /// Surfaced as a startup-time error rather than a silent empty /// store so a corrupted file can't quietly lock every existing @@ -134,9 +141,20 @@ pub enum RegistryError { Sqlite(rusqlite::Error), /// Networked-SQLite (libsql / Turso) auth backend failure. + #[cfg(feature = "backend-libsql")] #[display("Auth database error: {_0}")] Libsql(libsql::Error), + /// SQL auth backend failure. + #[cfg(any(feature = "backend-postgres", feature = "backend-mysql"))] + #[display("Auth database error: {_0}")] + Sqlx(sqlx::Error), + + /// SQL auth backend operation timed out. + #[cfg(any(feature = "backend-postgres", feature = "backend-mysql"))] + #[display("Auth database timeout")] + AuthDatabaseTimeout, + /// A blocking task spawned for bcrypt or `SQLite` work panicked /// or was cancelled. Treat as an internal server error. #[display("Background task failed: {_0}")] @@ -155,6 +173,53 @@ pub enum RegistryError { } impl RegistryError { + #[must_use] + pub fn log_kind(&self) -> &'static str { + match self { + RegistryError::Upstream { .. } => "upstream", + RegistryError::UpstreamStatus { .. } => "upstream_status", + RegistryError::UpstreamUnavailable { .. } => "upstream_unavailable", + RegistryError::InvalidPackageName { .. } => "invalid_package_name", + RegistryError::InvalidTarballName { .. } => "invalid_tarball_name", + RegistryError::InvalidPolicyPattern { .. } => "invalid_policy_pattern", + RegistryError::InvalidConfig { .. } => "invalid_config", + RegistryError::Unauthenticated { .. } => "unauthenticated", + RegistryError::Forbidden { .. } => "forbidden", + RegistryError::InvalidAttachment { .. } => "invalid_attachment", + RegistryError::BadRequest { .. } => "bad_request", + RegistryError::RegistrationDisabled => "registration_disabled", + RegistryError::TooManyUsers { .. } => "too_many_users", + RegistryError::Internal { .. } => "internal", + RegistryError::InvalidHtpasswdFile { .. } => "invalid_htpasswd_file", + RegistryError::Bcrypt(_) => "bcrypt", + RegistryError::Sqlite(_) => "sqlite", + #[cfg(feature = "backend-libsql")] + RegistryError::Libsql(_) => "libsql", + #[cfg(any(feature = "backend-postgres", feature = "backend-mysql"))] + RegistryError::Sqlx(_) => "sqlx", + #[cfg(any(feature = "backend-postgres", feature = "backend-mysql"))] + RegistryError::AuthDatabaseTimeout => "auth_database_timeout", + RegistryError::JoinError(_) => "join_error", + RegistryError::Io(_) => "io", + RegistryError::ObjectStore(_) => "object_store", + RegistryError::Json(_) => "json", + } + } + + #[must_use] + pub fn log_message(&self) -> String { + redact_url_credentials(&self.to_string()) + } + + #[must_use] + pub fn public_message(&self) -> String { + let status = self.status_code(); + if status.is_server_error() { + return status.canonical_reason().unwrap_or("Internal Server Error").to_string(); + } + self.to_string() + } + /// Map the error to the HTTP status the proxy should return to the /// client. Follows the standard gateway semantics: /// @@ -194,11 +259,17 @@ impl RegistryError { RegistryError::RegistrationDisabled | RegistryError::TooManyUsers { .. } => { StatusCode::FORBIDDEN } - RegistryError::InvalidHtpasswdFile { .. } + RegistryError::Internal { .. } + | RegistryError::InvalidHtpasswdFile { .. } | RegistryError::Bcrypt(_) | RegistryError::Sqlite(_) - | RegistryError::Libsql(_) | RegistryError::JoinError(_) => StatusCode::INTERNAL_SERVER_ERROR, + #[cfg(feature = "backend-libsql")] + RegistryError::Libsql(_) => StatusCode::INTERNAL_SERVER_ERROR, + #[cfg(any(feature = "backend-postgres", feature = "backend-mysql"))] + RegistryError::Sqlx(_) => StatusCode::INTERNAL_SERVER_ERROR, + #[cfg(any(feature = "backend-postgres", feature = "backend-mysql"))] + RegistryError::AuthDatabaseTimeout => StatusCode::GATEWAY_TIMEOUT, RegistryError::Io(_) | RegistryError::ObjectStore(_) | RegistryError::Json(_) => { StatusCode::BAD_GATEWAY } @@ -206,6 +277,207 @@ impl RegistryError { } } +fn redact_url_credentials(message: &str) -> String { + let mut redacted = String::with_capacity(message.len()); + let mut cursor = 0; + while let Some(relative_scheme_end) = message[cursor..].find("://") { + let scheme_end = cursor + relative_scheme_end; + let scheme_start = find_scheme_start(message, scheme_end); + if scheme_start == scheme_end || !is_valid_scheme(&message[scheme_start..scheme_end]) { + redacted.push_str(&message[cursor..scheme_end + 3]); + cursor = scheme_end + 3; + continue; + } + + let url_end = find_url_end(message, scheme_end + 3); + let (candidate, suffix) = split_trailing_punctuation(&message[scheme_start..url_end]); + let Some(safe_url) = redact_url_candidate(candidate) else { + redacted.push_str(&message[cursor..url_end]); + cursor = url_end; + continue; + }; + + redacted.push_str(&message[cursor..scheme_start]); + redacted.push_str(&safe_url); + redacted.push_str(suffix); + cursor = url_end; + } + redacted.push_str(&message[cursor..]); + redacted +} + +fn find_scheme_start(message: &str, scheme_end: usize) -> usize { + let bytes = message.as_bytes(); + let mut start = scheme_end; + while start > 0 { + let byte = bytes[start - 1]; + if !byte.is_ascii_alphanumeric() && byte != b'+' && byte != b'.' && byte != b'-' { + break; + } + start -= 1; + } + start +} + +fn is_valid_scheme(scheme: &str) -> bool { + let mut chars = scheme.bytes(); + let Some(first) = chars.next() else { + return false; + }; + first.is_ascii_alphabetic() + && chars.all(|byte| { + byte.is_ascii_alphanumeric() || byte == b'+' || byte == b'.' || byte == b'-' + }) +} + +fn find_url_end(message: &str, url_start: usize) -> usize { + message[url_start..] + .char_indices() + .find_map(|(offset, ch)| is_url_delimiter(ch).then_some(url_start + offset)) + .unwrap_or(message.len()) +} + +fn is_url_delimiter(ch: char) -> bool { + ch.is_whitespace() || matches!(ch, '"' | '\'' | '`' | '<' | '>' | '(' | ')' | '{' | '}') +} + +fn split_trailing_punctuation(candidate: &str) -> (&str, &str) { + let mut end = candidate.len(); + while let Some(ch) = candidate[..end].chars().next_back() { + if !matches!(ch, '.' | ',' | ';' | '!') { + break; + } + end -= ch.len_utf8(); + } + (&candidate[..end], &candidate[end..]) +} + +fn redact_url_candidate(candidate: &str) -> Option { + redact_parseable_url_candidate(candidate).or_else(|| redact_unparsable_url_candidate(candidate)) +} + +fn redact_parseable_url_candidate(candidate: &str) -> Option { + let mut url = url::Url::parse(candidate).ok()?; + let mut changed = false; + if !url.username().is_empty() || url.password().is_some() { + if url.set_username("redacted").is_ok() { + changed = true; + } + if url.set_password(None).is_ok() { + changed = true; + } + } + + if url.query().is_some() { + let pairs = url + .query_pairs() + .map(|(key, value)| { + if is_sensitive_query_key(&key) { + changed = true; + (key.into_owned(), "redacted".to_string()) + } else { + (key.into_owned(), value.into_owned()) + } + }) + .collect::>(); + if changed { + url.query_pairs_mut() + .clear() + .extend_pairs(pairs.iter().map(|(key, value)| (&**key, &**value))); + } + } + + if url.fragment().is_some() { + url.set_fragment(None); + changed = true; + } + + changed.then(|| url.to_string()) +} + +fn redact_unparsable_url_candidate(candidate: &str) -> Option { + let mut redacted = candidate.to_string(); + let mut changed = false; + if let Some(safe_url) = redact_unparsable_url_userinfo(&redacted) { + redacted = safe_url; + changed = true; + } + if let Some(safe_url) = redact_sensitive_query_values(&redacted) { + redacted = safe_url; + changed = true; + } + if let Some(safe_url) = redact_fragment(&redacted) { + redacted = safe_url; + changed = true; + } + changed.then_some(redacted) +} + +fn redact_unparsable_url_userinfo(candidate: &str) -> Option { + let authority_start = candidate.find("://")? + 3; + let scan_end = candidate[authority_start..] + .find('?') + .map_or(candidate.len(), |offset| authority_start + offset); + let userinfo_end = candidate[authority_start..scan_end].rfind('@')? + authority_start; + let mut redacted = String::with_capacity(candidate.len()); + redacted.push_str(&candidate[..authority_start]); + redacted.push_str("redacted@"); + redacted.push_str(&candidate[userinfo_end + 1..]); + Some(redacted) +} + +fn redact_sensitive_query_values(candidate: &str) -> Option { + let query_start = candidate.find('?')?; + let fragment_start = candidate[query_start + 1..] + .find('#') + .map_or(candidate.len(), |offset| query_start + 1 + offset); + let query = &candidate[query_start + 1..fragment_start]; + let mut redacted = String::with_capacity(candidate.len()); + redacted.push_str(&candidate[..=query_start]); + let mut changed = false; + for segment in query.split_inclusive('&') { + let (pair, separator) = segment.strip_suffix('&').map_or((segment, ""), |pair| (pair, "&")); + if let Some(value_start) = pair.find('=') { + let key = &pair[..value_start]; + if is_sensitive_query_key(key) { + redacted.push_str(key); + redacted.push_str("=redacted"); + redacted.push_str(separator); + changed = true; + continue; + } + } + redacted.push_str(segment); + } + redacted.push_str(&candidate[fragment_start..]); + changed.then_some(redacted) +} + +fn redact_fragment(candidate: &str) -> Option { + let fragment_start = candidate.find('#')?; + Some(candidate[..fragment_start].to_string()) +} + +fn is_sensitive_query_key(key: &str) -> bool { + let normalized = key + .chars() + .filter(|ch| *ch != '-' && *ch != '_') + .map(|ch| ch.to_ascii_lowercase()) + .collect::(); + matches!( + normalized.as_str(), + "auth" + | "authtoken" + | "password" + | "passwd" + | "pwd" + | "secret" + | "token" + | "accesstoken" + | "apikey", + ) +} + pub type Result = std::result::Result; #[cfg(test)] diff --git a/pnpr/crates/pnpr/src/error/tests.rs b/pnpr/crates/pnpr/src/error/tests.rs index d152e2592c..3fb97b2d79 100644 --- a/pnpr/crates/pnpr/src/error/tests.rs +++ b/pnpr/crates/pnpr/src/error/tests.rs @@ -40,3 +40,115 @@ fn object_store_error_maps_to_bad_gateway() { }); assert_eq!(err.status_code(), StatusCode::BAD_GATEWAY); } + +#[test] +fn public_message_hides_server_error_details() { + let err = RegistryError::ObjectStore(object_store::Error::Generic { + store: "test", + source: "internal-hostname".into(), + }); + assert_eq!(err.status_code(), StatusCode::BAD_GATEWAY); + assert_eq!(err.public_message(), "Bad Gateway"); + assert!(err.to_string().contains("internal-hostname")); +} + +#[test] +fn log_message_keeps_non_secret_server_error_details() { + let err = + RegistryError::Internal { reason: "auth database COUNT(*) returned no rows".to_string() }; + assert_eq!(err.public_message(), "Internal Server Error"); + assert_eq!(err.log_message(), "Internal error: auth database COUNT(*) returned no rows"); +} + +#[test] +fn log_message_redacts_embedded_database_url_credentials() { + let err = RegistryError::Internal { + reason: "connection failed for postgres://admin:secret@db.example/pnpr?sslmode=require and libsql://edge.example/pnpr?authToken=token-value".to_string(), + }; + + let message = err.log_message(); + + assert!(message.contains("connection failed")); + assert!(message.contains("db.example")); + assert!(message.contains("edge.example")); + assert!(message.contains("sslmode=require")); + assert!(message.contains("postgres://redacted@db.example/pnpr?sslmode=require")); + assert!(message.contains("authToken=redacted")); + assert!(!message.contains("admin")); + assert!(!message.contains("secret")); + assert!(!message.contains("token-value")); +} + +#[test] +fn log_message_redacts_ipv6_database_url_credentials() { + let err = RegistryError::Internal { + reason: "connection failed for postgres://admin:secret@[::1]/pnpr?sslmode=require" + .to_string(), + }; + + let message = err.log_message(); + + assert!(message.contains("postgres://redacted@[::1]/pnpr?sslmode=require")); + assert!(!message.contains("admin")); + assert!(!message.contains("secret")); +} + +#[test] +fn log_message_redacts_malformed_database_url_credentials() { + let err = RegistryError::Internal { + reason: + "connection failed for postgres://admin:sec#ret@db.example/pnpr?password=query-secret" + .to_string(), + }; + + let message = err.log_message(); + + assert!(message.contains("postgres://redacted@db.example/pnpr?password=redacted")); + assert!(!message.contains("admin")); + assert!(!message.contains("sec#ret")); + assert!(!message.contains("query-secret")); +} + +#[test] +fn log_message_redacts_malformed_database_url_credentials_with_slash() { + let err = RegistryError::Internal { + reason: "connection failed for postgres://admin:pa/ss@db.example/pnpr".to_string(), + }; + + let message = err.log_message(); + + assert!(message.contains("postgres://redacted@db.example/pnpr")); + assert!(!message.contains("admin")); + assert!(!message.contains("pa/ss")); +} + +#[test] +fn log_message_redacts_database_url_fragment_secrets() { + let err = RegistryError::Internal { + reason: "connection failed for postgres://db.example/pnpr#password=fragment-secret" + .to_string(), + }; + + let message = err.log_message(); + + assert!(message.contains("postgres://db.example/pnpr")); + assert!(!message.contains("fragment-secret")); + assert!(!message.contains("#password")); +} + +#[test] +fn log_message_redacts_malformed_database_url_fragment_secrets() { + let err = RegistryError::Internal { + reason: + "connection failed for postgres://admin:pa/ss@db.example/pnpr#password=fragment-secret" + .to_string(), + }; + + let message = err.log_message(); + + assert!(message.contains("postgres://redacted@db.example/pnpr")); + assert!(!message.contains("admin")); + assert!(!message.contains("pa/ss")); + assert!(!message.contains("fragment-secret")); + assert!(!message.contains("#password")); +} diff --git a/pnpr/crates/pnpr/src/lib.rs b/pnpr/crates/pnpr/src/lib.rs index fa4de972b7..dca515431d 100644 --- a/pnpr/crates/pnpr/src/lib.rs +++ b/pnpr/crates/pnpr/src/lib.rs @@ -29,7 +29,7 @@ pub use auth::{ pub use config::{ AuthConfig, BackendConfig, Config, ConfigSource, DEFAULT_CONFIG_YAML, HostedStoreConfig, HtpasswdConfig, LibsqlSettings, LogConfig, LogFormat, LogLevel, MaxUsers, OsvConfig, - PackageAccess, TokensConfig, UplinkConfig, default_cache_dir, + PackageAccess, SqlBackendSettings, TokensConfig, UplinkConfig, default_cache_dir, }; pub use error::{RegistryError, Result}; pub use journal::recover_publish_journal; diff --git a/pnpr/crates/pnpr/src/main.rs b/pnpr/crates/pnpr/src/main.rs index 620ecb0323..61fa8e0cea 100644 --- a/pnpr/crates/pnpr/src/main.rs +++ b/pnpr/crates/pnpr/src/main.rs @@ -1,5 +1,5 @@ use clap::Parser; -use pnpr::{Config, ConfigSource, LogConfig, LogFormat, default_cache_dir, serve}; +use pnpr::{Config, ConfigSource, LogConfig, LogFormat, RegistryError, default_cache_dir, serve}; use std::{net::SocketAddr, path::PathBuf, time::Duration}; use tracing_subscriber::EnvFilter; @@ -97,7 +97,12 @@ async fn main() -> miette::Result<()> { } init_logging(&config.logs); log_config_source(&source); - serve(config).await.map_err(|err| miette::miette!("{err}")) + serve(config).await.map_err(|err| redacted_report(&err)) +} + +fn redacted_report(err: &RegistryError) -> miette::Report { + let message = err.log_message(); + miette::miette!("{message}") } /// Install the `tracing-subscriber` for this process based on the @@ -136,3 +141,6 @@ fn log_config_source(source: &ConfigSource) { ConfigSource::Bundled => tracing::info!("loaded bundled default config"), } } + +#[cfg(test)] +mod tests; diff --git a/pnpr/crates/pnpr/src/server.rs b/pnpr/crates/pnpr/src/server.rs index cbcc0c6ec2..b0c6bd82b3 100644 --- a/pnpr/crates/pnpr/src/server.rs +++ b/pnpr/crates/pnpr/src/server.rs @@ -741,19 +741,20 @@ async fn add_user(state: &AppState, name: &str, body: &[u8]) -> Response { }); }; - let outcome = match state.inner.auth.users.add_or_login(name, password).await { + let (outcome, username) = match state.inner.auth.users.add_or_login(name, password).await { Ok(o) => o, Err(err) => return error_response(&err), }; - let token = match state.inner.auth.tokens.issue(name).await { + let token = match state.inner.auth.tokens.issue(&username).await { Ok(t) => t, Err(err) => return error_response(&err), }; let ok_msg = match outcome { - UpsertOutcome::Created => format!("user '{name}' created"), - UpsertOutcome::LoggedIn => format!("you are authenticated as '{name}'"), + UpsertOutcome::Created => format!("user '{username}' created"), + UpsertOutcome::LoggedIn => format!("you are authenticated as '{username}'"), }; - let body = json!({ "ok": ok_msg, "token": token, "id": format!("org.couchdb.user:{name}") }); + let body = + json!({ "ok": ok_msg, "token": token, "id": format!("org.couchdb.user:{username}") }); let bytes = serde_json::to_vec(&body).expect("static-shape JSON serializes"); Response::builder() .status(StatusCode::CREATED) @@ -878,8 +879,8 @@ async fn logout(state: &AppState, headers: &HeaderMap, raw_token: &str) -> Respo fn token_response_object(key: &str, record: &crate::auth::TokenRecord) -> Value { let preview: String = key.chars().take(6).collect(); - let created = iso_from_unix_millis((record.created_at as i64) * 1000); - let updated = iso_from_unix_millis((record.last_used_at as i64) * 1000); + let created = token_timestamp_iso(record.created_at); + let updated = token_timestamp_iso(record.last_used_at); json!({ "key": key, "token": preview, @@ -891,6 +892,19 @@ fn token_response_object(key: &str, record: &crate::auth::TokenRecord) -> Value }) } +fn token_timestamp_iso(seconds: u64) -> String { + iso_from_unix_millis(token_timestamp_millis(seconds)) +} + +fn token_timestamp_millis(seconds: u64) -> i64 { + const MILLIS_PER_SECOND: u64 = 1000; + let max_seconds = i64::MAX as u64 / MILLIS_PER_SECOND; + (seconds.min(max_seconds) * MILLIS_PER_SECOND) as i64 +} + +#[cfg(test)] +mod tests; + async fn caller_username( state: &AppState, headers: &HeaderMap, @@ -2008,8 +2022,16 @@ fn not_found() -> Response { fn error_response(err: &RegistryError) -> Response { let status = err.status_code(); - tracing::error!(%err, %status, "request failed"); - (status, err.to_string()).into_response() + let error_kind = err.log_kind(); + if status.is_server_error() { + let err = err.log_message(); + tracing::error!(%err, %error_kind, %status, "request failed"); + } else if status == StatusCode::UNAUTHORIZED || status == StatusCode::FORBIDDEN { + tracing::debug!(%err, %error_kind, %status, "request failed"); + } else { + tracing::warn!(%err, %error_kind, %status, "request failed"); + } + (status, err.public_message()).into_response() } async fn serve_ping(State(_state): State) -> Response { diff --git a/pnpr/crates/pnpr/src/server/tests.rs b/pnpr/crates/pnpr/src/server/tests.rs new file mode 100644 index 0000000000..a0e4b33304 --- /dev/null +++ b/pnpr/crates/pnpr/src/server/tests.rs @@ -0,0 +1,7 @@ +use super::token_timestamp_millis; + +#[test] +fn token_timestamp_millis_saturates_before_i64_conversion() { + assert_eq!(token_timestamp_millis(42), 42_000); + assert_eq!(token_timestamp_millis(u64::MAX), i64::MAX / 1000 * 1000); +} diff --git a/pnpr/crates/pnpr/src/tests.rs b/pnpr/crates/pnpr/src/tests.rs new file mode 100644 index 0000000000..bb5aceb25f --- /dev/null +++ b/pnpr/crates/pnpr/src/tests.rs @@ -0,0 +1,13 @@ +use super::*; + +#[test] +fn startup_error_report_redacts_dsn_credentials() { + let err = RegistryError::Internal { + reason: "startup failed for postgres://admin:secret@[::1]/pnpr?sslmode=require".to_string(), + }; + let report = redacted_report(&err).to_string(); + + assert!(report.contains("postgres://redacted@[::1]/pnpr?sslmode=require")); + assert!(!report.contains("admin")); + assert!(!report.contains("secret")); +} diff --git a/pnpr/crates/pnpr/tests/auth_user_endpoints.rs b/pnpr/crates/pnpr/tests/auth_user_endpoints.rs index caccc99c84..51192ee14c 100644 --- a/pnpr/crates/pnpr/tests/auth_user_endpoints.rs +++ b/pnpr/crates/pnpr/tests/auth_user_endpoints.rs @@ -9,13 +9,15 @@ use axum::{ http::{Request, StatusCode}, }; use pnpr::{ - AuthConfig, AuthState, Config, HtpasswdConfig, MaxUsers, TokensConfig, router, router_with_auth, + AuthConfig, AuthState, Config, HtpasswdConfig, MaxUsers, TokenStore, TokensConfig, + UpsertOutcome, UserBackend, router, router_with_auth, }; use serde_json::{Value, json}; use std::{ fmt::Write as _, net::{Ipv4Addr, SocketAddr, SocketAddrV4}, path::PathBuf, + sync::Arc, }; use tempfile::TempDir; use tower::ServiceExt; @@ -98,6 +100,24 @@ async fn add_user_and_get_token( (app, token) } +struct CanonicalUserBackend; + +#[async_trait::async_trait] +impl UserBackend for CanonicalUserBackend { + async fn add_or_login( + &self, + username: &str, + _password: &str, + ) -> pnpr::Result<(UpsertOutcome, String)> { + assert_eq!(username, "alice"); + Ok((UpsertOutcome::LoggedIn, "Alice".to_string())) + } + + async fn verify(&self, _username: &str, _password: &str) -> pnpr::Result> { + Ok(None) + } +} + #[tokio::test] async fn whoami_returns_username_for_authenticated_caller() { let tmp = TempDir::new().unwrap(); @@ -110,6 +130,32 @@ async fn whoami_returns_username_for_authenticated_caller() { assert_eq!(payload["username"].as_str(), Some("alice")); } +#[tokio::test] +async fn adduser_issues_token_for_canonical_username() { + let tmp = TempDir::new().unwrap(); + let auth = AuthState { + users: Arc::new(CanonicalUserBackend), + tokens: Arc::new(TokenStore::in_memory()), + }; + let app = router_with_auth(static_config(tmp.path().to_path_buf()), auth); + + let response = app + .clone() + .oneshot(put_json("/-/user/org.couchdb.user:alice", adduser_body("alice", "secret"))) + .await + .unwrap(); + assert_eq!(response.status(), StatusCode::CREATED); + let payload = body_json(response.into_body()).await; + assert_eq!(payload["id"].as_str(), Some("org.couchdb.user:Alice")); + assert_eq!(payload["ok"].as_str(), Some("you are authenticated as 'Alice'")); + let token = payload["token"].as_str().expect("token in response"); + + let response = app.oneshot(get_with_bearer("/-/whoami", token)).await.unwrap(); + assert_eq!(response.status(), StatusCode::OK); + let payload = body_json(response.into_body()).await; + assert_eq!(payload["username"].as_str(), Some("Alice")); +} + #[tokio::test] async fn whoami_returns_401_when_unauthenticated() { let tmp = TempDir::new().unwrap(); diff --git a/pnpr/npm/pnpr/README.md b/pnpr/npm/pnpr/README.md index f49a13460a..8b48fb2e38 100644 --- a/pnpr/npm/pnpr/README.md +++ b/pnpr/npm/pnpr/README.md @@ -92,8 +92,8 @@ pnpr -c ./pnpr.yaml By default both are local directories. Adding an `s3:` block moves the **hosted** store into an S3-compatible object store, so the durable data is replicated by the provider and can be shared by several stateless -`pnpr` replicas. The cache and the resolver databases always -stay on local disk — only the hosted store is pluggable. +`pnpr` replicas. The cache stays on local disk — only the hosted +package store is pluggable here. Because any S3-compatible endpoint works, this also covers **Cloudflare R2**, **MinIO**, **Backblaze B2**, **Wasabi**, etc. — point `endpoint` @@ -182,21 +182,30 @@ s3: secretAccessKey: minioadmin ``` -### Storing users and tokens in a networked SQLite database +### Storing users and tokens in a shared SQL database Auth state — the registered users and their bearer tokens — is the other piece of per-instance disk state. By default users live in an htpasswd file and tokens in a local SQLite database (see `auth:` above), so two `pnpr` replicas don't see each other's accounts. Adding a -`backend:` block moves both into one **networked SQLite** database -(libsql / [Turso](https://turso.tech)), so several stateless replicas -share a consistent set of logins and tokens — the auth half of running -`pnpr` horizontally scaled. +`backend:` block moves both into one shared SQL database, so several +stateless replicas share a consistent set of logins and tokens — the +auth half of running `pnpr` horizontally scaled. -The schema is the same SQLite the local backend uses (the `tokens` table -is identical; users move from the htpasswd file into a `users` table), so -a database can be migrated between the two. Token lookups happen on the -request hot path, so the database should be low-latency from the server. +The same auth traits drive every backend, and the SQL schema sticks to +common column types so records can be moved between supported drivers. +Only one backend may be selected in a config file. + +Database drivers are Cargo-feature gated: + +| Backend | Config key | Cargo feature | +| --- | --- | --- | +| libsql / Turso | `backend.libsql` | `backend-libsql` (enabled by default) | +| PostgreSQL | `backend.postgres` or `backend.postgresql` | `backend-postgres` | +| MySQL-compatible | `backend.mysql` | `backend-mysql` | + +Token lookups happen on the request hot path, so the database should be +low-latency from the server. ```yaml storage: ./storage @@ -235,6 +244,27 @@ replica's writes (a token issued or revoked elsewhere) only after the next background sync, so lower `syncIntervalSecs` means less revocation lag. Omit `replicaPath` to always read the primary directly. +PostgreSQL: + +```yaml +backend: + postgres: + url: ${PNPR_POSTGRES_URL} + maxConnections: 16 +``` + +MySQL: + +```yaml +backend: + mysql: + url: ${PNPR_MYSQL_URL} + maxConnections: 16 +``` + +For PostgreSQL or MySQL support, build pnpr with the matching Cargo +feature, for example `cargo build -p pnpr --features backend-postgres`. + When the `backend:` block is absent, auth stays on local disk and the `auth.htpasswd` / `auth.tokens` settings apply as before. The `auth.htpasswd.max_users` registration cap is honored either way.