diff --git a/Cargo.lock b/Cargo.lock index f6cc82760..d8ca0858b 100644 Binary files a/Cargo.lock and b/Cargo.lock differ diff --git a/Cargo.toml b/Cargo.toml index c403e7776..1d53fa440 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ rust-version = "1.81" [workspace.dependencies] # First party dependencies -sd-cloud-schema = { git = "https://github.com/spacedriveapp/cloud-services-schema", rev = "99d59e8fab" } +sd-cloud-schema = { git = "https://github.com/spacedriveapp/cloud-services-schema", rev = "74af2a727c" } # Third party dependencies used by one or more of our crates async-channel = "2.3" diff --git a/core/crates/cloud-services/Cargo.toml b/core/crates/cloud-services/Cargo.toml index 2908c9586..55d27b955 100644 --- a/core/crates/cloud-services/Cargo.toml +++ b/core/crates/cloud-services/Cargo.toml @@ -39,16 +39,16 @@ zeroize = { workspace = true } # External dependencies anyhow = "1.0.86" dashmap = "6.1.0" -iroh-net = { version = "0.27", features = ["discovery-local-network", "iroh-relay"] } +iroh-net = { version = "0.28.1", features = ["discovery-local-network", "iroh-relay"] } paste = "=1.0.15" -quic-rpc = { version = "0.13.0", features = ["quinn-transport"] } -quinn = { package = "iroh-quinn", version = "0.11" } +quic-rpc = { version = "0.15.1", features = ["iroh-net-transport", "quinn-transport"] } +quinn = { package = "iroh-quinn", version = "0.12" } # Using whatever version of reqwest that reqwest-middleware uses, just putting here to enable some features reqwest = { version = "0.12", features = ["json", "native-tls-vendored", "stream"] } -reqwest-middleware = { version = "0.3", features = ["json"] } -reqwest-retry = "0.6" -rustls = { version = "=0.23.15", default-features = false, features = ["brotli", "ring", "std"] } -rustls-platform-verifier = "0.3.3" +reqwest-middleware = { version = "0.4", features = ["json"] } +reqwest-retry = "0.7" +rustls = { version = "=0.23.16", default-features = false, features = ["brotli", "ring", "std"] } +rustls-platform-verifier = "0.4.0" [dev-dependencies] diff --git a/core/crates/cloud-services/src/client.rs b/core/crates/cloud-services/src/client.rs index 24b427ef1..2b81c79da 100644 --- a/core/crates/cloud-services/src/client.rs +++ b/core/crates/cloud-services/src/client.rs @@ -6,7 +6,7 @@ use std::{net::SocketAddr, sync::Arc, time::Duration}; use futures::Stream; use iroh_net::relay::RelayUrl; -use quic_rpc::{transport::quinn::QuinnConnection, RpcClient, RpcMessage}; +use quic_rpc::{transport::quinn::QuinnConnector, RpcClient, RpcMessage}; use quinn::{crypto::rustls::QuicClientConfig, ClientConfig, Endpoint}; use reqwest::{IntoUrl, Url}; use reqwest_middleware::{reqwest, ClientBuilder, ClientWithMiddleware}; @@ -22,7 +22,7 @@ use super::{ enum ClientState { #[default] NotConnected, - Connected(Client>), + Connected(Client>), } /// Cloud services are a optional feature that allows you to interact with the cloud services @@ -157,7 +157,7 @@ impl CloudServices { http_client: &ClientWithMiddleware, get_cloud_api_address: Url, domain_name: String, - ) -> Result>, Error> { + ) -> Result>, Error> { let cloud_api_address = http_client .get(get_cloud_api_address) .send() @@ -256,7 +256,7 @@ impl CloudServices { .map_err(Error::FailedToCreateEndpoint)?; endpoint.set_default_client_config(client_config); - Ok(Client::new(RpcClient::new(QuinnConnection::new( + Ok(Client::new(RpcClient::new(QuinnConnector::new( endpoint, cloud_api_address, domain_name, @@ -268,7 +268,7 @@ impl CloudServices { /// If the client is not connected, it will try to connect to the cloud services. /// Available routes documented in /// [`sd_cloud_schema::Service`](https://github.com/spacedriveapp/cloud-services-schema). - pub async fn client(&self) -> Result>, Error> { + pub async fn client(&self) -> Result>, Error> { if let ClientState::Connected(client) = { &*self.client_state.read().await } { return Ok(client.clone()); } diff --git a/core/crates/cloud-services/src/error.rs b/core/crates/cloud-services/src/error.rs index ecb038aa3..ae8709ee6 100644 --- a/core/crates/cloud-services/src/error.rs +++ b/core/crates/cloud-services/src/error.rs @@ -1,11 +1,15 @@ -use sd_cloud_schema::{cloud_p2p, sync::groups, Request, Response}; +use sd_cloud_schema::{ + cloud_p2p, + sync::{self, groups}, + Request, Response, +}; use sd_utils::error::FileIOError; use std::{io, net::AddrParseError}; use quic_rpc::{ pattern::{bidi_streaming, rpc, server_streaming}, - transport::quinn::QuinnConnection, + transport::{mapped::MappedConnector, quinn::QuinnConnector}, }; #[derive(thiserror::Error, Debug)] @@ -69,7 +73,7 @@ pub enum Error { ConnectToCloudP2PNode(anyhow::Error), #[error("Communication error with Cloud P2P node: {0}")] CloudP2PRpcCommunication( - #[from] rpc::Error>, + #[from] rpc::Error>, ), #[error("Cloud P2P not initialized")] CloudP2PNotInitialized, @@ -80,15 +84,43 @@ pub enum Error { // Communication errors #[error("Failed to communicate with RPC backend: {0}")] - RpcCommunication(#[from] rpc::Error>), + RpcCommunication(#[from] rpc::Error>), + #[error("Failed to communicate with RPC sync backend: {0}")] + RpcSyncCommunication( + #[from] + rpc::Error< + MappedConnector>, + >, + ), #[error("Failed to communicate with Server Streaming RPC backend: {0}")] - ServerStreamCommunication(#[from] server_streaming::Error>), + ServerStreamCommunication(#[from] server_streaming::Error>), + #[error("Failed to communicate with Server Streaming RPC sync backend: {0}")] + ServerStreamSyncCommunication( + #[from] + server_streaming::Error< + MappedConnector>, + >, + ), #[error("Failed to receive next response from Server Streaming RPC backend: {0}")] - ServerStreamRecv(#[from] server_streaming::ItemError>), + ServerStreamRecv(#[from] server_streaming::ItemError>), + #[error("Failed to receive next response from Server Streaming RPC sync backend: {0}")] + ServerStreamSyncRecv( + #[from] + server_streaming::ItemError< + MappedConnector>, + >, + ), #[error("Failed to communicate with Bidi Streaming RPC backend: {0}")] - BidiStreamCommunication(#[from] bidi_streaming::Error>), + BidiStreamCommunication(#[from] bidi_streaming::Error>), + #[error("Failed to communicate with Bidi Streaming RPC sync backend: {0}")] + BidiStreamSyncCommunication( + #[from] + bidi_streaming::Error< + MappedConnector>, + >, + ), #[error("Failed to receive next response from Bidi Streaming RPC backend: {0}")] - BidiStreamRecv(#[from] bidi_streaming::ItemError>), + BidiStreamRecv(#[from] bidi_streaming::ItemError>), #[error("Error from backend: {0}")] Backend(#[from] sd_cloud_schema::Error), #[error("Failed to get access token from refresher: {0}")] diff --git a/core/crates/cloud-services/src/lib.rs b/core/crates/cloud-services/src/lib.rs index 615d5397d..064634fac 100644 --- a/core/crates/cloud-services/src/lib.rs +++ b/core/crates/cloud-services/src/lib.rs @@ -48,7 +48,7 @@ pub use sync::{ }; // Re-exports -pub use quic_rpc::transport::quinn::QuinnConnection; +pub use quic_rpc::transport::quinn::QuinnConnector; // Export URL for the auth server pub const AUTH_SERVER_URL: &str = "https://auth.spacedrive.com"; diff --git a/core/crates/cloud-services/src/p2p/new_sync_messages_notifier.rs b/core/crates/cloud-services/src/p2p/new_sync_messages_notifier.rs index 704f3ea54..a8a696ffd 100644 --- a/core/crates/cloud-services/src/p2p/new_sync_messages_notifier.rs +++ b/core/crates/cloud-services/src/p2p/new_sync_messages_notifier.rs @@ -10,7 +10,7 @@ use std::time::Duration; use futures_concurrency::future::Join; use iroh_net::{Endpoint, NodeId}; -use quic_rpc::{transport::quinn::QuinnConnection, RpcClient}; +use quic_rpc::{transport::quinn::QuinnConnector, RpcClient}; use tokio::time::Instant; use tracing::{debug, error, instrument, warn}; @@ -24,7 +24,7 @@ pub async fn dispatch_notifier( devices: Option<(Instant, Vec<(devices::PubId, NodeId)>)>, msgs_tx: flume::Sender, cloud_services: sd_cloud_schema::Client< - QuinnConnection, + QuinnConnector, >, token_refresher: TokenRefresher, endpoint: Endpoint, @@ -63,7 +63,7 @@ async fn notify_peers( device_pub_id: devices::PubId, devices: Option<(Instant, Vec<(devices::PubId, NodeId)>)>, cloud_services: sd_cloud_schema::Client< - QuinnConnection, + QuinnConnector, >, token_refresher: TokenRefresher, endpoint: Endpoint, @@ -128,7 +128,7 @@ async fn connect_and_send_notification( connection_id: &NodeId, endpoint: &Endpoint, ) -> Result<(), Error> { - let client = Client::new(RpcClient::new(QuinnConnection::from_connection( + let client = Client::new(RpcClient::new(QuinnConnector::from_connection( endpoint .connect(*connection_id, CloudP2PALPN::LATEST) .await diff --git a/core/crates/cloud-services/src/p2p/runner.rs b/core/crates/cloud-services/src/p2p/runner.rs index 17e6da08b..c21eb2ef4 100644 --- a/core/crates/cloud-services/src/p2p/runner.rs +++ b/core/crates/cloud-services/src/p2p/runner.rs @@ -30,7 +30,7 @@ use futures_concurrency::stream::Merge; use iroh_net::{Endpoint, NodeId}; use quic_rpc::{ server::{Accepting, RpcChannel, RpcServerError}, - transport::quinn::{QuinnConnection, QuinnServerEndpoint}, + transport::quinn::{QuinnConnector, QuinnListener}, RpcClient, RpcServer, }; use tokio::{ @@ -73,7 +73,7 @@ pub struct Runner { current_device_pub_id: devices::PubId, token_refresher: TokenRefresher, cloud_services: sd_cloud_schema::Client< - QuinnConnection, + QuinnConnector, >, msgs_tx: flume::Sender, endpoint: Endpoint, @@ -111,13 +111,13 @@ impl Clone for Runner { } struct PendingSyncGroupJoin { - channel: RpcChannel>, + channel: RpcChannel>, request: authorize_new_device_in_sync_group::Request, this_device: Device, since: Instant, } -type P2PServerEndpoint = QuinnServerEndpoint; +type P2PServerEndpoint = QuinnListener; impl Runner { pub async fn new( @@ -596,7 +596,7 @@ impl Runner { async fn connect_to_first_available_client( endpoint: &Endpoint, devices_in_group: &[(devices::PubId, NodeId)], -) -> Result>, CloudP2PError> { +) -> Result>, CloudP2PError> { for (device_pub_id, device_connection_id) in devices_in_group { if let Ok(connection) = endpoint .connect(*device_connection_id, CloudP2PALPN::LATEST) @@ -607,7 +607,7 @@ async fn connect_to_first_available_client( debug!(%device_pub_id, "Connected to authorizor device candidate"); return Ok(Client::new(RpcClient::new( - QuinnConnection::from_connection(connection), + QuinnConnector::from_connection(connection), ))); } } @@ -627,7 +627,7 @@ fn setup_server_endpoint( let (connections_tx, connections_rx) = flume::bounded(16); ( - RpcServer::new(QuinnServerEndpoint::handle_connections( + RpcServer::new(QuinnListener::handle_connections( connections_rx, local_addr, )), diff --git a/core/crates/cloud-services/src/sync/receive.rs b/core/crates/cloud-services/src/sync/receive.rs index 363d379db..53a8230a3 100644 --- a/core/crates/cloud-services/src/sync/receive.rs +++ b/core/crates/cloud-services/src/sync/receive.rs @@ -32,7 +32,7 @@ use std::{ use chrono::{DateTime, Utc}; use futures::{FutureExt, StreamExt}; use futures_concurrency::future::{Race, TryJoin}; -use quic_rpc::transport::quinn::QuinnConnection; +use quic_rpc::transport::quinn::QuinnConnector; use serde::{Deserialize, Serialize}; use tokio::{fs, io, sync::Notify, time::sleep}; use tracing::{debug, error, instrument, warn}; @@ -49,7 +49,7 @@ pub struct Receiver { sync_group_pub_id: groups::PubId, device_pub_id: devices::PubId, cloud_services: Arc, - cloud_client: Client>, + cloud_client: Client>, key_manager: Arc, sync: SyncManager, notifiers: Arc, diff --git a/core/crates/cloud-services/src/sync/send.rs b/core/crates/cloud-services/src/sync/send.rs index 5a95240e6..cf7f3a544 100644 --- a/core/crates/cloud-services/src/sync/send.rs +++ b/core/crates/cloud-services/src/sync/send.rs @@ -29,7 +29,7 @@ use std::{ use chrono::{DateTime, Utc}; use futures::{FutureExt, StreamExt, TryStreamExt}; use futures_concurrency::future::{Race, TryJoin}; -use quic_rpc::transport::quinn::QuinnConnection; +use quic_rpc::transport::quinn::QuinnConnector; use tokio::{ sync::{broadcast, Notify}, time::sleep, @@ -60,7 +60,7 @@ pub struct Sender { sync_group_pub_id: groups::PubId, sync: SyncManager, cloud_services: Arc, - cloud_client: Client>, + cloud_client: Client>, key_manager: Arc, is_active: Arc, state_notify: Arc, diff --git a/core/src/api/cloud/devices.rs b/core/src/api/cloud/devices.rs index d775c7c86..ead1db58e 100644 --- a/core/src/api/cloud/devices.rs +++ b/core/src/api/cloud/devices.rs @@ -1,6 +1,6 @@ use crate::api::{Ctx, R}; -use sd_core_cloud_services::QuinnConnection; +use sd_core_cloud_services::QuinnConnector; use sd_cloud_schema::{ auth::AccessToken, @@ -149,7 +149,7 @@ pub fn mount() -> AlphaRouter { } pub async fn hello( - client: &Client>, + client: &Client>, access_token: AccessToken, device_pub_id: PubId, hashed_pub_id: Hash, @@ -270,7 +270,7 @@ pub struct DeviceRegisterData { } pub async fn register( - client: &Client>, + client: &Client>, access_token: AccessToken, DeviceRegisterData { pub_id, diff --git a/core/src/api/cloud/mod.rs b/core/src/api/cloud/mod.rs index e2afdbbf6..538c0d2a4 100644 --- a/core/src/api/cloud/mod.rs +++ b/core/src/api/cloud/mod.rs @@ -4,7 +4,7 @@ use crate::{ Node, }; -use sd_core_cloud_services::{CloudP2P, KeyManager, QuinnConnection, UserResponse}; +use sd_core_cloud_services::{CloudP2P, KeyManager, QuinnConnector, UserResponse}; use sd_cloud_schema::{ auth, @@ -32,7 +32,7 @@ mod sync_groups; async fn try_get_cloud_services_client( node: &Node, -) -> Result>, sd_core_cloud_services::Error> { +) -> Result>, sd_core_cloud_services::Error> { node.cloud_services .client() .await @@ -302,13 +302,7 @@ async fn initialize_cloud_sync( async fn get_client_and_access_token( node: &Node, -) -> Result< - ( - Client>, - auth::AccessToken, - ), - rspc::Error, -> { +) -> Result<(Client>, auth::AccessToken), rspc::Error> { ( try_get_cloud_services_client(node), node.cloud_services