From 10423e3f7fca99cbc56fad0aa318cccafc6ffb3f Mon Sep 17 00:00:00 2001 From: "Ericson \"Fogo\" Soares" Date: Fri, 15 Nov 2024 04:40:43 -0300 Subject: [PATCH] Update quic-rpc crate (#2805) * Fix breaking changes on quic-rpc * Rust fmt --- Cargo.lock | Bin 331432 -> 334355 bytes Cargo.toml | 2 +- core/crates/cloud-services/Cargo.toml | 14 ++--- core/crates/cloud-services/src/client.rs | 10 ++-- core/crates/cloud-services/src/error.rs | 48 +++++++++++++++--- core/crates/cloud-services/src/lib.rs | 2 +- .../src/p2p/new_sync_messages_notifier.rs | 8 +-- core/crates/cloud-services/src/p2p/runner.rs | 14 ++--- .../crates/cloud-services/src/sync/receive.rs | 4 +- core/crates/cloud-services/src/sync/send.rs | 4 +- core/src/api/cloud/devices.rs | 6 +-- core/src/api/cloud/mod.rs | 12 ++--- 12 files changed, 75 insertions(+), 49 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f6cc8276095ef5c61542837a45237d1a93b1ddb4..d8ca0858bb42785f513a9fd4d0ce1067c7cab3be 100644 GIT binary patch delta 5187 zcmaJ_dw5mVmH+nH``(+7gb*INd6OWd6$+mBKBtZZ)X_@SA?RqaWA!{Gk^~5e;-f+s zZ7tX|^i+1ssK7V%?dOoW=vI)Ig9r>3;(YokGuDW09PtIFz~`f(>)e}rPT-IJbHBaT zS$nO$*WGRpMZaYQCFJ7;blExhWYZKc_-)tMWb4i{&aHJMAV{%%#?QqaG$ zI=ID4WY+k{$Y^Hf=ZmkJ2$ zX8)0e;Wg3!)?{YCvp2kAxSafde=djRp5xV-`k`dFHC$f$mw}OGF#Gf$N0`Q$Pa6R)ftZ5A8@r(%-hCrhU0({z71Vh(9zjuEnVt$u(qve3Y)ea^{`jBqnq*$<~6UN7R3H#2$izkx1q~sE1GRl z&C(o=I=(NPrYpPDu_V{^JW*48T~`#-m8hoME;T(zSE%h+u4Y-fB#D9aaGKTc7sfDs zC+f%>$wra=2S*9EbO-z#+lk6!lTDuN7`ka&nyed&tGJ?09Ye8f)e%KY_FW62X!(+2 zORA!}sv{|y?5nb&Q=lv@SodC-?b)6M8AMeJ3@W1TNK_Xc)o>t2qUC5F+gu9AUo{T@m@nFc zPDI(xh6hn2yYCv4%B2`*N;$5UYs?YKn;A(XJ}Vku-(N% z1*;iCGx((^&{GkgAO8~VjdkUvzQ-=$xQzYyB>E-qJ%x_p2_xB?OX&{FX=%SLIN8_8 z<_w_}fA%Z%Y@~$W@D17=ANI^y^q5@)__avE2)VZ}VGN z_>IG!LX;e-speKZF&J>JnXOtn)PW{DqG>{9>QKV6MMX^x(^zv1pGuzZ+OFk7E=z`? zDw-dnj{!MmnQYyA7z(9XC}kohDfVU(PYMRVn`U({LzUjN4cY7;ekhEs>ga6uI$K-X z7ei`G$}A}xN5Cc5V+~y>YJLo=xRTv!;$pUC98R+7JJ9%G@4Phs=z8JIq_C09t77jj z77~2;cHuebc&xcosN)Bg376L8h-&_wUZE|*@oZeSG2xr?yf63*&kL2%_*lbpLJj|9 zi?F@~i2iM$X46h#VHo=E-wV~O?p0w7pS)LiG)g7b+=V9a!~2D!k<9mkh>ozqcs8Ug;;80sMJXeuS*HkRur>>>ip6iGJ5sK{q;>)c6 zMby9syYK`ytl&b{pTy;P*A@glb%pHA%V_-cHm|dz#aW(pXlsk@WEbG(BwM0tN?TyrYif6K{Zp>eQHC~lAUZ20l^JhSE0v<6iP<- zY*CXuMHfAf>M|>P6;Ku`q?qka;%;``9k_yDox-n0vj{IM!cs)#CWVf%xHdWw2kaxOSL3Tfo~HR(8NY?u*_8-xAUTe!*qY{137WHHnTDZ3L$Y+= zwrv^OziUgB>Z&Z-qAq*TR5a+_swDcib%F zHl0cUb%sV&Pm(>;7d^`m!AUSWz_8+Ik^!@eY|E;pLTn{!1_Mw)pVg8g-hDmpNw8@@ z!lgMsTdG4{M~0cogG)-((;d~bY(|`pfZ{J%i1#LEI$lTTat6IaVn^mN zzBqxxQiUomWj!spf<4fV7l%WD52!9v=Yn`iVNQc|7O5+_8X%M50QR`LV(V~9R(2Nv zySW6vn5>uK38aMRmwtFVF65<4@rD=_rF(D#+ju7~MW6+xC8e@NfOF% zkSQD$5jZ3p5TRwXcHs;8&|Zml4{;9ODu*uA(Q637-aKX4y@ERy24t-`(M2vjrJ zlj4>CiXVxjLU+}&H}A)*qqAj!ZOH@JiiM8+3?GWWhplt>&v9R@eAycOQv5x<2v}-h z18Xs1Eo}ihn;8T$h++1aFu?Du}9_W8XZ&J6J{<1}MZWIFMeP0q_-Mo~dkxp{yANU( zO)(sNIwgGG^}UB{*MESE`OP0<9v%xAHC!6RkH^`%=Rqv-)NyQ(pzqE3O)?DNs^cjt zJRKBEcO6$ZT-Sh?g(L!~K$U8$r^}8h!eFoXimAe5!m~Wnv}{X}VTLYeB0*Jr_;dVv zc=`TQxR!r7ginW^qW?Rjnx8t0tE2gfk4uo%@S0`aRb&zyC?Qc_oCFH)*$;cP`=CY2n^*Wp151FKVN9+ZDzK~Ao^n~dWR-a{(Vkh7bAP3B+{PHL=Y9ZB8Vi`};r%Ym`ghTcbAPlBcf??*$GO#>)+ z3CObPDuxI1tE9vDYQZiEUhxWmNVfnF%DITJx*KsLKm9nlEG(_i9J}@@Qp}Ex6(+E= zPml#{-_ztLdAG4mpOP#1?az=aq9gJx&yzpIuG)M9s%N{NgA+gAM;30$YAHEpL;9%{yWPO$l()%rD3_@e~#Qwi zn+w*jF4)*y5}dfZEH$sBFAgXENQMTFRep@5)WWJH3I)OOq4CtFAu}>AFy_da%Tr#6%tTciiSe#aNNlDl%V8*Xl-{5%4B3T`l1XTmrxl?UW3k&G^y94BuR zP$4W@h^DcL^HCXM-U74}!3T3l3H0`nB>Z?M%7kqTP%-05)=wmCz}=`l@1iC~yaHi9+p;T&jW=wKLot*nS# z-7pl*cU05WG|lvF9-CsQwjv9XsCteg+P>^aw(0tcZ&;=z>w@pex+g+sC$HX=Op?p~ z{tB9i>0Ps?dx5};pt27Qhnd^?rCzoj{UJ;$2ICf!Y`9?>>IdU?plezC4s;uW!qKF$ zR`Ffc6fD`aM8)wPQ8PT>Qe4ASTu&8b9+#>qqNnjlT}!bHL6>DgG zJO-G?6BuV<-2xb`eUz6pRPLq}@n?CA(1)*6c=8fqYFG>Rdq+OY^x`j_4S+BKo>3nUe0>rl{+h>$tX~2I%RcpzcsMTk{&a zA`1rpiPS*9zoK(c?&3-&b)oB{#)tLY=xm4p?A0C=9~4rMhj0cvc?#{qY||NZ62q@2 z;1R5=4?Tr|FHB^czC=qXGrvP0B3Ata>ZDt`@XJ5B=xGl-FkXS!REn)w)T5Exzhz+$ zvkAFa!fauGoXUspK2pP)EAeY3Va10++bYrzUb>zp*{zr3W(0Q(rlmpqs3iM#EItyC zLhZW}&x=LWgV*50Sy9jx4LA$lYQhE3vJE%FnMS;UbvEJg2>9=4kebi{=v~VtcU_NH z!_r}-fK9p)pD2t%>zIitn=~807@-`?y9-}k5%pzjfM;i@;j~wlt!E zTMUE*5Xv%7?$T+&V8r9M>7_9jRH zx!8fMuz-}KusI|)vydD~u*o&#j_l~*+f))`?+b(@b=`bY3f1qD0=T-BB-y71Sx4Z= zb+oZg)g%s?s-+2vEBm@*7^`J?E0c*d5*xFRC5L2vTaRu zd7Wh6kR?~L6h(6^(-Z~W;_#$9s>=g04N;Or7p#X#Q3!%%(DD@rypCG37-sath3x7& zvh}CKL!Ls0MVGsp-QPfNKTno6l68E(-qKK~`nIhou5LMw!qF$hDOr&?+?qazW&Y&{ zCx(D+ID+JBwxc-?2O8IPLL4)LJKii}{xtGr3~ruIvS334PdS>pj!ncw)EzI)C2i3u zl|jRN^0;7{9CLu4(2-1)2f*5`m;9mPZ`8L8wwk?AWiiMjG zV4vmER)QpGNze+mG(i>ql4aZS=_U+A2GZg1@N$0k&lb=s1fRF_UOiq&m!RMEkRdDc zXq>et>0|_x=aW$(3!Ke>UCVh<2ba*RP|#hGWcQWQyHVtejT%I+O*iueZMLzTu0g=x zIK@G4MHc+njtkgBgXxXYb_MKQB^^dW>q_`VH9ZklGTOL)82tz_b2#mZ!837M2IEK5 zOsF112Ly?33i@SqPpHev1$r_)6A}}6yd%cZg(Tc<1Upbi_r;>_*?tv0n9HD%)}*IS z-87oXjyKWEqt`rBZ=&09G^<(S7J5^R&2=cpq(u%L#H}1?DPInE&Jui4n-AW=LW { #[default] NotConnected, - Connected(Client>), + Connected(Client>), } /// Cloud services are a optional feature that allows you to interact with the cloud services @@ -157,7 +157,7 @@ impl CloudServices { http_client: &ClientWithMiddleware, get_cloud_api_address: Url, domain_name: String, - ) -> Result>, Error> { + ) -> Result>, Error> { let cloud_api_address = http_client .get(get_cloud_api_address) .send() @@ -256,7 +256,7 @@ impl CloudServices { .map_err(Error::FailedToCreateEndpoint)?; endpoint.set_default_client_config(client_config); - Ok(Client::new(RpcClient::new(QuinnConnection::new( + Ok(Client::new(RpcClient::new(QuinnConnector::new( endpoint, cloud_api_address, domain_name, @@ -268,7 +268,7 @@ impl CloudServices { /// If the client is not connected, it will try to connect to the cloud services. /// Available routes documented in /// [`sd_cloud_schema::Service`](https://github.com/spacedriveapp/cloud-services-schema). - pub async fn client(&self) -> Result>, Error> { + pub async fn client(&self) -> Result>, Error> { if let ClientState::Connected(client) = { &*self.client_state.read().await } { return Ok(client.clone()); } diff --git a/core/crates/cloud-services/src/error.rs b/core/crates/cloud-services/src/error.rs index ecb038aa3..ae8709ee6 100644 --- a/core/crates/cloud-services/src/error.rs +++ b/core/crates/cloud-services/src/error.rs @@ -1,11 +1,15 @@ -use sd_cloud_schema::{cloud_p2p, sync::groups, Request, Response}; +use sd_cloud_schema::{ + cloud_p2p, + sync::{self, groups}, + Request, Response, +}; use sd_utils::error::FileIOError; use std::{io, net::AddrParseError}; use quic_rpc::{ pattern::{bidi_streaming, rpc, server_streaming}, - transport::quinn::QuinnConnection, + transport::{mapped::MappedConnector, quinn::QuinnConnector}, }; #[derive(thiserror::Error, Debug)] @@ -69,7 +73,7 @@ pub enum Error { ConnectToCloudP2PNode(anyhow::Error), #[error("Communication error with Cloud P2P node: {0}")] CloudP2PRpcCommunication( - #[from] rpc::Error>, + #[from] rpc::Error>, ), #[error("Cloud P2P not initialized")] CloudP2PNotInitialized, @@ -80,15 +84,43 @@ pub enum Error { // Communication errors #[error("Failed to communicate with RPC backend: {0}")] - RpcCommunication(#[from] rpc::Error>), + RpcCommunication(#[from] rpc::Error>), + #[error("Failed to communicate with RPC sync backend: {0}")] + RpcSyncCommunication( + #[from] + rpc::Error< + MappedConnector>, + >, + ), #[error("Failed to communicate with Server Streaming RPC backend: {0}")] - ServerStreamCommunication(#[from] server_streaming::Error>), + ServerStreamCommunication(#[from] server_streaming::Error>), + #[error("Failed to communicate with Server Streaming RPC sync backend: {0}")] + ServerStreamSyncCommunication( + #[from] + server_streaming::Error< + MappedConnector>, + >, + ), #[error("Failed to receive next response from Server Streaming RPC backend: {0}")] - ServerStreamRecv(#[from] server_streaming::ItemError>), + ServerStreamRecv(#[from] server_streaming::ItemError>), + #[error("Failed to receive next response from Server Streaming RPC sync backend: {0}")] + ServerStreamSyncRecv( + #[from] + server_streaming::ItemError< + MappedConnector>, + >, + ), #[error("Failed to communicate with Bidi Streaming RPC backend: {0}")] - BidiStreamCommunication(#[from] bidi_streaming::Error>), + BidiStreamCommunication(#[from] bidi_streaming::Error>), + #[error("Failed to communicate with Bidi Streaming RPC sync backend: {0}")] + BidiStreamSyncCommunication( + #[from] + bidi_streaming::Error< + MappedConnector>, + >, + ), #[error("Failed to receive next response from Bidi Streaming RPC backend: {0}")] - BidiStreamRecv(#[from] bidi_streaming::ItemError>), + BidiStreamRecv(#[from] bidi_streaming::ItemError>), #[error("Error from backend: {0}")] Backend(#[from] sd_cloud_schema::Error), #[error("Failed to get access token from refresher: {0}")] diff --git a/core/crates/cloud-services/src/lib.rs b/core/crates/cloud-services/src/lib.rs index 615d5397d..064634fac 100644 --- a/core/crates/cloud-services/src/lib.rs +++ b/core/crates/cloud-services/src/lib.rs @@ -48,7 +48,7 @@ pub use sync::{ }; // Re-exports -pub use quic_rpc::transport::quinn::QuinnConnection; +pub use quic_rpc::transport::quinn::QuinnConnector; // Export URL for the auth server pub const AUTH_SERVER_URL: &str = "https://auth.spacedrive.com"; diff --git a/core/crates/cloud-services/src/p2p/new_sync_messages_notifier.rs b/core/crates/cloud-services/src/p2p/new_sync_messages_notifier.rs index 704f3ea54..a8a696ffd 100644 --- a/core/crates/cloud-services/src/p2p/new_sync_messages_notifier.rs +++ b/core/crates/cloud-services/src/p2p/new_sync_messages_notifier.rs @@ -10,7 +10,7 @@ use std::time::Duration; use futures_concurrency::future::Join; use iroh_net::{Endpoint, NodeId}; -use quic_rpc::{transport::quinn::QuinnConnection, RpcClient}; +use quic_rpc::{transport::quinn::QuinnConnector, RpcClient}; use tokio::time::Instant; use tracing::{debug, error, instrument, warn}; @@ -24,7 +24,7 @@ pub async fn dispatch_notifier( devices: Option<(Instant, Vec<(devices::PubId, NodeId)>)>, msgs_tx: flume::Sender, cloud_services: sd_cloud_schema::Client< - QuinnConnection, + QuinnConnector, >, token_refresher: TokenRefresher, endpoint: Endpoint, @@ -63,7 +63,7 @@ async fn notify_peers( device_pub_id: devices::PubId, devices: Option<(Instant, Vec<(devices::PubId, NodeId)>)>, cloud_services: sd_cloud_schema::Client< - QuinnConnection, + QuinnConnector, >, token_refresher: TokenRefresher, endpoint: Endpoint, @@ -128,7 +128,7 @@ async fn connect_and_send_notification( connection_id: &NodeId, endpoint: &Endpoint, ) -> Result<(), Error> { - let client = Client::new(RpcClient::new(QuinnConnection::from_connection( + let client = Client::new(RpcClient::new(QuinnConnector::from_connection( endpoint .connect(*connection_id, CloudP2PALPN::LATEST) .await diff --git a/core/crates/cloud-services/src/p2p/runner.rs b/core/crates/cloud-services/src/p2p/runner.rs index 17e6da08b..c21eb2ef4 100644 --- a/core/crates/cloud-services/src/p2p/runner.rs +++ b/core/crates/cloud-services/src/p2p/runner.rs @@ -30,7 +30,7 @@ use futures_concurrency::stream::Merge; use iroh_net::{Endpoint, NodeId}; use quic_rpc::{ server::{Accepting, RpcChannel, RpcServerError}, - transport::quinn::{QuinnConnection, QuinnServerEndpoint}, + transport::quinn::{QuinnConnector, QuinnListener}, RpcClient, RpcServer, }; use tokio::{ @@ -73,7 +73,7 @@ pub struct Runner { current_device_pub_id: devices::PubId, token_refresher: TokenRefresher, cloud_services: sd_cloud_schema::Client< - QuinnConnection, + QuinnConnector, >, msgs_tx: flume::Sender, endpoint: Endpoint, @@ -111,13 +111,13 @@ impl Clone for Runner { } struct PendingSyncGroupJoin { - channel: RpcChannel>, + channel: RpcChannel>, request: authorize_new_device_in_sync_group::Request, this_device: Device, since: Instant, } -type P2PServerEndpoint = QuinnServerEndpoint; +type P2PServerEndpoint = QuinnListener; impl Runner { pub async fn new( @@ -596,7 +596,7 @@ impl Runner { async fn connect_to_first_available_client( endpoint: &Endpoint, devices_in_group: &[(devices::PubId, NodeId)], -) -> Result>, CloudP2PError> { +) -> Result>, CloudP2PError> { for (device_pub_id, device_connection_id) in devices_in_group { if let Ok(connection) = endpoint .connect(*device_connection_id, CloudP2PALPN::LATEST) @@ -607,7 +607,7 @@ async fn connect_to_first_available_client( debug!(%device_pub_id, "Connected to authorizor device candidate"); return Ok(Client::new(RpcClient::new( - QuinnConnection::from_connection(connection), + QuinnConnector::from_connection(connection), ))); } } @@ -627,7 +627,7 @@ fn setup_server_endpoint( let (connections_tx, connections_rx) = flume::bounded(16); ( - RpcServer::new(QuinnServerEndpoint::handle_connections( + RpcServer::new(QuinnListener::handle_connections( connections_rx, local_addr, )), diff --git a/core/crates/cloud-services/src/sync/receive.rs b/core/crates/cloud-services/src/sync/receive.rs index 363d379db..53a8230a3 100644 --- a/core/crates/cloud-services/src/sync/receive.rs +++ b/core/crates/cloud-services/src/sync/receive.rs @@ -32,7 +32,7 @@ use std::{ use chrono::{DateTime, Utc}; use futures::{FutureExt, StreamExt}; use futures_concurrency::future::{Race, TryJoin}; -use quic_rpc::transport::quinn::QuinnConnection; +use quic_rpc::transport::quinn::QuinnConnector; use serde::{Deserialize, Serialize}; use tokio::{fs, io, sync::Notify, time::sleep}; use tracing::{debug, error, instrument, warn}; @@ -49,7 +49,7 @@ pub struct Receiver { sync_group_pub_id: groups::PubId, device_pub_id: devices::PubId, cloud_services: Arc, - cloud_client: Client>, + cloud_client: Client>, key_manager: Arc, sync: SyncManager, notifiers: Arc, diff --git a/core/crates/cloud-services/src/sync/send.rs b/core/crates/cloud-services/src/sync/send.rs index 5a95240e6..cf7f3a544 100644 --- a/core/crates/cloud-services/src/sync/send.rs +++ b/core/crates/cloud-services/src/sync/send.rs @@ -29,7 +29,7 @@ use std::{ use chrono::{DateTime, Utc}; use futures::{FutureExt, StreamExt, TryStreamExt}; use futures_concurrency::future::{Race, TryJoin}; -use quic_rpc::transport::quinn::QuinnConnection; +use quic_rpc::transport::quinn::QuinnConnector; use tokio::{ sync::{broadcast, Notify}, time::sleep, @@ -60,7 +60,7 @@ pub struct Sender { sync_group_pub_id: groups::PubId, sync: SyncManager, cloud_services: Arc, - cloud_client: Client>, + cloud_client: Client>, key_manager: Arc, is_active: Arc, state_notify: Arc, diff --git a/core/src/api/cloud/devices.rs b/core/src/api/cloud/devices.rs index d775c7c86..ead1db58e 100644 --- a/core/src/api/cloud/devices.rs +++ b/core/src/api/cloud/devices.rs @@ -1,6 +1,6 @@ use crate::api::{Ctx, R}; -use sd_core_cloud_services::QuinnConnection; +use sd_core_cloud_services::QuinnConnector; use sd_cloud_schema::{ auth::AccessToken, @@ -149,7 +149,7 @@ pub fn mount() -> AlphaRouter { } pub async fn hello( - client: &Client>, + client: &Client>, access_token: AccessToken, device_pub_id: PubId, hashed_pub_id: Hash, @@ -270,7 +270,7 @@ pub struct DeviceRegisterData { } pub async fn register( - client: &Client>, + client: &Client>, access_token: AccessToken, DeviceRegisterData { pub_id, diff --git a/core/src/api/cloud/mod.rs b/core/src/api/cloud/mod.rs index e2afdbbf6..538c0d2a4 100644 --- a/core/src/api/cloud/mod.rs +++ b/core/src/api/cloud/mod.rs @@ -4,7 +4,7 @@ use crate::{ Node, }; -use sd_core_cloud_services::{CloudP2P, KeyManager, QuinnConnection, UserResponse}; +use sd_core_cloud_services::{CloudP2P, KeyManager, QuinnConnector, UserResponse}; use sd_cloud_schema::{ auth, @@ -32,7 +32,7 @@ mod sync_groups; async fn try_get_cloud_services_client( node: &Node, -) -> Result>, sd_core_cloud_services::Error> { +) -> Result>, sd_core_cloud_services::Error> { node.cloud_services .client() .await @@ -302,13 +302,7 @@ async fn initialize_cloud_sync( async fn get_client_and_access_token( node: &Node, -) -> Result< - ( - Client>, - auth::AccessToken, - ), - rspc::Error, -> { +) -> Result<(Client>, auth::AccessToken), rspc::Error> { ( try_get_cloud_services_client(node), node.cloud_services