mirror of
https://github.com/spacedriveapp/spacedrive.git
synced 2026-02-19 23:25:51 -05:00
Merge pull request #2795 from spacedriveapp/update-cloud-services-schema
Update quic-rpc and cloud services schema
This commit is contained in:
BIN
Cargo.lock
generated
BIN
Cargo.lock
generated
Binary file not shown.
@@ -20,7 +20,7 @@ rust-version = "1.81"
|
||||
|
||||
[workspace.dependencies]
|
||||
# First party dependencies
|
||||
sd-cloud-schema = { git = "https://github.com/spacedriveapp/cloud-services-schema", rev = "bbc69c5cb2" }
|
||||
sd-cloud-schema = { git = "https://github.com/spacedriveapp/cloud-services-schema", rev = "99d59e8fab" }
|
||||
|
||||
# Third party dependencies used by one or more of our crates
|
||||
async-channel = "2.3"
|
||||
|
||||
@@ -48,10 +48,10 @@ mkdir -p "$TARGET_DIRECTORY"
|
||||
TARGET_DIRECTORY="$(CDPATH='' cd -- "$TARGET_DIRECTORY" && pwd -P)"
|
||||
|
||||
TARGET_CONFIG=debug
|
||||
# if [ "${CONFIGURATION:-}" = "Release" ]; then
|
||||
# set -- --release
|
||||
# TARGET_CONFIG=release
|
||||
# fi
|
||||
if [ "${CONFIGURATION:-}" = "Release" ]; then
|
||||
set -- --release
|
||||
TARGET_CONFIG=release
|
||||
fi
|
||||
|
||||
trap 'if [ -e "${CARGO_CONFIG}.bak" ]; then mv "${CARGO_CONFIG}.bak" "$CARGO_CONFIG"; fi' EXIT
|
||||
|
||||
|
||||
@@ -41,7 +41,7 @@ anyhow = "1.0.86"
|
||||
dashmap = "6.1.0"
|
||||
iroh-net = { version = "0.27", features = ["discovery-local-network", "iroh-relay"] }
|
||||
paste = "=1.0.15"
|
||||
quic-rpc = { version = "0.12.1", features = ["quinn-transport"] }
|
||||
quic-rpc = { version = "0.13.0", features = ["quinn-transport"] }
|
||||
quinn = { package = "iroh-quinn", version = "0.11" }
|
||||
# Using whatever version of reqwest that reqwest-middleware uses, just putting here to enable some features
|
||||
reqwest = { version = "0.12", features = ["json", "native-tls-vendored", "stream"] }
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
use crate::p2p::{NotifyUser, UserResponse};
|
||||
|
||||
use sd_cloud_schema::{Client, Service, ServicesALPN};
|
||||
use sd_cloud_schema::{Client, Request, Response, ServicesALPN};
|
||||
|
||||
use std::{net::SocketAddr, sync::Arc, time::Duration};
|
||||
|
||||
use futures::Stream;
|
||||
use iroh_net::relay::RelayUrl;
|
||||
use quic_rpc::{transport::quinn::QuinnConnection, RpcClient};
|
||||
use quic_rpc::{transport::quinn::QuinnConnection, RpcClient, RpcMessage};
|
||||
use quinn::{crypto::rustls::QuicClientConfig, ClientConfig, Endpoint};
|
||||
use reqwest::{IntoUrl, Url};
|
||||
use reqwest_middleware::{reqwest, ClientBuilder, ClientWithMiddleware};
|
||||
@@ -19,10 +19,10 @@ use super::{
|
||||
};
|
||||
|
||||
#[derive(Debug, Default, Clone)]
|
||||
enum ClientState {
|
||||
enum ClientState<In: RpcMessage, Out: RpcMessage> {
|
||||
#[default]
|
||||
NotConnected,
|
||||
Connected(Client<QuinnConnection<Service>, Service>),
|
||||
Connected(Client<QuinnConnection<In, Out>>),
|
||||
}
|
||||
|
||||
/// Cloud services are a optional feature that allows you to interact with the cloud services
|
||||
@@ -35,7 +35,7 @@ enum ClientState {
|
||||
/// that core can always operate without the cloud services.
|
||||
#[derive(Debug)]
|
||||
pub struct CloudServices {
|
||||
client_state: Arc<RwLock<ClientState>>,
|
||||
client_state: Arc<RwLock<ClientState<Response, Request>>>,
|
||||
get_cloud_api_address: Url,
|
||||
http_client: ClientWithMiddleware,
|
||||
domain_name: String,
|
||||
@@ -157,7 +157,7 @@ impl CloudServices {
|
||||
http_client: &ClientWithMiddleware,
|
||||
get_cloud_api_address: Url,
|
||||
domain_name: String,
|
||||
) -> Result<Client<QuinnConnection<Service>, Service>, Error> {
|
||||
) -> Result<Client<QuinnConnection<Response, Request>>, Error> {
|
||||
let cloud_api_address = http_client
|
||||
.get(get_cloud_api_address)
|
||||
.send()
|
||||
@@ -256,9 +256,6 @@ impl CloudServices {
|
||||
.map_err(Error::FailedToCreateEndpoint)?;
|
||||
endpoint.set_default_client_config(client_config);
|
||||
|
||||
// TODO(@fogodev): It's possible that we can't keep the connection alive all the time,
|
||||
// and need to use single shot connections. I will only be sure when we have
|
||||
// actually battle-tested the cloud services in core.
|
||||
Ok(Client::new(RpcClient::new(QuinnConnection::new(
|
||||
endpoint,
|
||||
cloud_api_address,
|
||||
@@ -271,9 +268,9 @@ impl CloudServices {
|
||||
/// If the client is not connected, it will try to connect to the cloud services.
|
||||
/// Available routes documented in
|
||||
/// [`sd_cloud_schema::Service`](https://github.com/spacedriveapp/cloud-services-schema).
|
||||
pub async fn client(&self) -> Result<Client<QuinnConnection<Service>, Service>, Error> {
|
||||
if let ClientState::Connected(client) = { self.client_state.read().await.clone() } {
|
||||
return Ok(client);
|
||||
pub async fn client(&self) -> Result<Client<QuinnConnection<Response, Request>>, Error> {
|
||||
if let ClientState::Connected(client) = { &*self.client_state.read().await } {
|
||||
return Ok(client.clone());
|
||||
}
|
||||
|
||||
// If we're not connected, we need to try to connect.
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use sd_cloud_schema::{cloud_p2p, sync::groups, Service};
|
||||
use sd_cloud_schema::{cloud_p2p, sync::groups, Request, Response};
|
||||
use sd_utils::error::FileIOError;
|
||||
|
||||
use std::{io, net::AddrParseError};
|
||||
@@ -68,7 +68,9 @@ pub enum Error {
|
||||
#[error("Failed to connect to Cloud P2P node: {0}")]
|
||||
ConnectToCloudP2PNode(anyhow::Error),
|
||||
#[error("Communication error with Cloud P2P node: {0}")]
|
||||
CloudP2PRpcCommunication(#[from] rpc::Error<QuinnConnection<cloud_p2p::Service>>),
|
||||
CloudP2PRpcCommunication(
|
||||
#[from] rpc::Error<QuinnConnection<cloud_p2p::Response, cloud_p2p::Request>>,
|
||||
),
|
||||
#[error("Cloud P2P not initialized")]
|
||||
CloudP2PNotInitialized,
|
||||
#[error("Failed to initialize LocalSwarmDiscovery: {0}")]
|
||||
@@ -78,15 +80,15 @@ pub enum Error {
|
||||
|
||||
// Communication errors
|
||||
#[error("Failed to communicate with RPC backend: {0}")]
|
||||
RpcCommunication(#[from] rpc::Error<QuinnConnection<Service>>),
|
||||
RpcCommunication(#[from] rpc::Error<QuinnConnection<Response, Request>>),
|
||||
#[error("Failed to communicate with Server Streaming RPC backend: {0}")]
|
||||
ServerStreamCommunication(#[from] server_streaming::Error<QuinnConnection<Service>>),
|
||||
ServerStreamCommunication(#[from] server_streaming::Error<QuinnConnection<Response, Request>>),
|
||||
#[error("Failed to receive next response from Server Streaming RPC backend: {0}")]
|
||||
ServerStreamRecv(#[from] server_streaming::ItemError<QuinnConnection<Service>>),
|
||||
ServerStreamRecv(#[from] server_streaming::ItemError<QuinnConnection<Response, Request>>),
|
||||
#[error("Failed to communicate with Bidi Streaming RPC backend: {0}")]
|
||||
BidiStreamCommunication(#[from] bidi_streaming::Error<QuinnConnection<Service>>),
|
||||
BidiStreamCommunication(#[from] bidi_streaming::Error<QuinnConnection<Response, Request>>),
|
||||
#[error("Failed to receive next response from Bidi Streaming RPC backend: {0}")]
|
||||
BidiStreamRecv(#[from] bidi_streaming::ItemError<QuinnConnection<Service>>),
|
||||
BidiStreamRecv(#[from] bidi_streaming::ItemError<QuinnConnection<Response, Request>>),
|
||||
#[error("Error from backend: {0}")]
|
||||
Backend(#[from] sd_cloud_schema::Error),
|
||||
#[error("Failed to get access token from refresher: {0}")]
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use crate::{token_refresher::TokenRefresher, Error};
|
||||
|
||||
use sd_cloud_schema::{
|
||||
cloud_p2p::{Client, CloudP2PALPN, Service},
|
||||
cloud_p2p::{Client, CloudP2PALPN},
|
||||
devices,
|
||||
sync::groups,
|
||||
};
|
||||
@@ -24,8 +24,7 @@ pub async fn dispatch_notifier(
|
||||
devices: Option<(Instant, Vec<(devices::PubId, NodeId)>)>,
|
||||
msgs_tx: flume::Sender<Message>,
|
||||
cloud_services: sd_cloud_schema::Client<
|
||||
QuinnConnection<sd_cloud_schema::Service>,
|
||||
sd_cloud_schema::Service,
|
||||
QuinnConnection<sd_cloud_schema::Response, sd_cloud_schema::Request>,
|
||||
>,
|
||||
token_refresher: TokenRefresher,
|
||||
endpoint: Endpoint,
|
||||
@@ -64,8 +63,7 @@ async fn notify_peers(
|
||||
device_pub_id: devices::PubId,
|
||||
devices: Option<(Instant, Vec<(devices::PubId, NodeId)>)>,
|
||||
cloud_services: sd_cloud_schema::Client<
|
||||
QuinnConnection<sd_cloud_schema::Service>,
|
||||
sd_cloud_schema::Service,
|
||||
QuinnConnection<sd_cloud_schema::Response, sd_cloud_schema::Request>,
|
||||
>,
|
||||
token_refresher: TokenRefresher,
|
||||
endpoint: Endpoint,
|
||||
@@ -130,7 +128,7 @@ async fn connect_and_send_notification(
|
||||
connection_id: &NodeId,
|
||||
endpoint: &Endpoint,
|
||||
) -> Result<(), Error> {
|
||||
let client = Client::new(RpcClient::new(QuinnConnection::<Service>::from_connection(
|
||||
let client = Client::new(RpcClient::new(QuinnConnection::from_connection(
|
||||
endpoint
|
||||
.connect(*connection_id, CloudP2PALPN::LATEST)
|
||||
.await
|
||||
|
||||
@@ -73,8 +73,7 @@ pub struct Runner {
|
||||
current_device_pub_id: devices::PubId,
|
||||
token_refresher: TokenRefresher,
|
||||
cloud_services: sd_cloud_schema::Client<
|
||||
QuinnConnection<sd_cloud_schema::Service>,
|
||||
sd_cloud_schema::Service,
|
||||
QuinnConnection<sd_cloud_schema::Response, sd_cloud_schema::Request>,
|
||||
>,
|
||||
msgs_tx: flume::Sender<Message>,
|
||||
endpoint: Endpoint,
|
||||
@@ -112,12 +111,14 @@ impl Clone for Runner {
|
||||
}
|
||||
|
||||
struct PendingSyncGroupJoin {
|
||||
channel: RpcChannel<Service, QuinnServerEndpoint<Service>>,
|
||||
channel: RpcChannel<Service, QuinnServerEndpoint<cloud_p2p::Request, cloud_p2p::Response>>,
|
||||
request: authorize_new_device_in_sync_group::Request,
|
||||
this_device: Device,
|
||||
since: Instant,
|
||||
}
|
||||
|
||||
type P2PServerEndpoint = QuinnServerEndpoint<cloud_p2p::Request, cloud_p2p::Response>;
|
||||
|
||||
impl Runner {
|
||||
pub async fn new(
|
||||
current_device_pub_id: devices::PubId,
|
||||
@@ -152,10 +153,7 @@ impl Runner {
|
||||
#[allow(clippy::large_enum_variant)]
|
||||
enum StreamMessage {
|
||||
AcceptResult(
|
||||
Result<
|
||||
Accepting<Service, QuinnServerEndpoint<Service>>,
|
||||
RpcServerError<QuinnServerEndpoint<Service>>,
|
||||
>,
|
||||
Result<Accepting<Service, P2PServerEndpoint>, RpcServerError<P2PServerEndpoint>>,
|
||||
),
|
||||
Message(Message),
|
||||
UserResponse(UserResponse),
|
||||
@@ -361,7 +359,7 @@ impl Runner {
|
||||
async fn handle_request(
|
||||
&self,
|
||||
request: cloud_p2p::Request,
|
||||
channel: RpcChannel<Service, QuinnServerEndpoint<Service>>,
|
||||
channel: RpcChannel<Service, P2PServerEndpoint>,
|
||||
) {
|
||||
match request {
|
||||
cloud_p2p::Request::AuthorizeNewDeviceInSyncGroup(
|
||||
@@ -598,7 +596,7 @@ impl Runner {
|
||||
async fn connect_to_first_available_client(
|
||||
endpoint: &Endpoint,
|
||||
devices_in_group: &[(devices::PubId, NodeId)],
|
||||
) -> Result<Client<QuinnConnection<Service>, Service>, CloudP2PError> {
|
||||
) -> Result<Client<QuinnConnection<cloud_p2p::Response, cloud_p2p::Request>>, CloudP2PError> {
|
||||
for (device_pub_id, device_connection_id) in devices_in_group {
|
||||
if let Ok(connection) = endpoint
|
||||
.connect(*device_connection_id, CloudP2PALPN::LATEST)
|
||||
@@ -607,8 +605,9 @@ async fn connect_to_first_available_client(
|
||||
|e| error!(?e, %device_pub_id, "Failed to connect to authorizor device candidate"),
|
||||
) {
|
||||
debug!(%device_pub_id, "Connected to authorizor device candidate");
|
||||
|
||||
return Ok(Client::new(RpcClient::new(
|
||||
QuinnConnection::<Service>::from_connection(connection),
|
||||
QuinnConnection::from_connection(connection),
|
||||
)));
|
||||
}
|
||||
}
|
||||
@@ -618,10 +617,7 @@ async fn connect_to_first_available_client(
|
||||
|
||||
fn setup_server_endpoint(
|
||||
endpoint: Endpoint,
|
||||
) -> (
|
||||
RpcServer<Service, QuinnServerEndpoint<Service>>,
|
||||
JoinHandle<()>,
|
||||
) {
|
||||
) -> (RpcServer<Service, P2PServerEndpoint>, JoinHandle<()>) {
|
||||
let local_addr = {
|
||||
let (ipv4_addr, maybe_ipv6_addr) = endpoint.bound_sockets();
|
||||
// Trying to give preference to IPv6 addresses because it's 2024
|
||||
@@ -631,7 +627,7 @@ fn setup_server_endpoint(
|
||||
let (connections_tx, connections_rx) = flume::bounded(16);
|
||||
|
||||
(
|
||||
RpcServer::new(QuinnServerEndpoint::<Service>::handle_connections(
|
||||
RpcServer::new(QuinnServerEndpoint::handle_connections(
|
||||
connections_rx,
|
||||
local_addr,
|
||||
)),
|
||||
|
||||
@@ -6,7 +6,7 @@ use sd_cloud_schema::{
|
||||
groups,
|
||||
messages::{pull, MessagesCollection},
|
||||
},
|
||||
Client, Service,
|
||||
Client, Request, Response,
|
||||
};
|
||||
use sd_core_sync::{
|
||||
cloud_crdt_op_db, CRDTOperation, CompressedCRDTOperationsPerModel, SyncManager,
|
||||
@@ -49,7 +49,7 @@ pub struct Receiver {
|
||||
sync_group_pub_id: groups::PubId,
|
||||
device_pub_id: devices::PubId,
|
||||
cloud_services: Arc<CloudServices>,
|
||||
cloud_client: Client<QuinnConnection<Service>>,
|
||||
cloud_client: Client<QuinnConnection<Response, Request>>,
|
||||
key_manager: Arc<KeyManager>,
|
||||
sync: SyncManager,
|
||||
notifiers: Arc<ReceiveAndIngestNotifiers>,
|
||||
|
||||
@@ -7,7 +7,7 @@ use sd_cloud_schema::{
|
||||
devices,
|
||||
error::{ClientSideError, NotFoundError},
|
||||
sync::{groups, messages},
|
||||
Client, Service,
|
||||
Client, Request, Response,
|
||||
};
|
||||
use sd_crypto::{
|
||||
cloud::{OneShotEncryption, SecretKey, StreamEncryption},
|
||||
@@ -60,7 +60,7 @@ pub struct Sender {
|
||||
sync_group_pub_id: groups::PubId,
|
||||
sync: SyncManager,
|
||||
cloud_services: Arc<CloudServices>,
|
||||
cloud_client: Client<QuinnConnection<Service>>,
|
||||
cloud_client: Client<QuinnConnection<Response, Request>>,
|
||||
key_manager: Arc<KeyManager>,
|
||||
is_active: Arc<AtomicBool>,
|
||||
state_notify: Arc<Notify>,
|
||||
|
||||
@@ -10,7 +10,7 @@ use sd_cloud_schema::{
|
||||
ClientRegistration, ClientRegistrationFinishParameters, ClientRegistrationFinishResult,
|
||||
ClientRegistrationStartResult,
|
||||
},
|
||||
Client, NodeId, Service, SpacedriveCipherSuite,
|
||||
Client, NodeId, Request, Response, SpacedriveCipherSuite,
|
||||
};
|
||||
use sd_crypto::{cloud::secret_key::SecretKey, CryptoRng};
|
||||
|
||||
@@ -149,7 +149,7 @@ pub fn mount() -> AlphaRouter<Ctx> {
|
||||
}
|
||||
|
||||
pub async fn hello(
|
||||
client: &Client<QuinnConnection<Service>, Service>,
|
||||
client: &Client<QuinnConnection<Response, Request>>,
|
||||
access_token: AccessToken,
|
||||
device_pub_id: PubId,
|
||||
hashed_pub_id: Hash,
|
||||
@@ -270,7 +270,7 @@ pub struct DeviceRegisterData {
|
||||
}
|
||||
|
||||
pub async fn register(
|
||||
client: &Client<QuinnConnection<Service>, Service>,
|
||||
client: &Client<QuinnConnection<Response, Request>>,
|
||||
access_token: AccessToken,
|
||||
DeviceRegisterData {
|
||||
pub_id,
|
||||
|
||||
@@ -10,7 +10,7 @@ use sd_cloud_schema::{
|
||||
auth,
|
||||
error::{ClientSideError, Error},
|
||||
sync::groups,
|
||||
users, Client, SecretKey as IrohSecretKey, Service,
|
||||
users, Client, Request, Response, SecretKey as IrohSecretKey,
|
||||
};
|
||||
use sd_crypto::{CryptoRng, SeedableRng};
|
||||
use sd_utils::error::report_error;
|
||||
@@ -32,7 +32,7 @@ mod sync_groups;
|
||||
|
||||
async fn try_get_cloud_services_client(
|
||||
node: &Node,
|
||||
) -> Result<Client<QuinnConnection<Service>, Service>, sd_core_cloud_services::Error> {
|
||||
) -> Result<Client<QuinnConnection<Response, Request>>, sd_core_cloud_services::Error> {
|
||||
node.cloud_services
|
||||
.client()
|
||||
.await
|
||||
@@ -302,7 +302,13 @@ async fn initialize_cloud_sync(
|
||||
|
||||
async fn get_client_and_access_token(
|
||||
node: &Node,
|
||||
) -> Result<(Client<QuinnConnection<Service>, Service>, auth::AccessToken), rspc::Error> {
|
||||
) -> Result<
|
||||
(
|
||||
Client<QuinnConnection<Response, Request>>,
|
||||
auth::AccessToken,
|
||||
),
|
||||
rspc::Error,
|
||||
> {
|
||||
(
|
||||
try_get_cloud_services_client(node),
|
||||
node.cloud_services
|
||||
|
||||
Reference in New Issue
Block a user