From 04b60bf0e27bc37ace454e2dc452618e49bd8d0d Mon Sep 17 00:00:00 2001 From: Ericson Soares Date: Fri, 23 Aug 2024 17:38:34 -0300 Subject: [PATCH] Integrate cloud p2p into node and rspc routes --- .../crates/cloud-services/src/cloud_client.rs | 15 +++++++ .../cloud-services/src/cloud_p2p/mod.rs | 4 +- core/src/api/cloud/mod.rs | 43 +++++++++++++++++-- core/src/lib.rs | 21 ++++++++- 4 files changed, 75 insertions(+), 8 deletions(-) diff --git a/core/crates/cloud-services/src/cloud_client.rs b/core/crates/cloud-services/src/cloud_client.rs index 848facd76..986765b5b 100644 --- a/core/crates/cloud-services/src/cloud_client.rs +++ b/core/crates/cloud-services/src/cloud_client.rs @@ -5,6 +5,7 @@ use sd_cloud_schema::{Client, Service, ServicesALPN}; use std::{net::SocketAddr, sync::Arc, time::Duration}; use futures::Stream; +use iroh_net::relay::RelayUrl; use quic_rpc::{transport::quinn::QuinnConnection, RpcClient}; use quinn::{ClientConfig, Endpoint}; use reqwest::{IntoUrl, Url}; @@ -38,6 +39,8 @@ pub struct CloudServices { get_cloud_api_address: Url, http_client: ClientWithMiddleware, domain_name: String, + pub cloud_p2p_dns_origin_name: String, + pub cloud_p2p_relay_url: RelayUrl, pub token_refresher: TokenRefresher, key_manager: Arc>>>, cloud_p2p: Arc>>>, @@ -54,6 +57,8 @@ impl CloudServices { /// If the client fails to connect, it will try again the next time it's used. pub async fn new( get_cloud_api_address: impl IntoUrl + Send, + cloud_p2p_relay_url: impl IntoUrl + Send, + cloud_p2p_dns_origin_name: String, domain_name: String, ) -> Result { let http_client_builder = reqwest::Client::builder().timeout(Duration::from_secs(3)); @@ -63,6 +68,11 @@ impl CloudServices { builder = builder.https_only(true); } + let cloud_p2p_relay_url = cloud_p2p_relay_url + .into_url() + .map_err(Error::InvalidUrl)? + .into(); + let http_client = ClientBuilder::new(http_client_builder.build().map_err(Error::HttpClientInit)?) .with(RetryTransientMiddleware::new_with_policy( @@ -102,6 +112,8 @@ impl CloudServices { ), get_cloud_api_address, http_client, + cloud_p2p_dns_origin_name, + cloud_p2p_relay_url, domain_name, key_manager: Arc::default(), cloud_p2p: Arc::default(), @@ -303,10 +315,13 @@ mod tests { use super::*; + #[ignore] #[tokio::test] async fn test_client() { let response = CloudServices::new( "http://localhost:9420/cloud-api-address", + "http://relay.localhost:9999/", + "dns.localhost:9999".to_string(), "localhost".to_string(), ) .await diff --git a/core/crates/cloud-services/src/cloud_p2p/mod.rs b/core/crates/cloud-services/src/cloud_p2p/mod.rs index edf3502c3..49a11807d 100644 --- a/core/crates/cloud-services/src/cloud_p2p/mod.rs +++ b/core/crates/cloud-services/src/cloud_p2p/mod.rs @@ -76,7 +76,7 @@ pub struct CloudP2P { impl CloudP2P { pub async fn new( current_device_pub_id: devices::PubId, - cloud_services: CloudServices, + cloud_services: &CloudServices, mut rng: CryptoRng, iroh_secret_key: IrohSecretKey, dns_origin_domain: String, @@ -95,7 +95,7 @@ impl CloudP2P { let (msgs_tx, msgs_rx) = flume::bounded(16); spawn({ - let runner = Runner::new(current_device_pub_id, &cloud_services, endpoint).await?; + let runner = Runner::new(current_device_pub_id, cloud_services, endpoint).await?; let user_response_rx = cloud_services.user_response_rx.clone(); async move { diff --git a/core/src/api/cloud/mod.rs b/core/src/api/cloud/mod.rs index 2fd6e31b6..bc39cae49 100644 --- a/core/src/api/cloud/mod.rs +++ b/core/src/api/cloud/mod.rs @@ -5,9 +5,13 @@ use sd_cloud_schema::{ error::{ClientSideError, Error}, users, Client, Service, }; -use sd_core_cloud_services::{IrohSecretKey, KeyManager, QuinnConnection}; +use sd_core_cloud_services::{CloudP2P, IrohSecretKey, KeyManager, QuinnConnection, UserResponse}; use sd_crypto::{CryptoRng, SeedableRng}; +use std::pin::pin; + +use async_stream::stream; +use futures::StreamExt; use rspc::alpha::AlphaRouter; use tracing::error; use uuid::Uuid; @@ -128,14 +132,45 @@ pub(crate) fn mount() -> AlphaRouter { node.cloud_services.set_key_manager(key_manager).await; - // TODO: With this device iroh's secret key (NodeId) now known and we can start the iroh - // node for cloud p2p - todo!("Start iroh node for cloud p2p"); + node.cloud_services + .set_cloud_p2p( + CloudP2P::new( + device_pub_id, + &node.cloud_services, + rng, + iroh_secret_key, + node.cloud_services.cloud_p2p_dns_origin_name.clone(), + node.cloud_services.cloud_p2p_relay_url.clone(), + ) + .await?, + ) + .await; Ok(()) }, ) }) + .procedure( + "listenCloudServicesNotifications", + R.subscription(|node, _: ()| async move { + stream! { + let mut notifications_stream = + pin!(node.cloud_services.stream_user_notifications()); + + while let Some(notification) = notifications_stream.next().await { + yield notification; + } + } + }), + ) + .procedure( + "userResponse", + R.mutation(|node, response: UserResponse| async move { + node.cloud_services.send_user_response(response).await; + + Ok(()) + }), + ) } fn handle_comm_error( diff --git a/core/src/lib.rs b/core/src/lib.rs index ea7d02930..44686e23e 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -108,12 +108,21 @@ impl Node { let (old_jobs, jobs_actor) = old_job::OldJobs::new(); let libraries = library::Libraries::new(data_dir.join("libraries")).await?; - let (get_cloud_api_address, cloud_services_domain_name) = { + let ( + get_cloud_api_address, + cloud_p2p_relay_url, + cloud_p2p_dns_origin_name, + cloud_services_domain_name, + ) = { #[cfg(debug_assertions)] { ( std::env::var("SD_CLOUD_API_ADDRESS_URL") .unwrap_or_else(|_| "http://localhost:9420/cloud-api-address".to_string()), + std::env::var("SD_CLOUD_P2P_RELAY_URL") + .unwrap_or_else(|_| "http://relay.localhost:9999/".to_string()), + std::env::var("SD_CLOUD_P2P_DNS_ORIGIN_NAME") + .unwrap_or_else(|_| "dnf.localhost:9999".to_string()), std::env::var("SD_CLOUD_API_DOMAIN_NAME") .unwrap_or_else(|_| "localhost".to_string()), ) @@ -122,6 +131,8 @@ impl Node { { ( "https://auth.spacedrive.com/cloud-api-address".to_string(), + "https://relay.spacedrive.com/".to_string(), + "dns.spacedrive.com".to_string(), "api.spacedrive.com".to_string(), ) } @@ -144,7 +155,13 @@ impl Node { event_bus, libraries, cloud_services: Arc::new( - CloudServices::new(&get_cloud_api_address, cloud_services_domain_name).await?, + CloudServices::new( + &get_cloud_api_address, + cloud_p2p_relay_url, + cloud_p2p_dns_origin_name, + cloud_services_domain_name, + ) + .await?, ), master_rng: Arc::new(Mutex::new(CryptoRng::new()?)), });