Update quic-rpc crate (#2805)

* Fix breaking changes on quic-rpc

* Rust fmt
This commit is contained in:
Ericson "Fogo" Soares
2024-11-15 04:40:43 -03:00
committed by GitHub
parent 13ee54808b
commit 10423e3f7f
12 changed files with 75 additions and 49 deletions

BIN
Cargo.lock generated
View File

Binary file not shown.

View File

@@ -20,7 +20,7 @@ rust-version = "1.81"
[workspace.dependencies]
# First party dependencies
sd-cloud-schema = { git = "https://github.com/spacedriveapp/cloud-services-schema", rev = "99d59e8fab" }
sd-cloud-schema = { git = "https://github.com/spacedriveapp/cloud-services-schema", rev = "74af2a727c" }
# Third party dependencies used by one or more of our crates
async-channel = "2.3"

View File

@@ -39,16 +39,16 @@ zeroize = { workspace = true }
# External dependencies
anyhow = "1.0.86"
dashmap = "6.1.0"
iroh-net = { version = "0.27", features = ["discovery-local-network", "iroh-relay"] }
iroh-net = { version = "0.28.1", features = ["discovery-local-network", "iroh-relay"] }
paste = "=1.0.15"
quic-rpc = { version = "0.13.0", features = ["quinn-transport"] }
quinn = { package = "iroh-quinn", version = "0.11" }
quic-rpc = { version = "0.15.1", features = ["iroh-net-transport", "quinn-transport"] }
quinn = { package = "iroh-quinn", version = "0.12" }
# Using whatever version of reqwest that reqwest-middleware uses, just putting here to enable some features
reqwest = { version = "0.12", features = ["json", "native-tls-vendored", "stream"] }
reqwest-middleware = { version = "0.3", features = ["json"] }
reqwest-retry = "0.6"
rustls = { version = "=0.23.15", default-features = false, features = ["brotli", "ring", "std"] }
rustls-platform-verifier = "0.3.3"
reqwest-middleware = { version = "0.4", features = ["json"] }
reqwest-retry = "0.7"
rustls = { version = "=0.23.16", default-features = false, features = ["brotli", "ring", "std"] }
rustls-platform-verifier = "0.4.0"
[dev-dependencies]

View File

@@ -6,7 +6,7 @@ use std::{net::SocketAddr, sync::Arc, time::Duration};
use futures::Stream;
use iroh_net::relay::RelayUrl;
use quic_rpc::{transport::quinn::QuinnConnection, RpcClient, RpcMessage};
use quic_rpc::{transport::quinn::QuinnConnector, RpcClient, RpcMessage};
use quinn::{crypto::rustls::QuicClientConfig, ClientConfig, Endpoint};
use reqwest::{IntoUrl, Url};
use reqwest_middleware::{reqwest, ClientBuilder, ClientWithMiddleware};
@@ -22,7 +22,7 @@ use super::{
enum ClientState<In: RpcMessage, Out: RpcMessage> {
#[default]
NotConnected,
Connected(Client<QuinnConnection<In, Out>>),
Connected(Client<QuinnConnector<In, Out>>),
}
/// Cloud services are a optional feature that allows you to interact with the cloud services
@@ -157,7 +157,7 @@ impl CloudServices {
http_client: &ClientWithMiddleware,
get_cloud_api_address: Url,
domain_name: String,
) -> Result<Client<QuinnConnection<Response, Request>>, Error> {
) -> Result<Client<QuinnConnector<Response, Request>>, Error> {
let cloud_api_address = http_client
.get(get_cloud_api_address)
.send()
@@ -256,7 +256,7 @@ impl CloudServices {
.map_err(Error::FailedToCreateEndpoint)?;
endpoint.set_default_client_config(client_config);
Ok(Client::new(RpcClient::new(QuinnConnection::new(
Ok(Client::new(RpcClient::new(QuinnConnector::new(
endpoint,
cloud_api_address,
domain_name,
@@ -268,7 +268,7 @@ impl CloudServices {
/// If the client is not connected, it will try to connect to the cloud services.
/// Available routes documented in
/// [`sd_cloud_schema::Service`](https://github.com/spacedriveapp/cloud-services-schema).
pub async fn client(&self) -> Result<Client<QuinnConnection<Response, Request>>, Error> {
pub async fn client(&self) -> Result<Client<QuinnConnector<Response, Request>>, Error> {
if let ClientState::Connected(client) = { &*self.client_state.read().await } {
return Ok(client.clone());
}

View File

@@ -1,11 +1,15 @@
use sd_cloud_schema::{cloud_p2p, sync::groups, Request, Response};
use sd_cloud_schema::{
cloud_p2p,
sync::{self, groups},
Request, Response,
};
use sd_utils::error::FileIOError;
use std::{io, net::AddrParseError};
use quic_rpc::{
pattern::{bidi_streaming, rpc, server_streaming},
transport::quinn::QuinnConnection,
transport::{mapped::MappedConnector, quinn::QuinnConnector},
};
#[derive(thiserror::Error, Debug)]
@@ -69,7 +73,7 @@ pub enum Error {
ConnectToCloudP2PNode(anyhow::Error),
#[error("Communication error with Cloud P2P node: {0}")]
CloudP2PRpcCommunication(
#[from] rpc::Error<QuinnConnection<cloud_p2p::Response, cloud_p2p::Request>>,
#[from] rpc::Error<QuinnConnector<cloud_p2p::Response, cloud_p2p::Request>>,
),
#[error("Cloud P2P not initialized")]
CloudP2PNotInitialized,
@@ -80,15 +84,43 @@ pub enum Error {
// Communication errors
#[error("Failed to communicate with RPC backend: {0}")]
RpcCommunication(#[from] rpc::Error<QuinnConnection<Response, Request>>),
RpcCommunication(#[from] rpc::Error<QuinnConnector<Response, Request>>),
#[error("Failed to communicate with RPC sync backend: {0}")]
RpcSyncCommunication(
#[from]
rpc::Error<
MappedConnector<sync::Response, sync::Request, QuinnConnector<Response, Request>>,
>,
),
#[error("Failed to communicate with Server Streaming RPC backend: {0}")]
ServerStreamCommunication(#[from] server_streaming::Error<QuinnConnection<Response, Request>>),
ServerStreamCommunication(#[from] server_streaming::Error<QuinnConnector<Response, Request>>),
#[error("Failed to communicate with Server Streaming RPC sync backend: {0}")]
ServerStreamSyncCommunication(
#[from]
server_streaming::Error<
MappedConnector<sync::Response, sync::Request, QuinnConnector<Response, Request>>,
>,
),
#[error("Failed to receive next response from Server Streaming RPC backend: {0}")]
ServerStreamRecv(#[from] server_streaming::ItemError<QuinnConnection<Response, Request>>),
ServerStreamRecv(#[from] server_streaming::ItemError<QuinnConnector<Response, Request>>),
#[error("Failed to receive next response from Server Streaming RPC sync backend: {0}")]
ServerStreamSyncRecv(
#[from]
server_streaming::ItemError<
MappedConnector<sync::Response, sync::Request, QuinnConnector<Response, Request>>,
>,
),
#[error("Failed to communicate with Bidi Streaming RPC backend: {0}")]
BidiStreamCommunication(#[from] bidi_streaming::Error<QuinnConnection<Response, Request>>),
BidiStreamCommunication(#[from] bidi_streaming::Error<QuinnConnector<Response, Request>>),
#[error("Failed to communicate with Bidi Streaming RPC sync backend: {0}")]
BidiStreamSyncCommunication(
#[from]
bidi_streaming::Error<
MappedConnector<sync::Response, sync::Request, QuinnConnector<Response, Request>>,
>,
),
#[error("Failed to receive next response from Bidi Streaming RPC backend: {0}")]
BidiStreamRecv(#[from] bidi_streaming::ItemError<QuinnConnection<Response, Request>>),
BidiStreamRecv(#[from] bidi_streaming::ItemError<QuinnConnector<Response, Request>>),
#[error("Error from backend: {0}")]
Backend(#[from] sd_cloud_schema::Error),
#[error("Failed to get access token from refresher: {0}")]

View File

@@ -48,7 +48,7 @@ pub use sync::{
};
// Re-exports
pub use quic_rpc::transport::quinn::QuinnConnection;
pub use quic_rpc::transport::quinn::QuinnConnector;
// Export URL for the auth server
pub const AUTH_SERVER_URL: &str = "https://auth.spacedrive.com";

View File

@@ -10,7 +10,7 @@ use std::time::Duration;
use futures_concurrency::future::Join;
use iroh_net::{Endpoint, NodeId};
use quic_rpc::{transport::quinn::QuinnConnection, RpcClient};
use quic_rpc::{transport::quinn::QuinnConnector, RpcClient};
use tokio::time::Instant;
use tracing::{debug, error, instrument, warn};
@@ -24,7 +24,7 @@ pub async fn dispatch_notifier(
devices: Option<(Instant, Vec<(devices::PubId, NodeId)>)>,
msgs_tx: flume::Sender<Message>,
cloud_services: sd_cloud_schema::Client<
QuinnConnection<sd_cloud_schema::Response, sd_cloud_schema::Request>,
QuinnConnector<sd_cloud_schema::Response, sd_cloud_schema::Request>,
>,
token_refresher: TokenRefresher,
endpoint: Endpoint,
@@ -63,7 +63,7 @@ async fn notify_peers(
device_pub_id: devices::PubId,
devices: Option<(Instant, Vec<(devices::PubId, NodeId)>)>,
cloud_services: sd_cloud_schema::Client<
QuinnConnection<sd_cloud_schema::Response, sd_cloud_schema::Request>,
QuinnConnector<sd_cloud_schema::Response, sd_cloud_schema::Request>,
>,
token_refresher: TokenRefresher,
endpoint: Endpoint,
@@ -128,7 +128,7 @@ async fn connect_and_send_notification(
connection_id: &NodeId,
endpoint: &Endpoint,
) -> Result<(), Error> {
let client = Client::new(RpcClient::new(QuinnConnection::from_connection(
let client = Client::new(RpcClient::new(QuinnConnector::from_connection(
endpoint
.connect(*connection_id, CloudP2PALPN::LATEST)
.await

View File

@@ -30,7 +30,7 @@ use futures_concurrency::stream::Merge;
use iroh_net::{Endpoint, NodeId};
use quic_rpc::{
server::{Accepting, RpcChannel, RpcServerError},
transport::quinn::{QuinnConnection, QuinnServerEndpoint},
transport::quinn::{QuinnConnector, QuinnListener},
RpcClient, RpcServer,
};
use tokio::{
@@ -73,7 +73,7 @@ pub struct Runner {
current_device_pub_id: devices::PubId,
token_refresher: TokenRefresher,
cloud_services: sd_cloud_schema::Client<
QuinnConnection<sd_cloud_schema::Response, sd_cloud_schema::Request>,
QuinnConnector<sd_cloud_schema::Response, sd_cloud_schema::Request>,
>,
msgs_tx: flume::Sender<Message>,
endpoint: Endpoint,
@@ -111,13 +111,13 @@ impl Clone for Runner {
}
struct PendingSyncGroupJoin {
channel: RpcChannel<Service, QuinnServerEndpoint<cloud_p2p::Request, cloud_p2p::Response>>,
channel: RpcChannel<Service, QuinnListener<cloud_p2p::Request, cloud_p2p::Response>>,
request: authorize_new_device_in_sync_group::Request,
this_device: Device,
since: Instant,
}
type P2PServerEndpoint = QuinnServerEndpoint<cloud_p2p::Request, cloud_p2p::Response>;
type P2PServerEndpoint = QuinnListener<cloud_p2p::Request, cloud_p2p::Response>;
impl Runner {
pub async fn new(
@@ -596,7 +596,7 @@ impl Runner {
async fn connect_to_first_available_client(
endpoint: &Endpoint,
devices_in_group: &[(devices::PubId, NodeId)],
) -> Result<Client<QuinnConnection<cloud_p2p::Response, cloud_p2p::Request>>, CloudP2PError> {
) -> Result<Client<QuinnConnector<cloud_p2p::Response, cloud_p2p::Request>>, CloudP2PError> {
for (device_pub_id, device_connection_id) in devices_in_group {
if let Ok(connection) = endpoint
.connect(*device_connection_id, CloudP2PALPN::LATEST)
@@ -607,7 +607,7 @@ async fn connect_to_first_available_client(
debug!(%device_pub_id, "Connected to authorizor device candidate");
return Ok(Client::new(RpcClient::new(
QuinnConnection::from_connection(connection),
QuinnConnector::from_connection(connection),
)));
}
}
@@ -627,7 +627,7 @@ fn setup_server_endpoint(
let (connections_tx, connections_rx) = flume::bounded(16);
(
RpcServer::new(QuinnServerEndpoint::handle_connections(
RpcServer::new(QuinnListener::handle_connections(
connections_rx,
local_addr,
)),

View File

@@ -32,7 +32,7 @@ use std::{
use chrono::{DateTime, Utc};
use futures::{FutureExt, StreamExt};
use futures_concurrency::future::{Race, TryJoin};
use quic_rpc::transport::quinn::QuinnConnection;
use quic_rpc::transport::quinn::QuinnConnector;
use serde::{Deserialize, Serialize};
use tokio::{fs, io, sync::Notify, time::sleep};
use tracing::{debug, error, instrument, warn};
@@ -49,7 +49,7 @@ pub struct Receiver {
sync_group_pub_id: groups::PubId,
device_pub_id: devices::PubId,
cloud_services: Arc<CloudServices>,
cloud_client: Client<QuinnConnection<Response, Request>>,
cloud_client: Client<QuinnConnector<Response, Request>>,
key_manager: Arc<KeyManager>,
sync: SyncManager,
notifiers: Arc<ReceiveAndIngestNotifiers>,

View File

@@ -29,7 +29,7 @@ use std::{
use chrono::{DateTime, Utc};
use futures::{FutureExt, StreamExt, TryStreamExt};
use futures_concurrency::future::{Race, TryJoin};
use quic_rpc::transport::quinn::QuinnConnection;
use quic_rpc::transport::quinn::QuinnConnector;
use tokio::{
sync::{broadcast, Notify},
time::sleep,
@@ -60,7 +60,7 @@ pub struct Sender {
sync_group_pub_id: groups::PubId,
sync: SyncManager,
cloud_services: Arc<CloudServices>,
cloud_client: Client<QuinnConnection<Response, Request>>,
cloud_client: Client<QuinnConnector<Response, Request>>,
key_manager: Arc<KeyManager>,
is_active: Arc<AtomicBool>,
state_notify: Arc<Notify>,

View File

@@ -1,6 +1,6 @@
use crate::api::{Ctx, R};
use sd_core_cloud_services::QuinnConnection;
use sd_core_cloud_services::QuinnConnector;
use sd_cloud_schema::{
auth::AccessToken,
@@ -149,7 +149,7 @@ pub fn mount() -> AlphaRouter<Ctx> {
}
pub async fn hello(
client: &Client<QuinnConnection<Response, Request>>,
client: &Client<QuinnConnector<Response, Request>>,
access_token: AccessToken,
device_pub_id: PubId,
hashed_pub_id: Hash,
@@ -270,7 +270,7 @@ pub struct DeviceRegisterData {
}
pub async fn register(
client: &Client<QuinnConnection<Response, Request>>,
client: &Client<QuinnConnector<Response, Request>>,
access_token: AccessToken,
DeviceRegisterData {
pub_id,

View File

@@ -4,7 +4,7 @@ use crate::{
Node,
};
use sd_core_cloud_services::{CloudP2P, KeyManager, QuinnConnection, UserResponse};
use sd_core_cloud_services::{CloudP2P, KeyManager, QuinnConnector, UserResponse};
use sd_cloud_schema::{
auth,
@@ -32,7 +32,7 @@ mod sync_groups;
async fn try_get_cloud_services_client(
node: &Node,
) -> Result<Client<QuinnConnection<Response, Request>>, sd_core_cloud_services::Error> {
) -> Result<Client<QuinnConnector<Response, Request>>, sd_core_cloud_services::Error> {
node.cloud_services
.client()
.await
@@ -302,13 +302,7 @@ async fn initialize_cloud_sync(
async fn get_client_and_access_token(
node: &Node,
) -> Result<
(
Client<QuinnConnection<Response, Request>>,
auth::AccessToken,
),
rspc::Error,
> {
) -> Result<(Client<QuinnConnector<Response, Request>>, auth::AccessToken), rspc::Error> {
(
try_get_cloud_services_client(node),
node.cloud_services