mirror of
https://github.com/spacedriveapp/spacedrive.git
synced 2026-04-21 15:07:54 -04:00
Setup client for Cloud Services in Node
This commit is contained in:
BIN
Cargo.lock
generated
BIN
Cargo.lock
generated
Binary file not shown.
@@ -20,6 +20,9 @@ edition = "2021"
|
||||
repository = "https://github.com/spacedriveapp/spacedrive"
|
||||
|
||||
[workspace.dependencies]
|
||||
# First party dependencies
|
||||
sd-cloud-schema = { git = "https://github.com/spacedriveapp/cloud-services-schema", branch = "main" }
|
||||
|
||||
# Third party dependencies used by one or more of our crates
|
||||
async-channel = "2.3"
|
||||
async-trait = "0.1.80"
|
||||
@@ -65,6 +68,7 @@ tracing = "0.1.40"
|
||||
tracing-subscriber = "0.3.18"
|
||||
tracing-test = "0.2.5"
|
||||
uhlc = "0.6.0" # Must follow version used by specta
|
||||
url = '2.5.0'
|
||||
uuid = "1.8"
|
||||
webp = "0.2.6" # Update blocked by image
|
||||
|
||||
|
||||
@@ -23,6 +23,8 @@ ai = ["dep:sd-ai"]
|
||||
|
||||
[dependencies]
|
||||
# Inner Core Sub-crates
|
||||
sd-cloud-schema = { workspace = true }
|
||||
sd-core-cloud-services = { path = "./crates/cloud-services" }
|
||||
sd-core-file-path-helper = { path = "./crates/file-path-helper" }
|
||||
sd-core-heavy-lifting = { path = "./crates/heavy-lifting" }
|
||||
sd-core-indexer-rules = { path = "./crates/indexer-rules" }
|
||||
|
||||
28
core/crates/cloud-services/Cargo.toml
Normal file
28
core/crates/cloud-services/Cargo.toml
Normal file
@@ -0,0 +1,28 @@
|
||||
[package]
|
||||
name = "sd-core-cloud-services"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
# First party dependencies
|
||||
sd-cloud-schema = { workspace = true }
|
||||
|
||||
# Workspace dependencies
|
||||
reqwest = { workspace = true, features = ["native-tls-vendored"] }
|
||||
thiserror = { workspace = true }
|
||||
tokio = { workspace = true, features = ["sync"] }
|
||||
tracing = { workspace = true }
|
||||
|
||||
# External dependencies
|
||||
quic-rpc = { version = "0.11.0", features = ["quinn-transport"] }
|
||||
quinn = { package = "iroh-quinn", version = "=0.10.5" }
|
||||
# rustls-old is locked to the same version that quic-rpc uses and should be only used with quic-rpc
|
||||
rustls-old = { package = "rustls", version = "0.21.12", default-features = false, features = [
|
||||
"logging",
|
||||
"quic",
|
||||
"dangerous_configuration",
|
||||
] }
|
||||
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { workspace = true, features = ["time", "rt", "sync"] }
|
||||
21
core/crates/cloud-services/src/error.rs
Normal file
21
core/crates/cloud-services/src/error.rs
Normal file
@@ -0,0 +1,21 @@
|
||||
use std::{io, net::AddrParseError};
|
||||
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum Error {
|
||||
#[error("Couldn't parse Cloud Services API address URL: {0}")]
|
||||
InvalidUrl(reqwest::Error),
|
||||
#[error("Failed to initialize http client: {0}")]
|
||||
HttpClientInit(reqwest::Error),
|
||||
#[error("Failed to request Cloud Services API address from Auth Server route: {0}")]
|
||||
FailedToRequestApiAddress(reqwest::Error),
|
||||
#[error("Auth Server's Cloud Services API address route returned an error: {0}")]
|
||||
AuthServerError(reqwest::Error),
|
||||
#[error(
|
||||
"Failed to extract response body from Auth Server's Cloud Services API address route: {0}"
|
||||
)]
|
||||
FailedToExtractApiAddress(reqwest::Error),
|
||||
#[error("Failed to parse auth server's Cloud Services API address: {0}")]
|
||||
FailedToParseApiAddress(#[from] AddrParseError),
|
||||
#[error("Failed to create endpoint: {0}")]
|
||||
FailedToCreateEndpoint(io::Error),
|
||||
}
|
||||
203
core/crates/cloud-services/src/lib.rs
Normal file
203
core/crates/cloud-services/src/lib.rs
Normal file
@@ -0,0 +1,203 @@
|
||||
use sd_cloud_schema::{Client, Service};
|
||||
|
||||
use std::{net::SocketAddr, sync::Arc, time::Duration};
|
||||
|
||||
use quic_rpc::{transport::quinn::QuinnConnection, RpcClient};
|
||||
use quinn::{ClientConfig, Endpoint};
|
||||
use reqwest::{IntoUrl, Url};
|
||||
use tokio::sync::RwLock;
|
||||
use tracing::warn;
|
||||
|
||||
mod error;
|
||||
|
||||
pub use error::Error;
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
enum ClientState {
|
||||
#[default]
|
||||
NotConnected,
|
||||
Connected(Client<QuinnConnection<Service>, Service>),
|
||||
}
|
||||
|
||||
/// Cloud services are a optional feature that allows you to interact with the cloud services
|
||||
/// of Spacedrive.
|
||||
/// They're optional in two different ways:
|
||||
/// - The cloud services depends on a user being logged in with our server.
|
||||
/// - The user being connected to the internet to begin with.
|
||||
/// As we don't want to force the user to be connected to the internet, we have to make sure
|
||||
/// that core can always operate without the cloud services.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct CloudServices {
|
||||
client_state: Arc<RwLock<ClientState>>,
|
||||
get_cloud_api_address: Url,
|
||||
http_client: reqwest::Client,
|
||||
domain_name: String,
|
||||
}
|
||||
|
||||
impl CloudServices {
|
||||
/// Creates a new cloud services client that can be used to interact with the cloud services.
|
||||
/// The client will try to connect to the cloud services on a best effort basis, as the user
|
||||
/// might not be connected to the internet.
|
||||
/// If the client fails to connect, it will try again the next time it's used.
|
||||
pub async fn new(
|
||||
get_cloud_api_address: impl IntoUrl,
|
||||
domain_name: String,
|
||||
) -> Result<Self, Error> {
|
||||
let http_client_builder = reqwest::Client::builder().timeout(Duration::from_secs(3));
|
||||
|
||||
#[cfg(not(debug_assertions))]
|
||||
{
|
||||
builder = builder.https_only(true);
|
||||
}
|
||||
|
||||
let http_client = http_client_builder.build().map_err(Error::HttpClientInit)?;
|
||||
let get_cloud_api_address = get_cloud_api_address
|
||||
.into_url()
|
||||
.map_err(Error::InvalidUrl)?;
|
||||
|
||||
let client_state = match Self::init_client(
|
||||
&http_client,
|
||||
get_cloud_api_address.clone(),
|
||||
domain_name.clone(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(client) => Arc::new(RwLock::new(ClientState::Connected(client))),
|
||||
Err(e) => {
|
||||
warn!(
|
||||
?e,
|
||||
"Failed to initialize cloud services client; \
|
||||
This is a best effort and we will continue in Not Connected mode"
|
||||
);
|
||||
Arc::new(RwLock::new(ClientState::NotConnected))
|
||||
}
|
||||
};
|
||||
|
||||
Ok(Self {
|
||||
client_state,
|
||||
get_cloud_api_address,
|
||||
http_client,
|
||||
domain_name,
|
||||
})
|
||||
}
|
||||
|
||||
async fn init_client(
|
||||
http_client: &reqwest::Client,
|
||||
get_cloud_api_address: Url,
|
||||
domain_name: String,
|
||||
) -> Result<Client<QuinnConnection<Service>, Service>, Error> {
|
||||
let cloud_api_address = http_client
|
||||
.get(get_cloud_api_address)
|
||||
.send()
|
||||
.await
|
||||
.map_err(Error::FailedToRequestApiAddress)?
|
||||
.error_for_status()
|
||||
.map_err(Error::AuthServerError)?
|
||||
.text()
|
||||
.await
|
||||
.map_err(Error::FailedToExtractApiAddress)?
|
||||
.parse::<SocketAddr>()?;
|
||||
|
||||
let crypto_config = {
|
||||
#[cfg(debug_assertions)]
|
||||
{
|
||||
struct SkipServerVerification;
|
||||
impl rustls_old::client::ServerCertVerifier for SkipServerVerification {
|
||||
fn verify_server_cert(
|
||||
&self,
|
||||
_end_entity: &rustls_old::Certificate,
|
||||
_intermediates: &[rustls_old::Certificate],
|
||||
_server_name: &rustls_old::ServerName,
|
||||
_scts: &mut dyn Iterator<Item = &[u8]>,
|
||||
_ocsp_response: &[u8],
|
||||
_now: std::time::SystemTime,
|
||||
) -> Result<rustls_old::client::ServerCertVerified, rustls_old::Error> {
|
||||
Ok(rustls_old::client::ServerCertVerified::assertion())
|
||||
}
|
||||
}
|
||||
|
||||
rustls_old::ClientConfig::builder()
|
||||
.with_safe_defaults()
|
||||
.with_custom_certificate_verifier(Arc::new(SkipServerVerification))
|
||||
.with_no_client_auth()
|
||||
}
|
||||
|
||||
#[cfg(not(debug_assertions))]
|
||||
{
|
||||
rustls_old::ClientConfig::builder()
|
||||
.with_safe_defaults()
|
||||
.with_no_client_auth()
|
||||
}
|
||||
};
|
||||
|
||||
let client_config = ClientConfig::new(Arc::new(crypto_config));
|
||||
|
||||
let mut endpoint = Endpoint::client("[::]:0".parse().expect("hardcoded address"))
|
||||
.map_err(Error::FailedToCreateEndpoint)?;
|
||||
endpoint.set_default_client_config(client_config);
|
||||
|
||||
// TODO(@fogodev): It's possible that we can't keep the connection alive all the time,
|
||||
// and need to use single shot connections. I will only be sure when we have
|
||||
// actually battle-tested the cloud services in core.
|
||||
Ok(Client::new(RpcClient::new(QuinnConnection::new(
|
||||
endpoint,
|
||||
cloud_api_address,
|
||||
domain_name,
|
||||
))))
|
||||
}
|
||||
|
||||
/// Returns a client to the cloud services.
|
||||
///
|
||||
/// If the client is not connected, it will try to connect to the cloud services.
|
||||
/// Available routes documented in
|
||||
/// [`sd_cloud_schema::Service`](https://github.com/spacedriveapp/cloud-services-schema).
|
||||
pub async fn client(&self) -> Result<Client<QuinnConnection<Service>, Service>, Error> {
|
||||
if let ClientState::Connected(client) = &*self.client_state.read().await {
|
||||
return Ok(client.clone());
|
||||
}
|
||||
|
||||
// If we're not connected, we need to try to connect.
|
||||
let client = Self::init_client(
|
||||
&self.http_client,
|
||||
self.get_cloud_api_address.clone(),
|
||||
self.domain_name.clone(),
|
||||
)
|
||||
.await?;
|
||||
*self.client_state.write().await = ClientState::Connected(client.clone());
|
||||
|
||||
Ok(client)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use sd_cloud_schema::{auth, devices};
|
||||
|
||||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_client() {
|
||||
let response = CloudServices::new(
|
||||
"http://localhost:9420/cloud-api-address",
|
||||
"localhost".to_string(),
|
||||
)
|
||||
.await
|
||||
.unwrap()
|
||||
.client()
|
||||
.await
|
||||
.unwrap()
|
||||
.devices()
|
||||
.list(devices::list::Request {
|
||||
access_token: auth::AccessToken("invalid".to_string()),
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert!(matches!(
|
||||
response,
|
||||
Err(sd_cloud_schema::Error::Client(
|
||||
sd_cloud_schema::error::ClientSideError::Unauthorized
|
||||
))
|
||||
))
|
||||
}
|
||||
}
|
||||
@@ -6,6 +6,7 @@ use crate::{
|
||||
location::LocationManagerError,
|
||||
};
|
||||
|
||||
use sd_core_cloud_services::CloudServices;
|
||||
use sd_core_heavy_lifting::{media_processor::ThumbnailKind, JobSystem};
|
||||
use sd_core_prisma_helpers::CasId;
|
||||
|
||||
@@ -80,6 +81,7 @@ pub struct Node {
|
||||
pub http: reqwest::Client,
|
||||
pub task_system: TaskSystem<sd_core_heavy_lifting::Error>,
|
||||
pub job_system: JobSystem<NodeContext, JobContext<NodeContext>>,
|
||||
pub cloud_services: Arc<CloudServices>,
|
||||
#[cfg(feature = "ai")]
|
||||
pub old_image_labeller: Option<OldImageLabeler>,
|
||||
}
|
||||
@@ -128,6 +130,25 @@ impl Node {
|
||||
let (old_jobs, jobs_actor) = old_job::OldJobs::new();
|
||||
let libraries = library::Libraries::new(data_dir.join("libraries")).await?;
|
||||
|
||||
let (get_cloud_api_address, cloud_services_domain_name) = {
|
||||
#[cfg(debug_assertions)]
|
||||
{
|
||||
(
|
||||
std::env::var("SD_CLOUD_API_ADDRESS_URL")
|
||||
.unwrap_or_else(|_| "http://localhost:9420/cloud-api-address".to_string()),
|
||||
std::env::var("SD_CLOUD_API_DOMAIN_NAME")
|
||||
.unwrap_or_else(|_| "localhost".to_string()),
|
||||
)
|
||||
}
|
||||
#[cfg(not(debug_assertions))]
|
||||
{
|
||||
(
|
||||
"https://auth.spacedrive.com/cloud-api-address".to_string(),
|
||||
"api.spacedrive.com".to_string(),
|
||||
)
|
||||
}
|
||||
};
|
||||
|
||||
let task_system = TaskSystem::new();
|
||||
|
||||
let (p2p, start_p2p) = p2p::P2PManager::new(config.clone(), libraries.clone())
|
||||
@@ -149,6 +170,9 @@ impl Node {
|
||||
)),
|
||||
http: reqwest::Client::new(),
|
||||
env,
|
||||
cloud_services: Arc::new(
|
||||
CloudServices::new(&get_cloud_api_address, cloud_services_domain_name).await?,
|
||||
),
|
||||
#[cfg(feature = "ai")]
|
||||
old_image_labeller: OldImageLabeler::new(
|
||||
YoloV8::model(image_labeler_version)?,
|
||||
@@ -441,6 +465,8 @@ pub enum NodeError {
|
||||
Logger(#[from] FromEnvError),
|
||||
#[error(transparent)]
|
||||
JobSystem(#[from] sd_core_heavy_lifting::JobSystemError),
|
||||
#[error(transparent)]
|
||||
CloudServices(#[from] sd_core_cloud_services::Error),
|
||||
|
||||
#[cfg(feature = "ai")]
|
||||
#[error("ai error: {0}")]
|
||||
|
||||
@@ -36,12 +36,12 @@ thiserror = { workspace = true }
|
||||
tokio = { workspace = true, features = ["fs"] }
|
||||
tokio-stream = { workspace = true }
|
||||
tracing = { workspace = true }
|
||||
url = { workspace = true }
|
||||
uuid = { workspace = true, features = ["v4", "serde"] }
|
||||
|
||||
# Note: half and ndarray version must be the same as used in ort
|
||||
half = { version = "2.1", features = ['num-traits'] }
|
||||
ndarray = "0.15"
|
||||
url = '2.5.0'
|
||||
|
||||
# Microsoft does not provide a release for osx-gpu. See: https://github.com/microsoft/onnxruntime/releases
|
||||
# "gpu" means CUDA or TensorRT EP. Thus, the ort crate cannot download them at build time.
|
||||
|
||||
Reference in New Issue
Block a user