diff --git a/Cargo.lock b/Cargo.lock index 8db081fa4..ee6c3188e 100644 Binary files a/Cargo.lock and b/Cargo.lock differ diff --git a/Cargo.toml b/Cargo.toml index 1cc1388c7..0657f6401 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,9 @@ edition = "2021" repository = "https://github.com/spacedriveapp/spacedrive" [workspace.dependencies] +# First party dependencies +sd-cloud-schema = { git = "https://github.com/spacedriveapp/cloud-services-schema", branch = "main" } + # Third party dependencies used by one or more of our crates async-channel = "2.3" async-trait = "0.1.80" @@ -65,6 +68,7 @@ tracing = "0.1.40" tracing-subscriber = "0.3.18" tracing-test = "0.2.5" uhlc = "0.6.0" # Must follow version used by specta +url = '2.5.0' uuid = "1.8" webp = "0.2.6" # Update blocked by image diff --git a/core/Cargo.toml b/core/Cargo.toml index 2837a9555..1bf128394 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -23,6 +23,8 @@ ai = ["dep:sd-ai"] [dependencies] # Inner Core Sub-crates +sd-cloud-schema = { workspace = true } +sd-core-cloud-services = { path = "./crates/cloud-services" } sd-core-file-path-helper = { path = "./crates/file-path-helper" } sd-core-heavy-lifting = { path = "./crates/heavy-lifting" } sd-core-indexer-rules = { path = "./crates/indexer-rules" } diff --git a/core/crates/cloud-services/Cargo.toml b/core/crates/cloud-services/Cargo.toml new file mode 100644 index 000000000..2ae1db975 --- /dev/null +++ b/core/crates/cloud-services/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "sd-core-cloud-services" +version = "0.1.0" +edition = "2021" + +[dependencies] +# First party dependencies +sd-cloud-schema = { workspace = true } + +# Workspace dependencies +reqwest = { workspace = true, features = ["native-tls-vendored"] } +thiserror = { workspace = true } +tokio = { workspace = true, features = ["sync"] } +tracing = { workspace = true } + +# External dependencies +quic-rpc = { version = "0.11.0", features = ["quinn-transport"] } +quinn = { package = "iroh-quinn", version = "=0.10.5" } +# rustls-old is locked to the same version that quic-rpc uses and should be only used with quic-rpc +rustls-old = { package = "rustls", version = "0.21.12", default-features = false, features = [ + "logging", + "quic", + "dangerous_configuration", +] } + + +[dev-dependencies] +tokio = { workspace = true, features = ["time", "rt", "sync"] } diff --git a/core/crates/cloud-services/src/error.rs b/core/crates/cloud-services/src/error.rs new file mode 100644 index 000000000..f12a6941f --- /dev/null +++ b/core/crates/cloud-services/src/error.rs @@ -0,0 +1,21 @@ +use std::{io, net::AddrParseError}; + +#[derive(thiserror::Error, Debug)] +pub enum Error { + #[error("Couldn't parse Cloud Services API address URL: {0}")] + InvalidUrl(reqwest::Error), + #[error("Failed to initialize http client: {0}")] + HttpClientInit(reqwest::Error), + #[error("Failed to request Cloud Services API address from Auth Server route: {0}")] + FailedToRequestApiAddress(reqwest::Error), + #[error("Auth Server's Cloud Services API address route returned an error: {0}")] + AuthServerError(reqwest::Error), + #[error( + "Failed to extract response body from Auth Server's Cloud Services API address route: {0}" + )] + FailedToExtractApiAddress(reqwest::Error), + #[error("Failed to parse auth server's Cloud Services API address: {0}")] + FailedToParseApiAddress(#[from] AddrParseError), + #[error("Failed to create endpoint: {0}")] + FailedToCreateEndpoint(io::Error), +} diff --git a/core/crates/cloud-services/src/lib.rs b/core/crates/cloud-services/src/lib.rs new file mode 100644 index 000000000..fa0c83690 --- /dev/null +++ b/core/crates/cloud-services/src/lib.rs @@ -0,0 +1,203 @@ +use sd_cloud_schema::{Client, Service}; + +use std::{net::SocketAddr, sync::Arc, time::Duration}; + +use quic_rpc::{transport::quinn::QuinnConnection, RpcClient}; +use quinn::{ClientConfig, Endpoint}; +use reqwest::{IntoUrl, Url}; +use tokio::sync::RwLock; +use tracing::warn; + +mod error; + +pub use error::Error; + +#[derive(Debug, Default)] +enum ClientState { + #[default] + NotConnected, + Connected(Client, Service>), +} + +/// Cloud services are a optional feature that allows you to interact with the cloud services +/// of Spacedrive. +/// They're optional in two different ways: +/// - The cloud services depends on a user being logged in with our server. +/// - The user being connected to the internet to begin with. +/// As we don't want to force the user to be connected to the internet, we have to make sure +/// that core can always operate without the cloud services. +#[derive(Debug, Clone)] +pub struct CloudServices { + client_state: Arc>, + get_cloud_api_address: Url, + http_client: reqwest::Client, + domain_name: String, +} + +impl CloudServices { + /// Creates a new cloud services client that can be used to interact with the cloud services. + /// The client will try to connect to the cloud services on a best effort basis, as the user + /// might not be connected to the internet. + /// If the client fails to connect, it will try again the next time it's used. + pub async fn new( + get_cloud_api_address: impl IntoUrl, + domain_name: String, + ) -> Result { + let http_client_builder = reqwest::Client::builder().timeout(Duration::from_secs(3)); + + #[cfg(not(debug_assertions))] + { + builder = builder.https_only(true); + } + + let http_client = http_client_builder.build().map_err(Error::HttpClientInit)?; + let get_cloud_api_address = get_cloud_api_address + .into_url() + .map_err(Error::InvalidUrl)?; + + let client_state = match Self::init_client( + &http_client, + get_cloud_api_address.clone(), + domain_name.clone(), + ) + .await + { + Ok(client) => Arc::new(RwLock::new(ClientState::Connected(client))), + Err(e) => { + warn!( + ?e, + "Failed to initialize cloud services client; \ + This is a best effort and we will continue in Not Connected mode" + ); + Arc::new(RwLock::new(ClientState::NotConnected)) + } + }; + + Ok(Self { + client_state, + get_cloud_api_address, + http_client, + domain_name, + }) + } + + async fn init_client( + http_client: &reqwest::Client, + get_cloud_api_address: Url, + domain_name: String, + ) -> Result, Service>, Error> { + let cloud_api_address = http_client + .get(get_cloud_api_address) + .send() + .await + .map_err(Error::FailedToRequestApiAddress)? + .error_for_status() + .map_err(Error::AuthServerError)? + .text() + .await + .map_err(Error::FailedToExtractApiAddress)? + .parse::()?; + + let crypto_config = { + #[cfg(debug_assertions)] + { + struct SkipServerVerification; + impl rustls_old::client::ServerCertVerifier for SkipServerVerification { + fn verify_server_cert( + &self, + _end_entity: &rustls_old::Certificate, + _intermediates: &[rustls_old::Certificate], + _server_name: &rustls_old::ServerName, + _scts: &mut dyn Iterator, + _ocsp_response: &[u8], + _now: std::time::SystemTime, + ) -> Result { + Ok(rustls_old::client::ServerCertVerified::assertion()) + } + } + + rustls_old::ClientConfig::builder() + .with_safe_defaults() + .with_custom_certificate_verifier(Arc::new(SkipServerVerification)) + .with_no_client_auth() + } + + #[cfg(not(debug_assertions))] + { + rustls_old::ClientConfig::builder() + .with_safe_defaults() + .with_no_client_auth() + } + }; + + let client_config = ClientConfig::new(Arc::new(crypto_config)); + + let mut endpoint = Endpoint::client("[::]:0".parse().expect("hardcoded address")) + .map_err(Error::FailedToCreateEndpoint)?; + endpoint.set_default_client_config(client_config); + + // TODO(@fogodev): It's possible that we can't keep the connection alive all the time, + // and need to use single shot connections. I will only be sure when we have + // actually battle-tested the cloud services in core. + Ok(Client::new(RpcClient::new(QuinnConnection::new( + endpoint, + cloud_api_address, + domain_name, + )))) + } + + /// Returns a client to the cloud services. + /// + /// If the client is not connected, it will try to connect to the cloud services. + /// Available routes documented in + /// [`sd_cloud_schema::Service`](https://github.com/spacedriveapp/cloud-services-schema). + pub async fn client(&self) -> Result, Service>, Error> { + if let ClientState::Connected(client) = &*self.client_state.read().await { + return Ok(client.clone()); + } + + // If we're not connected, we need to try to connect. + let client = Self::init_client( + &self.http_client, + self.get_cloud_api_address.clone(), + self.domain_name.clone(), + ) + .await?; + *self.client_state.write().await = ClientState::Connected(client.clone()); + + Ok(client) + } +} + +#[cfg(test)] +mod tests { + use sd_cloud_schema::{auth, devices}; + + use super::*; + + #[tokio::test] + async fn test_client() { + let response = CloudServices::new( + "http://localhost:9420/cloud-api-address", + "localhost".to_string(), + ) + .await + .unwrap() + .client() + .await + .unwrap() + .devices() + .list(devices::list::Request { + access_token: auth::AccessToken("invalid".to_string()), + }) + .await + .unwrap(); + + assert!(matches!( + response, + Err(sd_cloud_schema::Error::Client( + sd_cloud_schema::error::ClientSideError::Unauthorized + )) + )) + } +} diff --git a/core/src/lib.rs b/core/src/lib.rs index bac56e73f..57bfdd11e 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -6,6 +6,7 @@ use crate::{ location::LocationManagerError, }; +use sd_core_cloud_services::CloudServices; use sd_core_heavy_lifting::{media_processor::ThumbnailKind, JobSystem}; use sd_core_prisma_helpers::CasId; @@ -80,6 +81,7 @@ pub struct Node { pub http: reqwest::Client, pub task_system: TaskSystem, pub job_system: JobSystem>, + pub cloud_services: Arc, #[cfg(feature = "ai")] pub old_image_labeller: Option, } @@ -128,6 +130,25 @@ impl Node { let (old_jobs, jobs_actor) = old_job::OldJobs::new(); let libraries = library::Libraries::new(data_dir.join("libraries")).await?; + let (get_cloud_api_address, cloud_services_domain_name) = { + #[cfg(debug_assertions)] + { + ( + std::env::var("SD_CLOUD_API_ADDRESS_URL") + .unwrap_or_else(|_| "http://localhost:9420/cloud-api-address".to_string()), + std::env::var("SD_CLOUD_API_DOMAIN_NAME") + .unwrap_or_else(|_| "localhost".to_string()), + ) + } + #[cfg(not(debug_assertions))] + { + ( + "https://auth.spacedrive.com/cloud-api-address".to_string(), + "api.spacedrive.com".to_string(), + ) + } + }; + let task_system = TaskSystem::new(); let (p2p, start_p2p) = p2p::P2PManager::new(config.clone(), libraries.clone()) @@ -149,6 +170,9 @@ impl Node { )), http: reqwest::Client::new(), env, + cloud_services: Arc::new( + CloudServices::new(&get_cloud_api_address, cloud_services_domain_name).await?, + ), #[cfg(feature = "ai")] old_image_labeller: OldImageLabeler::new( YoloV8::model(image_labeler_version)?, @@ -441,6 +465,8 @@ pub enum NodeError { Logger(#[from] FromEnvError), #[error(transparent)] JobSystem(#[from] sd_core_heavy_lifting::JobSystemError), + #[error(transparent)] + CloudServices(#[from] sd_core_cloud_services::Error), #[cfg(feature = "ai")] #[error("ai error: {0}")] diff --git a/crates/ai/Cargo.toml b/crates/ai/Cargo.toml index a1cb437e6..edb1a36ea 100644 --- a/crates/ai/Cargo.toml +++ b/crates/ai/Cargo.toml @@ -36,12 +36,12 @@ thiserror = { workspace = true } tokio = { workspace = true, features = ["fs"] } tokio-stream = { workspace = true } tracing = { workspace = true } +url = { workspace = true } uuid = { workspace = true, features = ["v4", "serde"] } # Note: half and ndarray version must be the same as used in ort half = { version = "2.1", features = ['num-traits'] } ndarray = "0.15" -url = '2.5.0' # Microsoft does not provide a release for osx-gpu. See: https://github.com/microsoft/onnxruntime/releases # "gpu" means CUDA or TensorRT EP. Thus, the ort crate cannot download them at build time.