diff --git a/apps/desktop/src-tauri/src/main.rs b/apps/desktop/src-tauri/src/main.rs index bdb4d16dc..87c0d807e 100644 --- a/apps/desktop/src-tauri/src/main.rs +++ b/apps/desktop/src-tauri/src/main.rs @@ -182,14 +182,7 @@ async fn main() -> tauri::Result<()> { let (_guard, result) = match Node::init_logger(&data_dir) { Ok(guard) => ( Some(guard), - Node::new( - data_dir, - sd_core::Env { - api_url: "https://app.spacedrive.com".to_string(), - client_id: CLIENT_ID.to_string(), - }, - ) - .await, + Node::new(data_dir, sd_core::Env::new(CLIENT_ID)).await, ), Err(err) => (None, Err(NodeError::Logger(err))), }; @@ -446,7 +439,8 @@ fn mouse_position(window: &Window) -> (f64, f64) { fn difference(a: f64, b: f64) -> f64 { let x = a - b; if x < 0.0 { - return x * -1.0; + x * -1.0 + } else { + x } - return x; } diff --git a/apps/desktop/src-tauri/src/tauri_plugins.rs b/apps/desktop/src-tauri/src/tauri_plugins.rs index 6128511b9..9d483fe81 100644 --- a/apps/desktop/src-tauri/src/tauri_plugins.rs +++ b/apps/desktop/src-tauri/src/tauri_plugins.rs @@ -1,5 +1,4 @@ use std::{ - io, net::Ipv4Addr, pin::Pin, sync::Arc, diff --git a/apps/mobile/modules/sd-core/core/src/lib.rs b/apps/mobile/modules/sd-core/core/src/lib.rs index e740ab4ba..891bab37b 100644 --- a/apps/mobile/modules/sd-core/core/src/lib.rs +++ b/apps/mobile/modules/sd-core/core/src/lib.rs @@ -75,15 +75,9 @@ pub fn handle_core_msg( let _guard = Node::init_logger(&data_dir); // TODO: probably don't unwrap - let new_node = Node::new( - data_dir, - sd_core::Env { - api_url: "https://app.spacedrive.com".to_string(), - client_id: CLIENT_ID.to_string(), - }, - ) - .await - .unwrap(); + let new_node = Node::new(data_dir, sd_core::Env::new(CLIENT_ID)) + .await + .unwrap(); node.replace(new_node.clone()); new_node } diff --git a/apps/server/src/main.rs b/apps/server/src/main.rs index a50283fb6..8d80008b2 100644 --- a/apps/server/src/main.rs +++ b/apps/server/src/main.rs @@ -40,8 +40,10 @@ async fn main() { let (node, router) = match Node::new( data_dir, sd_core::Env { - api_url: std::env::var("SD_API_URL") - .unwrap_or_else(|_| "https://app.spacedrive.com".to_string()), + api_url: tokio::sync::Mutex::new( + std::env::var("SD_API_URL") + .unwrap_or_else(|_| "https://app.spacedrive.com".to_string()), + ), client_id: std::env::var("SD_CLIENT_ID") .unwrap_or_else(|_| "04701823-a498-406e-aef9-22081c1dae34".to_string()), }, diff --git a/core/crates/sync/src/ingest.rs b/core/crates/sync/src/ingest.rs index afc784e00..78d2dfbf3 100644 --- a/core/crates/sync/src/ingest.rs +++ b/core/crates/sync/src/ingest.rs @@ -136,35 +136,13 @@ impl Actor { if !is_old { self.apply_op(op).await.ok(); - } - // self.db - // ._transaction() - // .run({ - // let timestamps = self.timestamps.clone(); - // |db| async move { - // match db - // .instance() - // .update( - // instance::pub_id::equals(uuid_to_bytes(op_instance)), - // vec![instance::timestamp::set(Some(timestamp.as_u64() as i64))], - // ) - // .exec() - // .await - // { - // Ok(_) => { - self.timestamps.write().await.insert(op_instance, timestamp); - // Ok(()) - // } - // Err(e) => Err(e), - // } - // } - // }) - // .await - // .unwrap(); + self.timestamps.write().await.insert(op_instance, timestamp); + } } async fn apply_op(&mut self, op: CRDTOperation) -> prisma_client_rust::Result<()> { + // TODO: Needs to be transaction-ified ModelSyncData::from_op(op.clone()) .unwrap() .exec(&self.db) diff --git a/core/crates/sync/tests/lib.rs b/core/crates/sync/tests/lib.rs index 02866cca5..e8521093b 100644 --- a/core/crates/sync/tests/lib.rs +++ b/core/crates/sync/tests/lib.rs @@ -208,7 +208,6 @@ async fn bruh() -> Result<(), Box> { .await?; assert_eq!(out.len(), 3); - assert!(matches!(out[0].typ, CRDTOperationType::Shared(_))); instance1.teardown().await; instance2.teardown().await; diff --git a/core/src/api/auth.rs b/core/src/api/auth.rs index 0b023345a..79a672514 100644 --- a/core/src/api/auth.rs +++ b/core/src/api/auth.rs @@ -1,23 +1,12 @@ -use crate::util::http::ensure_response; - use std::time::Duration; -use reqwest::{Response, StatusCode}; +use reqwest::StatusCode; use rspc::alpha::AlphaRouter; -use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use serde::{Deserialize, Serialize}; use specta::Type; use super::{Ctx, R}; -async fn parse_json_body(response: Response) -> Result { - response.json().await.map_err(|_| { - rspc::Error::new( - rspc::ErrorCode::InternalServerError, - "JSON conversion failed".to_string(), - ) - }) -} - pub(crate) fn mount() -> AlphaRouter { R.router() .procedure("loginSession", { @@ -30,30 +19,42 @@ pub(crate) fn mount() -> AlphaRouter { verification_url_complete: String, }, Complete, - Error, + Error(String), } R.subscription(|node, _: ()| async move { - async_stream::stream! { - #[derive(Deserialize, Type)] - struct DeviceAuthorizationResponse { - device_code: String, - user_code: String, - verification_url: String, - verification_uri_complete: String, - } + #[derive(Deserialize, Type)] + struct DeviceAuthorizationResponse { + device_code: String, + user_code: String, + verification_url: String, + verification_uri_complete: String, + } - let Ok(auth_response) = (match node.http - .post(&format!("{}/login/device/code", &node.env.api_url)) + async_stream::stream! { + let auth_response = match match node + .http + .post(&format!( + "{}/login/device/code", + &node.env.api_url.lock().await + )) .form(&[("client_id", &node.env.client_id)]) .send() - .await { - Ok(r) => r.json::().await, - Err(e) => Err(e) - }) else { - yield Response::Error; - return; - }; + .await + .map_err(|e| e.to_string()) + { + Ok(r) => r.json::().await.map_err(|e| e.to_string()), + Err(e) => { + yield Response::Error(e.to_string()); + return + }, + } { + Ok(v) => v, + Err(e) => { + yield Response::Error(e.to_string()); + return + }, + }; yield Response::Start { user_code: auth_response.user_code.clone(), @@ -64,28 +65,30 @@ pub(crate) fn mount() -> AlphaRouter { yield loop { tokio::time::sleep(Duration::from_secs(5)).await; - let Ok(token_resp) = node.http - .post(&format!("{}/login/oauth/access_token", &node.env.api_url)) + let token_resp = match node.http + .post(&format!("{}/login/oauth/access_token", &node.env.api_url.lock().await)) .form(&[ ("grant_type", sd_cloud_api::auth::DEVICE_CODE_URN), ("device_code", &auth_response.device_code), ("client_id", &node.env.client_id) ]) .send() - .await else { - break Response::Error; + .await { + Ok(v) => v, + Err(e) => break Response::Error(e.to_string()) }; match token_resp.status() { StatusCode::OK => { - let Ok(token) = token_resp.json().await else { - break Response::Error; + let token = match token_resp.json().await { + Ok(v) => v, + Err(e) => break Response::Error(e.to_string()) }; - if node.config + if let Err(e) = node.config .write(|c| c.auth_token = Some(token)) - .await.is_err() { - break Response::Error; + .await { + break Response::Error(e.to_string()); }; @@ -97,19 +100,20 @@ pub(crate) fn mount() -> AlphaRouter { error: String } - let Ok(resp) = token_resp.json::().await else { - break Response::Error; + let resp = match token_resp.json::().await { + Ok(v) => v, + Err(e) => break Response::Error(e.to_string()) }; match resp.error.as_str() { "authorization_pending" => continue, - _ => { - break Response::Error; + e => { + break Response::Error(e.to_string()) } } }, - _ => { - break Response::Error; + s => { + break Response::Error(s.to_string()); } } } @@ -133,21 +137,9 @@ pub(crate) fn mount() -> AlphaRouter { ) .procedure("me", { R.query(|node, _: ()| async move { - #[derive(Serialize, Deserialize, Type)] - #[specta(inline)] - struct Response { - id: String, - email: String, - } + let resp = sd_cloud_api::user::me(node.cloud_api_config().await).await?; - node.authed_api_request( - node.http - .get(&format!("{}/api/v1/user/me", &node.env.api_url)), - ) - .await - .and_then(ensure_response) - .map(parse_json_body::)? - .await + Ok(resp) }) }) } diff --git a/core/src/api/cloud.rs b/core/src/api/cloud.rs index 67a4444ae..270ac482a 100644 --- a/core/src/api/cloud.rs +++ b/core/src/api/cloud.rs @@ -22,10 +22,22 @@ pub(crate) fn mount() -> AlphaRouter { R.router() .merge("library.", library::mount()) .merge("locations.", locations::mount()) + .procedure("getApiOrigin", { + R.query(|node, _: ()| async move { Ok(node.env.api_url.lock().await.to_string()) }) + }) + .procedure("setApiOrigin", { + R.mutation(|node, origin: String| async move { + let mut origin_env = node.env.api_url.lock().await; + *origin_env = origin; + + node.config.write(|c| c.auth_token = None).await.ok(); + + Ok(()) + }) + }) } mod library { - use super::*; pub fn mount() -> AlphaRouter { @@ -116,11 +128,8 @@ mod locations { }; use http_body::Full; use serde::{Deserialize, Serialize}; - use serde_json::json; use specta::Type; - use crate::util::http::ensure_response; - use super::*; #[derive(Type, Serialize, Deserialize)] @@ -129,58 +138,27 @@ mod locations { name: String, } - #[derive(Debug, Clone, Type, Deserialize)] - pub struct AuthoriseResponse { - access_key_id: String, - secret_access_key: String, - session_token: String, - } - pub fn mount() -> AlphaRouter { R.router() .procedure("list", { R.query(|node, _: ()| async move { - let api_url = &node.env.api_url; - - node.authed_api_request(node.http.get(&format!("{api_url}/api/v1/locations"))) - .await - .and_then(ensure_response) - .map(parse_json_body::>)? + sd_cloud_api::locations::list(node.cloud_api_config().await) .await + .map_err(Into::into) }) }) .procedure("create", { R.mutation(|node, name: String| async move { - let api_url = &node.env.api_url; - - node.authed_api_request( - node.http - .post(&format!("{api_url}/api/v1/locations")) - .json(&json!({ - "name": name - })), - ) - .await - .and_then(ensure_response) - .map(parse_json_body::)? - .await + sd_cloud_api::locations::create(node.cloud_api_config().await, name) + .await + .map_err(Into::into) }) }) .procedure("remove", { R.mutation(|node, id: String| async move { - let api_url = &node.env.api_url; - - node.authed_api_request( - node.http - .post(&format!("{api_url}/api/v1/locations/delete")) - .json(&json!({ - "id": id - })), - ) - .await - .and_then(ensure_response)?; - - Ok(()) + sd_cloud_api::locations::create(node.cloud_api_config().await, id) + .await + .map_err(Into::into) }) }) // TODO: Remove this @@ -190,7 +168,7 @@ mod locations { // Lazy::new(|| Mutex::new(None)); #[derive(Debug)] - pub struct CredentialsProvider(AuthoriseResponse); + pub struct CredentialsProvider(sd_cloud_api::locations::authorise::Response); impl ProvideCredentials for CredentialsProvider { fn provide_credentials<'a>(&'a self) -> future::ProvideCredentials<'a> @@ -221,19 +199,11 @@ mod locations { let token = { let token = &mut None; // AUTH_TOKEN.lock().await; // TODO: Caching of the token. For now it's annoying when debugging. if token.is_none() { - let api_url = &node.env.api_url; - *token = Some( - node.authed_api_request( - node.http - .post(&format!("{api_url}/api/v1/locations/authorise")) - .json(&json!({ - "id": params.id - })), + sd_cloud_api::locations::authorise( + node.cloud_api_config().await, + params.id, ) - .await - .and_then(ensure_response) - .map(parse_json_body::)? .await?, ); } diff --git a/core/src/api/search/mod.rs b/core/src/api/search/mod.rs index 1a45a1aae..5d54844eb 100644 --- a/core/src/api/search/mod.rs +++ b/core/src/api/search/mod.rs @@ -210,7 +210,7 @@ pub fn mount() -> AlphaRouter { }| async move { let Library { db, .. } = library.as_ref(); - let mut query = db.file_path().find_many({ + let params = { let mut params = Vec::new(); for filter in filters { @@ -218,7 +218,9 @@ pub fn mount() -> AlphaRouter { } params - }); + }; + + let mut query = db.file_path().find_many(params); if let Some(take) = take { query = query.take(take as i64); diff --git a/core/src/api/web_api.rs b/core/src/api/web_api.rs index 674484957..49802bb5b 100644 --- a/core/src/api/web_api.rs +++ b/core/src/api/web_api.rs @@ -1,5 +1,3 @@ -use crate::util::http::ensure_response; - use rspc::alpha::AlphaRouter; use serde::{Deserialize, Serialize}; use specta::Type; @@ -17,21 +15,12 @@ pub(crate) fn mount() -> AlphaRouter { } |node, args: Feedback| async move { - node.add_auth_header( - node.http - .post(&format!("{}/api/v1/feedback", &node.env.api_url)), + sd_cloud_api::feedback::send( + node.cloud_api_config().await, + args.message, + args.emoji, ) - .await - .json(&args) - .send() - .await - .map_err(|_| { - rspc::Error::new( - rspc::ErrorCode::InternalServerError, - "Request failed".to_string(), - ) - }) - .and_then(ensure_response)?; + .await?; Ok(()) } diff --git a/core/src/cloud/sync/ingest.rs b/core/src/cloud/sync/ingest.rs index 10eef8531..ab59d2377 100644 --- a/core/src/cloud/sync/ingest.rs +++ b/core/src/cloud/sync/ingest.rs @@ -3,6 +3,7 @@ use crate::cloud::sync::err_return; use std::sync::Arc; use tokio::sync::Notify; +use tracing::info; use super::Library; @@ -39,6 +40,8 @@ pub async fn run_actor((library, notify): (Arc, Arc)) { .await ); + info!("Got {} cloud ops to ingest", ops.len()); + err_return!( sync.ingest .event_tx diff --git a/core/src/cloud/sync/mod.rs b/core/src/cloud/sync/mod.rs index f26cceba6..dac42b6c3 100644 --- a/core/src/cloud/sync/mod.rs +++ b/core/src/cloud/sync/mod.rs @@ -1,4 +1,8 @@ use crate::{library::Library, Node}; +use sd_sync::*; +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use uuid::Uuid; use std::sync::{atomic, Arc}; @@ -63,3 +67,108 @@ macro_rules! err_return { pub(crate) use err_return; use tokio::sync::Notify; + +pub type CompressedCRDTOperationsForModel = Vec<(Value, Vec)>; + +#[derive(Serialize, Deserialize)] +pub struct CompressedCRDTOperations(Vec<(Uuid, Vec<(String, CompressedCRDTOperationsForModel)>)>); + +impl CompressedCRDTOperations { + pub fn new(ops: Vec) -> Self { + let mut compressed = vec![]; + + let mut ops_iter = ops.into_iter(); + + let Some(first) = ops_iter.next() else { + return Self(vec![]); + }; + + let mut instance_id = first.instance; + let mut instance = vec![]; + + let mut model_str = first.model.clone(); + let mut model = vec![]; + + let mut record_id = first.record_id.clone(); + let mut record = vec![first.into()]; + + for op in ops_iter { + if instance_id != op.instance { + model.push(( + std::mem::replace(&mut record_id, op.record_id.clone()), + std::mem::take(&mut record), + )); + instance.push(( + std::mem::replace(&mut model_str, op.model.clone()), + std::mem::take(&mut model), + )); + compressed.push(( + std::mem::replace(&mut instance_id, op.instance), + std::mem::take(&mut instance), + )); + } else if model_str != op.model { + model.push(( + std::mem::replace(&mut record_id, op.record_id.clone()), + std::mem::take(&mut record), + )); + instance.push(( + std::mem::replace(&mut model_str, op.model.clone()), + std::mem::take(&mut model), + )); + } else if record_id != op.record_id { + model.push(( + std::mem::replace(&mut record_id, op.record_id.clone()), + std::mem::take(&mut record), + )); + } + + record.push(CompressedCRDTOperation::from(op)) + } + + model.push((record_id, record)); + instance.push((model_str, model)); + compressed.push((instance_id, instance)); + + Self(compressed) + } + + pub fn into_ops(self) -> Vec { + let mut ops = vec![]; + + for (instance_id, instance) in self.0 { + for (model_str, model) in instance { + for (record_id, record) in model { + for op in record { + ops.push(CRDTOperation { + instance: instance_id, + model: model_str.clone(), + record_id: record_id.clone(), + timestamp: op.timestamp, + id: op.id, + data: op.data, + }) + } + } + } + } + + ops + } +} + +#[derive(PartialEq, Eq, Serialize, Deserialize, Clone)] +pub struct CompressedCRDTOperation { + pub timestamp: NTP64, + pub id: Uuid, + pub data: CRDTOperationData, +} + +impl From for CompressedCRDTOperation { + fn from(value: CRDTOperation) -> Self { + Self { + timestamp: value.timestamp, + id: value.id, + data: value.data, + } + } +} diff --git a/core/src/cloud/sync/receive.rs b/core/src/cloud/sync/receive.rs index 13b8d2b89..68c5f2dd6 100644 --- a/core/src/cloud/sync/receive.rs +++ b/core/src/cloud/sync/receive.rs @@ -1,13 +1,14 @@ use crate::{ - cloud::sync::{err_break, err_return}, + cloud::sync::{err_break, err_return, CompressedCRDTOperations}, library::Library, Node, }; use sd_core_sync::NTP64; use sd_prisma::prisma::{cloud_crdt_operation, instance, PrismaClient, SortOrder}; -use sd_sync::*; +use sd_sync::CRDTOperation; use sd_utils::{from_bytes_to_uuid, uuid_to_bytes}; +use tracing::info; use std::{ collections::{hash_map::Entry, HashMap}, @@ -17,14 +18,12 @@ use std::{ use base64::prelude::*; use chrono::Utc; -use serde::Deserialize; -use serde_json::{json, to_vec}; +use serde_json::to_vec; use tokio::{sync::Notify, time::sleep}; use uuid::Uuid; pub async fn run_actor((library, node, ingest_notify): (Arc, Arc, Arc)) { let db = &library.db; - let api_url = &library.env.api_url; let library_id = library.id; let mut cloud_timestamps = { @@ -57,53 +56,49 @@ pub async fn run_actor((library, node, ingest_notify): (Arc, Arc, .collect::>() }; + info!( + "Fetched timestamps for {} local instances", + cloud_timestamps.len() + ); + loop { - let instances = { - err_break!( - db.instance() - .find_many(vec![]) - .select(instance::select!({ pub_id })) - .exec() - .await - ) - .into_iter() - .map(|i| { - let uuid = from_bytes_to_uuid(&i.pub_id); - - json!({ - "instanceUuid": uuid, - "fromTime": cloud_timestamps.get(&uuid).cloned().unwrap_or_default().as_u64().to_string() - }) - }) - .collect::>() - }; - - #[derive(Deserialize, Debug)] - #[serde(rename_all = "camelCase")] - struct MessageCollection { - instance_uuid: Uuid, - // start_time: String, - end_time: String, - contents: String, - } + let instances = err_break!( + db.instance() + .find_many(vec![]) + .select(instance::select!({ pub_id })) + .exec() + .await + ); { - let collections = node - .authed_api_request( - node.http - .post(&format!( - "{api_url}/api/v1/libraries/{library_id}/messageCollections/get" - )) - .json(&json!({ - "instanceUuid": library.instance_uuid, - "timestamps": instances - })), + let collections = { + use sd_cloud_api::library::message_collections; + message_collections::get( + node.cloud_api_config().await, + library_id, + library.instance_uuid, + instances + .into_iter() + .map(|i| { + let uuid = from_bytes_to_uuid(&i.pub_id); + + message_collections::get::InstanceTimestamp { + instance_uuid: uuid, + from_time: cloud_timestamps + .get(&uuid) + .cloned() + .unwrap_or_default() + .as_u64() + .to_string(), + } + }) + .collect::>(), ) .await - .expect("couldn't get response") - .json::>() - .await - .expect("couldn't deserialize response"); + }; + let collections = err_break!(collections); + + info!("Received {} collections", collections.len()); let mut cloud_library_data: Option> = None; @@ -152,15 +147,12 @@ pub async fn run_actor((library, node, ingest_notify): (Arc, Arc, e.insert(NTP64(0)); } - err_break!( - write_cloud_ops_to_db( - err_break!(serde_json::from_slice(err_break!( - &BASE64_STANDARD.decode(collection.contents) - ))), - db - ) - .await - ); + let compressed_operations: CompressedCRDTOperations = + err_break!(serde_json::from_slice(err_break!( + &BASE64_STANDARD.decode(collection.contents) + ))); + + err_break!(write_cloud_ops_to_db(compressed_operations.into_ops(), db).await); let collection_timestamp = NTP64(collection.end_time.parse().expect("unable to parse time")); @@ -196,7 +188,7 @@ fn crdt_op_db(op: &CRDTOperation) -> cloud_crdt_operation::Create { id: op.id.as_bytes().to_vec(), timestamp: op.timestamp.0 as i64, instance: instance::pub_id::equals(op.instance.as_bytes().to_vec()), - kind: op.kind().to_string(), + kind: op.data.as_kind().to_string(), data: to_vec(&op.data).expect("unable to serialize data"), model: op.model.to_string(), record_id: to_vec(&op.record_id).expect("unable to serialize record id"), diff --git a/core/src/cloud/sync/send.rs b/core/src/cloud/sync/send.rs index d1b64885c..c542f20cd 100644 --- a/core/src/cloud/sync/send.rs +++ b/core/src/cloud/sync/send.rs @@ -1,4 +1,4 @@ -use crate::{cloud::sync::err_break, Node}; +use crate::{cloud::sync::CompressedCRDTOperations, Node}; use sd_core_sync::{GetOpsArgs, SyncMessage, NTP64}; use sd_prisma::prisma::instance; @@ -6,16 +6,12 @@ use sd_utils::from_bytes_to_uuid; use std::{sync::Arc, time::Duration}; -use serde::Deserialize; -use serde_json::json; use tokio::time::sleep; -use uuid::Uuid; -use super::Library; +use super::{err_break, Library}; pub async fn run_actor((library, node): (Arc, Arc)) { let db = &library.db; - let api_url = &library.env.api_url; let library_id = library.id; loop { @@ -28,35 +24,22 @@ pub async fn run_actor((library, node): (Arc, Arc)) { .await ) .into_iter() - .map(|i| json!({ "instanceUuid": from_bytes_to_uuid(&i.pub_id).to_string() })) + .map(|i| from_bytes_to_uuid(&i.pub_id)) .collect::>(); - #[derive(Deserialize, Debug)] - #[serde(rename_all = "camelCase")] - struct RequestAdd { - instance_uuid: Uuid, - from_time: Option, - // mutex key on the instance - key: String, - } - let req_adds = err_break!( - err_break!( - node.authed_api_request( - node.http - .post(&format!( - "{api_url}/api/v1/libraries/{library_id}/messageCollections/requestAdd" - )) - .json(&json!({ "instances": instances })), - ) - .await + sd_cloud_api::library::message_collections::request_add( + node.cloud_api_config().await, + library_id, + instances, ) - .json::>() .await ); let mut instances = vec![]; + use sd_cloud_api::library::message_collections::do_add; + for req_add in req_adds { let ops = err_break!( library @@ -84,49 +67,21 @@ pub async fn run_actor((library, node): (Arc, Arc)) { let start_time = ops[0].timestamp.0.to_string(); let end_time = ops[ops.len() - 1].timestamp.0.to_string(); - instances.push(json!({ - "uuid": req_add.instance_uuid, - "key": req_add.key, - "startTime": start_time, - "endTime": end_time, - "contents": ops, - })) + instances.push(do_add::Input { + uuid: req_add.instance_uuid, + key: req_add.key, + start_time, + end_time, + contents: serde_json::to_value(CompressedCRDTOperations::new(ops)) + .expect("CompressedCRDTOperation should serialize!"), + }) } - tracing::debug!("Number of instances: {}", instances.len()); - tracing::debug!( - "Number of messages: {}", - instances - .iter() - .map(|i| i["contents"].as_array().expect("no contents found").len()) - .sum::() - ); - if instances.is_empty() { break; } - #[derive(Deserialize, Debug)] - #[serde(rename_all = "camelCase")] - struct DoAdd { - // instance_uuid: Uuid, - // from_time: String, - } - - let _responses = err_break!( - err_break!( - node.authed_api_request( - node.http - .post(&format!( - "{api_url}/api/v1/libraries/{library_id}/messageCollections/doAdd", - )) - .json(&json!({ "instances": instances })), - ) - .await - ) - .json::>() - .await - ); + err_break!(do_add(node.cloud_api_config().await, library_id, instances,).await); } { diff --git a/core/src/env.rs b/core/src/env.rs index c031bbd9b..426c8fca9 100644 --- a/core/src/env.rs +++ b/core/src/env.rs @@ -1,4 +1,15 @@ +use tokio::sync::Mutex; + pub struct Env { - pub api_url: String, + pub api_url: Mutex, pub client_id: String, } + +impl Env { + pub fn new(client_id: &str) -> Self { + Self { + api_url: Mutex::new("https://app.spacedrive.com".to_string()), + client_id: client_id.to_string(), + } + } +} diff --git a/core/src/lib.rs b/core/src/lib.rs index af65c522c..552a61ef8 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -291,7 +291,7 @@ impl Node { pub async fn cloud_api_config(&self) -> sd_cloud_api::RequestConfig { sd_cloud_api::RequestConfig { client: self.http.clone(), - api_url: self.env.api_url.clone(), + api_url: self.env.api_url.lock().await.clone(), auth_token: self.config.get().await.auth_token, } } diff --git a/core/src/library/actors.rs b/core/src/library/actors.rs index 7e659802b..01fc33f15 100644 --- a/core/src/library/actors.rs +++ b/core/src/library/actors.rs @@ -23,9 +23,7 @@ impl Actors { actor_fn: impl FnOnce() -> F + Send + Sync + Clone + 'static, autostart: bool, ) { - let mut actors = self.actors.lock().await; - - actors.insert( + self.actors.lock().await.insert( name.to_string(), Arc::new(Actor { abort_handle: Default::default(), diff --git a/core/src/location/non_indexed.rs b/core/src/location/non_indexed.rs index 90b360110..30c3fb394 100644 --- a/core/src/location/non_indexed.rs +++ b/core/src/location/non_indexed.rs @@ -362,9 +362,9 @@ impl Entry { pub async fn get_all_entries(path: PathBuf) -> Result, NonIndexedLocationError> { tokio::task::spawn_blocking(move || { let path = &path; - let mut dir = std::fs::read_dir(&path).map_err(|e| (path, e))?; + let dir = std::fs::read_dir(path).map_err(|e| (path, e))?; let mut entries = Vec::new(); - while let Some(entry) = dir.next() { + for entry in dir { let entry = entry.map_err(|e| (path, e))?; // We must not keep `entry` around as we will quickly hit the OS limit on open file descriptors diff --git a/core/src/object/orphan_remover.rs b/core/src/object/orphan_remover.rs index cd7faaefc..9538c417e 100644 --- a/core/src/object/orphan_remover.rs +++ b/core/src/object/orphan_remover.rs @@ -90,6 +90,7 @@ impl OrphanRemoverActor { .await { error!("Failed to remove orphaned objects: {e:#?}"); + break; } } } diff --git a/core/src/util/batched_stream.rs b/core/src/util/batched_stream.rs index 3e301d967..1cd350840 100644 --- a/core/src/util/batched_stream.rs +++ b/core/src/util/batched_stream.rs @@ -58,10 +58,10 @@ impl Stream for BatchedStream { Poll::Pending } else { let batch = std::mem::take(batch); - return Poll::Ready(Some(batch)); + Poll::Ready(Some(batch)) } } - BatchedStreamProj::Complete => return Poll::Ready(None), + BatchedStreamProj::Complete => Poll::Ready(None), } } } diff --git a/core/src/util/http.rs b/core/src/util/http.rs deleted file mode 100644 index 5cad9cd69..000000000 --- a/core/src/util/http.rs +++ /dev/null @@ -1,6 +0,0 @@ -use reqwest::Response; - -pub fn ensure_response(resp: Response) -> Result { - resp.error_for_status() - .map_err(|e| rspc::Error::new(rspc::ErrorCode::InternalServerError, e.to_string())) -} diff --git a/core/src/util/mod.rs b/core/src/util/mod.rs index b28158fad..e8096c65b 100644 --- a/core/src/util/mod.rs +++ b/core/src/util/mod.rs @@ -2,7 +2,6 @@ mod abort_on_drop; mod batched_stream; #[cfg(debug_assertions)] pub mod debug_initializer; -pub mod http; mod infallible_request; mod maybe_undefined; pub mod mpscrr; diff --git a/crates/cloud-api/src/lib.rs b/crates/cloud-api/src/lib.rs index e598e893a..eae0f61c5 100644 --- a/crates/cloud-api/src/lib.rs +++ b/crates/cloud-api/src/lib.rs @@ -41,6 +41,16 @@ pub struct Instance { pub identity: String, } +#[derive(Serialize, Deserialize, Debug, Type)] +#[serde(rename_all = "camelCase")] +#[specta(rename = "CloudMessageCollection")] +pub struct MessageCollection { + pub instance_uuid: Uuid, + pub start_time: String, + pub end_time: String, + pub contents: String, +} + trait WithAuth { fn with_auth(self, token: OAuthToken) -> Self; } @@ -54,6 +64,69 @@ impl WithAuth for reqwest::RequestBuilder { } } +pub mod feedback { + use super::*; + + pub use send::exec as send; + pub mod send { + use super::*; + + pub async fn exec(config: RequestConfig, message: String, emoji: u8) -> Result<(), Error> { + let mut req = config + .client + .post(format!("{}/api/v1/feedback", config.api_url)) + .json(&json!({ + "message": message, + "emoji": emoji, + })); + + if let Some(auth_token) = config.auth_token { + req = req.with_auth(auth_token); + } + + req.send() + .await + .and_then(|r| r.error_for_status()) + .map_err(|e| Error(e.to_string()))?; + + Ok(()) + } + } +} + +pub mod user { + use super::*; + + pub use me::exec as me; + pub mod me { + use super::*; + + #[derive(Serialize, Deserialize, Type)] + #[specta(inline)] + pub struct Response { + id: String, + email: String, + } + + pub async fn exec(config: RequestConfig) -> Result { + let Some(auth_token) = config.auth_token else { + return Err(Error("Authentication required".to_string())); + }; + + config + .client + .get(&format!("{}/api/v1/user/me", config.api_url)) + .with_auth(auth_token) + .send() + .await + .map_err(|e| Error(e.to_string()))? + .json() + .await + .map_err(|e| Error(e.to_string())) + } + } +} + pub mod library { use super::*; @@ -182,4 +255,255 @@ pub mod library { Ok(()) } } + + pub mod message_collections { + use super::*; + + pub use get::exec as get; + pub mod get { + use super::*; + + #[derive(Serialize)] + #[serde(rename_all = "camelCase")] + pub struct InstanceTimestamp { + pub instance_uuid: Uuid, + pub from_time: String, + } + + pub async fn exec( + config: RequestConfig, + library_id: Uuid, + this_instance_uuid: Uuid, + timestamps: Vec, + ) -> Result { + let Some(auth_token) = config.auth_token else { + return Err(Error("Authentication required".to_string())); + }; + + config + .client + .post(&format!( + "{}/api/v1/libraries/{}/messageCollections/get", + config.api_url, library_id + )) + .json(&json!({ + "instanceUuid": this_instance_uuid, + "timestamps": timestamps + })) + .with_auth(auth_token) + .send() + .await + .map_err(|e| Error(e.to_string()))? + .json() + .await + .map_err(|e| Error(e.to_string())) + } + + pub type Response = Vec; + } + + pub use request_add::exec as request_add; + pub mod request_add { + use super::*; + + #[derive(Deserialize, Debug)] + #[serde(rename_all = "camelCase")] + pub struct RequestAdd { + pub instance_uuid: Uuid, + pub from_time: Option, + // mutex key on the instance + pub key: String, + } + + pub async fn exec( + config: RequestConfig, + library_id: Uuid, + instances: Vec, + ) -> Result { + let Some(auth_token) = config.auth_token else { + return Err(Error("Authentication required".to_string())); + }; + + let instances = instances + .into_iter() + .map(|i| json!({"instanceUuid": i })) + .collect::>(); + + config + .client + .post(&format!( + "{}/api/v1/libraries/{}/messageCollections/requestAdd", + config.api_url, library_id + )) + .json(&json!({ "instances": instances })) + .with_auth(auth_token) + .send() + .await + .map_err(|e| Error(e.to_string()))? + .json() + .await + .map_err(|e| Error(e.to_string())) + } + + pub type Response = Vec; + } + + pub use do_add::exec as do_add; + pub mod do_add { + use super::*; + + #[derive(Serialize, Debug)] + #[serde(rename_all = "camelCase")] + pub struct Input { + pub uuid: Uuid, + pub key: String, + pub start_time: String, + pub end_time: String, + pub contents: serde_json::Value, + } + + pub async fn exec( + config: RequestConfig, + library_id: Uuid, + instances: Vec, + ) -> Result<(), Error> { + let Some(auth_token) = config.auth_token else { + return Err(Error("Authentication required".to_string())); + }; + + config + .client + .post(&format!( + "{}/api/v1/libraries/{}/messageCollections/requestAdd", + config.api_url, library_id + )) + .json(&json!({ "instances": instances })) + .with_auth(auth_token) + .send() + .await + .and_then(|r| r.error_for_status()) + .map_err(|e| Error(e.to_string()))?; + + Ok(()) + } + } + } +} + +#[derive(Type, Serialize, Deserialize)] +pub struct CloudLocation { + id: String, + name: String, +} + +pub mod locations { + use super::*; + + pub use list::exec as list; + pub mod list { + use super::*; + + pub async fn exec(config: RequestConfig) -> Result { + let Some(auth_token) = config.auth_token else { + return Err(Error("Authentication required".to_string())); + }; + + config + .client + .get(&format!("{}/api/v1/locations", config.api_url)) + .with_auth(auth_token) + .send() + .await + .map_err(|e| Error(e.to_string()))? + .json() + .await + .map_err(|e| Error(e.to_string())) + } + + pub type Response = Vec; + } + + pub use create::exec as create; + pub mod create { + use super::*; + + pub async fn exec(config: RequestConfig, name: String) -> Result { + let Some(auth_token) = config.auth_token else { + return Err(Error("Authentication required".to_string())); + }; + + config + .client + .post(&format!("{}/api/v1/locations", config.api_url)) + .json(&json!({ + "name": name, + })) + .with_auth(auth_token) + .send() + .await + .map_err(|e| Error(e.to_string()))? + .json() + .await + .map_err(|e| Error(e.to_string())) + } + + pub type Response = CloudLocation; + } + + pub use remove::exec as remove; + pub mod remove { + use super::*; + + pub async fn exec(config: RequestConfig, id: String) -> Result { + let Some(auth_token) = config.auth_token else { + return Err(Error("Authentication required".to_string())); + }; + + config + .client + .post(&format!("{}/api/v1/locations/delete", config.api_url)) + .json(&json!({ + "id": id, + })) + .with_auth(auth_token) + .send() + .await + .map_err(|e| Error(e.to_string()))? + .json() + .await + .map_err(|e| Error(e.to_string())) + } + + pub type Response = CloudLocation; + } + + pub use authorise::exec as authorise; + pub mod authorise { + use super::*; + + pub async fn exec(config: RequestConfig, id: String) -> Result { + let Some(auth_token) = config.auth_token else { + return Err(Error("Authentication required".to_string())); + }; + + config + .client + .post(&format!("{}/api/v1/locations/authorise", config.api_url)) + .json(&json!({ "id": id })) + .with_auth(auth_token) + .send() + .await + .map_err(|e| Error(e.to_string()))? + .json() + .await + .map_err(|e| Error(e.to_string())) + } + + #[derive(Debug, Clone, Type, Deserialize)] + pub struct Response { + pub access_key_id: String, + pub secret_access_key: String, + pub session_token: String, + } + } } diff --git a/crates/sync/src/compressed.rs b/crates/sync/src/compressed.rs new file mode 100644 index 000000000..0b68b8b2e --- /dev/null +++ b/crates/sync/src/compressed.rs @@ -0,0 +1,84 @@ +#[derive(Serialize, Deserialize)] +pub struct CompressedCRDTOperations( + Vec<( + Uuid, + Vec<(String, Vec<(Value, Vec)>)>, + )>, +); + +impl CompressedCRDTOperations { + pub fn new(ops: Vec) -> Self { + let mut compressed = vec![]; + + let mut ops_iter = ops.into_iter(); + + let Some(first) = ops_iter.next() else { + return Self(vec![]); + }; + + let mut instance_id = first.instance; + let mut instance = vec![]; + + let mut model_str = first.model.clone(); + let mut model = vec![]; + + let mut record_id = first.record_id.clone(); + let mut record = vec![first.into()]; + + for op in ops_iter { + if instance_id != op.instance { + model.push(( + std::mem::replace(&mut record_id, op.record_id), + std::mem::take(&mut record), + )); + instance.push(( + std::mem::replace(&mut model_str, op.model), + std::mem::take(&mut model), + )); + compressed.push(( + std::mem::replace(&mut instance_id, op.instance), + std::mem::take(&mut instance), + )); + } else if model_str != op.model { + model.push(( + std::mem::replace(&mut record_id, op.record_id), + std::mem::take(&mut record), + )); + instance.push(( + std::mem::replace(&mut model_str, op.model), + std::mem::take(&mut model), + )); + } else if record_id != op.record_id { + model.push(( + std::mem::replace(&mut record_id, op.record_id), + std::mem::take(&mut record), + )); + } + + record.push(CompressedCRDTOperation::from(op)) + } + + model.push((record_id, record)); + instance.push((model_str, model)); + compressed.push((instance_id, instance)); + + Self(compressed) + } +} + +#[derive(PartialEq, Eq, Serialize, Deserialize, Clone)] +pub struct CompressedCRDTOperation { + pub timestamp: NTP64, + pub id: Uuid, + pub data: CRDTOperationData, +} + +impl From for CompressedCRDTOperation { + fn from(value: CRDTOperation) -> Self { + Self { + timestamp: value.timestamp, + id: value.id, + data: value.data, + } + } +} diff --git a/crates/sync/src/crdt.rs b/crates/sync/src/crdt.rs index 38677d0bd..97822e691 100644 --- a/crates/sync/src/crdt.rs +++ b/crates/sync/src/crdt.rs @@ -33,7 +33,7 @@ pub enum CRDTOperationData { } impl CRDTOperationData { - fn as_kind(&self) -> OperationKind { + pub fn as_kind(&self) -> OperationKind { match self { Self::Create => OperationKind::Create, Self::Update { field, .. } => OperationKind::Update(field), diff --git a/interface/app/$libraryId/Layout/Sidebar/DebugPopover.tsx b/interface/app/$libraryId/Layout/Sidebar/DebugPopover.tsx index 77b71ff0e..8e8e45c68 100644 --- a/interface/app/$libraryId/Layout/Sidebar/DebugPopover.tsx +++ b/interface/app/$libraryId/Layout/Sidebar/DebugPopover.tsx @@ -1,9 +1,12 @@ import { CheckSquare } from '@phosphor-icons/react'; +import { useQueryClient } from '@tanstack/react-query'; import { useNavigate } from 'react-router'; import { + auth, backendFeatures, features, toggleFeatureFlag, + useBridgeMutation, useBridgeQuery, useDebugState, useFeatureFlags, @@ -43,6 +46,10 @@ export default () => { } >
+ + + + // ); // } + +function CloudOriginSelect() { + const origin = useBridgeQuery(['cloud.getApiOrigin']); + const setOrigin = useBridgeMutation(['cloud.setApiOrigin']); + + const queryClient = useQueryClient(); + + return ( + <> + {origin.data && ( + + )} + + ); +} diff --git a/interface/app/$libraryId/Layout/index.tsx b/interface/app/$libraryId/Layout/index.tsx index a1536ed20..3e64be336 100644 --- a/interface/app/$libraryId/Layout/index.tsx +++ b/interface/app/$libraryId/Layout/index.tsx @@ -13,6 +13,7 @@ import { } from '@sd/client'; import { useRootContext } from '~/app/RootContext'; import { LibraryIdParamsSchema } from '~/app/route-schemas'; +import ErrorFallback, { BetterErrorBoundary } from '~/ErrorFallback'; import { useKeybindEventHandler, useOperatingSystem, @@ -79,22 +80,24 @@ const Layout = () => { showControls.transparentBg ? 'bg-app/80' : 'bg-app' )} > - {library ? ( - - - } - > - - - - - - ) : ( -

- Please select or create a library in the sidebar. -

- )} + + {library ? ( + + + } + > + + + + + + ) : ( +

+ Please select or create a library in the sidebar. +

+ )} +
diff --git a/interface/app/$libraryId/debug/cloud.tsx b/interface/app/$libraryId/debug/cloud.tsx index 37abad3cc..66b2010c2 100644 --- a/interface/app/$libraryId/debug/cloud.tsx +++ b/interface/app/$libraryId/debug/cloud.tsx @@ -9,15 +9,17 @@ export const Component = () => { const authState = auth.useStateSnapshot(); - if (authState.status === 'loggedIn') return ; - if (authState.status === 'notLoggedIn' || authState.status === 'loggingIn') - return ( -
- -
- ); + const authSensitiveChild = () => { + if (authState.status === 'loggedIn') return ; + if (authState.status === 'notLoggedIn' || authState.status === 'loggingIn') + return ; - return null; + return null; + }; + + return ( +
{authSensitiveChild()}
+ ); }; function Authenticated() { @@ -32,7 +34,7 @@ function Authenticated() { ); return ( -
+ <> {cloudLibrary.data ? (
@@ -77,6 +79,6 @@ function Authenticated() {
)} -
+ ); } diff --git a/interface/app/$libraryId/settings/node/libraries/CreateDialog.tsx b/interface/app/$libraryId/settings/node/libraries/CreateDialog.tsx index 065a59a23..c1af377c1 100644 --- a/interface/app/$libraryId/settings/node/libraries/CreateDialog.tsx +++ b/interface/app/$libraryId/settings/node/libraries/CreateDialog.tsx @@ -49,7 +49,7 @@ export default (props: UseDialogProps) => { event: { type: 'libraryCreate' } }); - platform.refreshMenuBar && platform.refreshMenuBar(); + platform.refreshMenuBar?.(); navigate(`/${library.uuid}`); } catch (e) { diff --git a/packages/client/src/core.ts b/packages/client/src/core.ts index b82a49b97..06711775d 100644 --- a/packages/client/src/core.ts +++ b/packages/client/src/core.ts @@ -6,6 +6,7 @@ export type Procedures = { { key: "auth.me", input: never, result: { id: string; email: string } } | { key: "backups.getAll", input: never, result: GetAll } | { key: "buildInfo", input: never, result: BuildInfo } | + { key: "cloud.getApiOrigin", input: never, result: string } | { key: "cloud.library.get", input: LibraryArgs, result: { uuid: string; name: string; instances: CloudInstance[]; ownerId: string } | null } | { key: "cloud.library.list", input: never, result: CloudLibrary[] } | { key: "cloud.locations.list", input: never, result: CloudLocation[] } | @@ -58,8 +59,9 @@ export type Procedures = { { key: "cloud.library.create", input: LibraryArgs, result: null } | { key: "cloud.library.join", input: string, result: LibraryConfigWrapped } | { key: "cloud.locations.create", input: string, result: CloudLocation } | - { key: "cloud.locations.remove", input: string, result: null } | + { key: "cloud.locations.remove", input: string, result: CloudLocation } | { key: "cloud.locations.testing", input: TestingParams, result: null } | + { key: "cloud.setApiOrigin", input: string, result: null } | { key: "ephemeralFiles.copyFiles", input: LibraryArgs, result: null } | { key: "ephemeralFiles.createFolder", input: LibraryArgs, result: string } | { key: "ephemeralFiles.cutFiles", input: LibraryArgs, result: null } | @@ -529,7 +531,7 @@ export type RescanArgs = { location_id: number; sub_path: string } export type Resolution = { width: number; height: number } -export type Response = { Start: { user_code: string; verification_url: string; verification_url_complete: string } } | "Complete" | "Error" +export type Response = { Start: { user_code: string; verification_url: string; verification_url_complete: string } } | "Complete" | { Error: string } export type RuleKind = "AcceptFilesByGlob" | "RejectFilesByGlob" | "AcceptIfChildrenDirectoriesArePresent" | "RejectIfChildrenDirectoriesArePresent" diff --git a/packages/client/src/stores/auth.ts b/packages/client/src/stores/auth.ts index 7fcbb8a1d..81bd7c425 100644 --- a/packages/client/src/stores/auth.ts +++ b/packages/client/src/stores/auth.ts @@ -34,11 +34,11 @@ nonLibraryClient store.state = { status: 'notLoggedIn' }; }); -type CallbackStatus = 'success' | 'error' | 'cancel'; +type CallbackStatus = 'success' | { error: string } | 'cancel'; const loginCallbacks = new Set<(status: CallbackStatus) => void>(); -function onError() { - loginCallbacks.forEach((cb) => cb('error')); +function onError(error: string) { + loginCallbacks.forEach((cb) => cb({ error })); } export function login(config: ProviderConfig) { @@ -51,12 +51,14 @@ export function login(config: ProviderConfig) { if (data === 'Complete') { config.finish?.(authCleanup); loginCallbacks.forEach((cb) => cb('success')); - } else if (data === 'Error') onError(); + } else if ('Error' in data) onError(data.Error); else { authCleanup = config.start(data.Start.verification_url_complete); } }, - onError + onError(e) { + onError(e.message); + } }); return new Promise((res, rej) => { @@ -69,7 +71,7 @@ export function login(config: ProviderConfig) { res(); } else { store.state = { status: 'notLoggedIn' }; - rej(); + rej(JSON.stringify(status)); } }; loginCallbacks.add(cb);