Integrate cloud p2p into node and rspc routes

This commit is contained in:
Ericson Soares
2024-08-23 17:38:34 -03:00
parent 35c4b1549c
commit 04b60bf0e2
4 changed files with 75 additions and 8 deletions

View File

@@ -5,6 +5,7 @@ use sd_cloud_schema::{Client, Service, ServicesALPN};
use std::{net::SocketAddr, sync::Arc, time::Duration};
use futures::Stream;
use iroh_net::relay::RelayUrl;
use quic_rpc::{transport::quinn::QuinnConnection, RpcClient};
use quinn::{ClientConfig, Endpoint};
use reqwest::{IntoUrl, Url};
@@ -38,6 +39,8 @@ pub struct CloudServices {
get_cloud_api_address: Url,
http_client: ClientWithMiddleware,
domain_name: String,
pub cloud_p2p_dns_origin_name: String,
pub cloud_p2p_relay_url: RelayUrl,
pub token_refresher: TokenRefresher,
key_manager: Arc<RwLock<Option<Arc<KeyManager>>>>,
cloud_p2p: Arc<RwLock<Option<Arc<CloudP2P>>>>,
@@ -54,6 +57,8 @@ impl CloudServices {
/// If the client fails to connect, it will try again the next time it's used.
pub async fn new(
get_cloud_api_address: impl IntoUrl + Send,
cloud_p2p_relay_url: impl IntoUrl + Send,
cloud_p2p_dns_origin_name: String,
domain_name: String,
) -> Result<Self, Error> {
let http_client_builder = reqwest::Client::builder().timeout(Duration::from_secs(3));
@@ -63,6 +68,11 @@ impl CloudServices {
builder = builder.https_only(true);
}
let cloud_p2p_relay_url = cloud_p2p_relay_url
.into_url()
.map_err(Error::InvalidUrl)?
.into();
let http_client =
ClientBuilder::new(http_client_builder.build().map_err(Error::HttpClientInit)?)
.with(RetryTransientMiddleware::new_with_policy(
@@ -102,6 +112,8 @@ impl CloudServices {
),
get_cloud_api_address,
http_client,
cloud_p2p_dns_origin_name,
cloud_p2p_relay_url,
domain_name,
key_manager: Arc::default(),
cloud_p2p: Arc::default(),
@@ -303,10 +315,13 @@ mod tests {
use super::*;
#[ignore]
#[tokio::test]
async fn test_client() {
let response = CloudServices::new(
"http://localhost:9420/cloud-api-address",
"http://relay.localhost:9999/",
"dns.localhost:9999".to_string(),
"localhost".to_string(),
)
.await

View File

@@ -76,7 +76,7 @@ pub struct CloudP2P {
impl CloudP2P {
pub async fn new(
current_device_pub_id: devices::PubId,
cloud_services: CloudServices,
cloud_services: &CloudServices,
mut rng: CryptoRng,
iroh_secret_key: IrohSecretKey,
dns_origin_domain: String,
@@ -95,7 +95,7 @@ impl CloudP2P {
let (msgs_tx, msgs_rx) = flume::bounded(16);
spawn({
let runner = Runner::new(current_device_pub_id, &cloud_services, endpoint).await?;
let runner = Runner::new(current_device_pub_id, cloud_services, endpoint).await?;
let user_response_rx = cloud_services.user_response_rx.clone();
async move {

View File

@@ -5,9 +5,13 @@ use sd_cloud_schema::{
error::{ClientSideError, Error},
users, Client, Service,
};
use sd_core_cloud_services::{IrohSecretKey, KeyManager, QuinnConnection};
use sd_core_cloud_services::{CloudP2P, IrohSecretKey, KeyManager, QuinnConnection, UserResponse};
use sd_crypto::{CryptoRng, SeedableRng};
use std::pin::pin;
use async_stream::stream;
use futures::StreamExt;
use rspc::alpha::AlphaRouter;
use tracing::error;
use uuid::Uuid;
@@ -128,14 +132,45 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
node.cloud_services.set_key_manager(key_manager).await;
// TODO: With this device iroh's secret key (NodeId) now known and we can start the iroh
// node for cloud p2p
todo!("Start iroh node for cloud p2p");
node.cloud_services
.set_cloud_p2p(
CloudP2P::new(
device_pub_id,
&node.cloud_services,
rng,
iroh_secret_key,
node.cloud_services.cloud_p2p_dns_origin_name.clone(),
node.cloud_services.cloud_p2p_relay_url.clone(),
)
.await?,
)
.await;
Ok(())
},
)
})
.procedure(
"listenCloudServicesNotifications",
R.subscription(|node, _: ()| async move {
stream! {
let mut notifications_stream =
pin!(node.cloud_services.stream_user_notifications());
while let Some(notification) = notifications_stream.next().await {
yield notification;
}
}
}),
)
.procedure(
"userResponse",
R.mutation(|node, response: UserResponse| async move {
node.cloud_services.send_user_response(response).await;
Ok(())
}),
)
}
fn handle_comm_error<T, E: std::error::Error + std::fmt::Debug + Send + Sync + 'static>(

View File

@@ -108,12 +108,21 @@ impl Node {
let (old_jobs, jobs_actor) = old_job::OldJobs::new();
let libraries = library::Libraries::new(data_dir.join("libraries")).await?;
let (get_cloud_api_address, cloud_services_domain_name) = {
let (
get_cloud_api_address,
cloud_p2p_relay_url,
cloud_p2p_dns_origin_name,
cloud_services_domain_name,
) = {
#[cfg(debug_assertions)]
{
(
std::env::var("SD_CLOUD_API_ADDRESS_URL")
.unwrap_or_else(|_| "http://localhost:9420/cloud-api-address".to_string()),
std::env::var("SD_CLOUD_P2P_RELAY_URL")
.unwrap_or_else(|_| "http://relay.localhost:9999/".to_string()),
std::env::var("SD_CLOUD_P2P_DNS_ORIGIN_NAME")
.unwrap_or_else(|_| "dnf.localhost:9999".to_string()),
std::env::var("SD_CLOUD_API_DOMAIN_NAME")
.unwrap_or_else(|_| "localhost".to_string()),
)
@@ -122,6 +131,8 @@ impl Node {
{
(
"https://auth.spacedrive.com/cloud-api-address".to_string(),
"https://relay.spacedrive.com/".to_string(),
"dns.spacedrive.com".to_string(),
"api.spacedrive.com".to_string(),
)
}
@@ -144,7 +155,13 @@ impl Node {
event_bus,
libraries,
cloud_services: Arc::new(
CloudServices::new(&get_cloud_api_address, cloud_services_domain_name).await?,
CloudServices::new(
&get_cloud_api_address,
cloud_p2p_relay_url,
cloud_p2p_dns_origin_name,
cloud_services_domain_name,
)
.await?,
),
master_rng: Arc::new(Mutex::new(CryptoRng::new()?)),
});