From ef12e5d1c0cdfa1c2c6b4c56ef4ebcfc5b9f6627 Mon Sep 17 00:00:00 2001 From: Oscar Beaumont Date: Thu, 29 Feb 2024 15:46:30 +0800 Subject: [PATCH] Everything over P2P (#2135) * Fix websockets over P2P + multi-node cache * cleanup features * Explorer over P2P demo * wip * wip * working for ephemeral stuff * Button to get out of remote viewer * remove * Cleanup * more cleanup * Fix cross-node library queries * Files over P2P * Drop `Header::File` * Fix Clippy * Don't let noobs get hacked --- Cargo.lock | Bin 263854 -> 264175 bytes Cargo.toml | 2 +- apps/desktop/src/platform.ts | 13 +- apps/web/src/App.tsx | 8 +- core/Cargo.toml | 2 +- core/src/custom_uri/mod.rs | 203 ++++++++------- core/src/custom_uri/utils.rs | 9 +- core/src/lib.rs | 22 +- core/src/p2p/manager.rs | 8 - core/src/p2p/operations/mod.rs | 2 - core/src/p2p/operations/request_file.rs | 242 ------------------ core/src/p2p/operations/rspc.rs | 6 + core/src/p2p/protocol.rs | 59 +---- crates/p2p2/src/quic/transport.rs | 2 - interface/app/$libraryId/debug/p2p-rspc.tsx | 61 +---- interface/app/$libraryId/ephemeral.tsx | 1 + interface/app/index.tsx | 217 +++++++++++++++- interface/util/Platform.tsx | 1 + packages/client/src/cache.tsx | 35 ++- .../client/src/hooks/useClientContext.tsx | 7 +- packages/client/src/rspc.tsx | 18 +- 21 files changed, 409 insertions(+), 509 deletions(-) delete mode 100644 core/src/p2p/operations/request_file.rs diff --git a/Cargo.lock b/Cargo.lock index 57150f29f2b6ca8281156511d7b7173696b816b5..61c38518875d25d60536925fbd4497d97f440426 100644 GIT binary patch delta 175 zcmZ42EAYNwU_+|!^k^3QaEe{mGn>XgIy_CX@K|X*ZepMN3Na(u+${^D;|P zm2|iiluGi;Q;Q})@V99`?bm+Vj}eHOwx9N6<`SH4SH+xbXJlxUVw#eiWRz%_oN8c} zVqs`#V4iH8Vvv@WXl!U?n3ig3ZjxkfoMvEVk!)dPVU(O?m~3ohYy?zjoHTi%l=$=q ae2g5E-`R0a-*+9zn4zw}{d_$0{bc|WLp{C# delta 54 zcmaFgFR-pxU_+|!=5k+d#_20=G6_ze constructServerUrl(`/local-file-by-path/${encodeURIComponent(path)}`), getRemoteRspcEndpoint: (remote_identity) => ({ - url: `${customUriServerUrl?.[0]}/remote/${encodeURIComponent(remote_identity)}/rspc`, - headers: { - authorization: `Bearer ${customUriAuthToken}` - } + url: `${customUriServerUrl?.[0] + ?.replace('https', 'wss') + ?.replace('http', 'ws')}/remote/${encodeURIComponent( + remote_identity + )}/rspc/ws?token=${customUriAuthToken}` }), + constructRemoteRspcPath: (remote_identity, path) => + constructServerUrl( + `/remote/${encodeURIComponent(remote_identity)}/uri/${path}?token=${customUriAuthToken}` + ), openLink: shell.open, getOs, openDirectoryPickerDialog: (opts) => { diff --git a/apps/web/src/App.tsx b/apps/web/src/App.tsx index e4492e238..5e669c424 100644 --- a/apps/web/src/App.tsx +++ b/apps/web/src/App.tsx @@ -50,8 +50,12 @@ const platform: Platform = { )}/${encodeURIComponent(filePathId)}`, getFileUrlByPath: (path) => `${spacedriveURL}/local-file-by-path/${encodeURIComponent(path)}`, getRemoteRspcEndpoint: (remote_identity) => ({ - url: `${spacedriveURL}/remote/${encodeURIComponent(remote_identity)}/rspc` + url: `${spacedriveURL + .replace('https', 'wss') + .replace('http', 'ws')}/remote/${encodeURIComponent(remote_identity)}/rspc/ws` }), + constructRemoteRspcPath: (remote_identity, path) => + `${spacedriveURL}/remote/${encodeURIComponent(remote_identity)}/uri/${path}`, openLink: (url) => window.open(url, '_blank')?.focus(), confirm: (message, cb) => cb(window.confirm(message)), auth: { @@ -151,7 +155,7 @@ function useRouter() { event.historyAction === 'PUSH' ? currentIndex : // sometimes the max index is 0 when the current index is > 0, like when reloading the page -_- - Math.max(router.maxIndex, currentIndex) + Math.max(router.maxIndex, currentIndex) }; }); }); diff --git a/core/Cargo.toml b/core/Cargo.toml index 08aa83ebd..8b16e2b92 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -52,7 +52,7 @@ sd-cloud-api = { version = "0.1.0", path = "../crates/cloud-api" } # Workspace dependencies async-channel = { workspace = true } async-trait = { workspace = true } -axum = { workspace = true } +axum = { workspace = true, features = ["ws"] } base64 = { workspace = true } blake3 = { workspace = true } chrono = { workspace = true, features = ["serde"] } diff --git a/core/src/custom_uri/mod.rs b/core/src/custom_uri/mod.rs index 29811893d..a9cad8a07 100644 --- a/core/src/custom_uri/mod.rs +++ b/core/src/custom_uri/mod.rs @@ -7,10 +7,10 @@ use crate::{ Node, }; +use hyper::{header, upgrade::OnUpgrade}; use sd_file_ext::text::is_text; use sd_file_path_helper::{file_path_to_handle_custom_uri, IsolatedFilePathData}; use sd_p2p2::{IdentityOrRemoteIdentity, RemoteIdentity}; -use sd_p2p_block::Range; use sd_prisma::prisma::{file_path, location}; use sd_utils::db::maybe_missing; @@ -21,12 +21,11 @@ use std::{ fs::Metadata, path::{Path, PathBuf}, str::FromStr, - sync::{atomic::Ordering, Arc}, + sync::Arc, }; -use async_stream::stream; use axum::{ - body::{self, Body, BoxBody, Full, StreamBody}, + body::{self, Body, BoxBody, Full}, extract::{self, State}, http::{HeaderMap, HeaderValue, Request, Response, StatusCode}, middleware, @@ -34,17 +33,15 @@ use axum::{ routing::get, Router, }; -use bytes::Bytes; use mini_moka::sync::Cache; use tokio::{ fs::{self, File}, - io::{self, AsyncReadExt, AsyncSeekExt, SeekFrom}, + io::{self, copy_bidirectional, AsyncReadExt, AsyncSeekExt, SeekFrom}, }; -use tokio_util::sync::PollSender; -use tracing::error; +use tracing::{error, warn}; use uuid::Uuid; -use self::{mpsc_to_async_write::MpscToAsyncWrite, serve_file::serve_file, utils::*}; +use self::{serve_file::serve_file, utils::*}; mod async_read_body; mod mpsc_to_async_write; @@ -72,7 +69,7 @@ pub enum ServeFrom { } #[derive(Clone)] -struct LocalState { +pub struct LocalState { node: Arc, // This LRU cache allows us to avoid doing a DB lookup on every request. @@ -148,8 +145,7 @@ async fn get_or_init_lru_entry( } } -// We are using Axum on all platforms because Tauri's custom URI protocols can't be async! -pub fn router(node: Arc) -> Router<()> { +pub fn base_router() -> Router { Router::new() .route( "/thumbnail/*path", @@ -196,11 +192,11 @@ pub fn router(node: Arc) -> Router<()> { CacheValue { name: file_path_full_path, ext: extension, - file_path_pub_id, + file_path_pub_id: _file_path_pub_id, serve_from, .. }, - library, + _library, ) = get_or_init_lru_entry(&state, path).await?; match serve_from { @@ -236,49 +232,8 @@ pub fn router(node: Arc) -> Router<()> { serve_file(file, Ok(metadata), request.into_parts().0, resp).await } - ServeFrom::Remote(identity) => { - if !state.node.files_over_p2p_flag.load(Ordering::Relaxed) { - return Ok(not_found(())); - } - - // TODO: Support `Range` requests and `ETag` headers - let stream = state - .node - .p2p - .get_instance(&library.id, identity) - .ok_or_else(|| { - not_found(format!("Error connecting to {identity}: no connection method available")) - })? - .new_stream() - .await - .map_err(|err| { - not_found(format!("Error connecting to {identity}: {err:?}")) - })?; - - let (tx, mut rx) = tokio::sync::mpsc::channel::>(150); - // TODO: We only start a thread because of stupid `ManagerStreamAction2` and libp2p's `!Send/!Sync` bounds on a stream. - tokio::spawn(async move { - let Ok(()) = operations::request_file( - stream, - &library, - file_path_pub_id, - Range::Full, - MpscToAsyncWrite::new(PollSender::new(tx)), - ) - .await - else { - return; - }; - }); - - // TODO: Content Type - Ok(InfallibleResponse::builder().status(StatusCode::OK).body( - body::boxed(StreamBody::new(stream! { - while let Some(item) = rx.recv().await { - yield item; - } - })), - )) + ServeFrom::Remote(_identity) => { + Err(not_implemented("Can't serve file from remote node")) // TODO: Reimplement this } } }, @@ -321,8 +276,45 @@ pub fn router(node: Arc) -> Router<()> { }, ), ) +} + +pub fn with_state(node: Arc) -> LocalState { + let file_metadata_cache = Arc::new(Cache::new(150)); + + tokio::spawn({ + let file_metadata_cache = file_metadata_cache.clone(); + let mut tx = node.event_bus.0.subscribe(); + async move { + while let Ok(event) = tx.recv().await { + if let CoreEvent::InvalidateOperation(e) = event { + match e { + InvalidateOperationEvent::Single(event) => { + // TODO: This is inefficent as any change will invalidate who cache. We need the new invalidation system!!! + // TODO: It's also error prone and a fine-grained resource based invalidation system would avoid that. + if event.key == "search.objects" || event.key == "search.paths" { + file_metadata_cache.invalidate_all(); + } + } + InvalidateOperationEvent::All => { + file_metadata_cache.invalidate_all(); + } + } + } + } + } + }); + + LocalState { + node, + file_metadata_cache, + } +} + +// We are using Axum on all platforms because Tauri's custom URI protocols can't be async! +pub fn router(node: Arc) -> Router<()> { + Router::new() .route( - "/remote/:identity/rspc/*path", + "/remote/:identity/*path", get( |State(state): State, extract::Path((identity, rest)): extract::Path<(String, String)>, @@ -330,60 +322,79 @@ pub fn router(node: Arc) -> Router<()> { let identity = match RemoteIdentity::from_str(&identity) { Ok(identity) => identity, Err(err) => { - error!("Error parsing identity '{}': {}", identity, err); + warn!("Error parsing identity '{}': {}", identity, err); return (StatusCode::BAD_REQUEST, HeaderMap::new(), vec![]) .into_response(); } }; + *request.uri_mut() = format!("/{rest}") .parse() .expect("url was validated by Axum"); - match operations::remote_rspc(state.node.p2p.p2p.clone(), identity, request) - .await + let request_upgrade_header = + request.headers().get(header::UPGRADE).map(Clone::clone); + let maybe_client_upgrade = request.extensions_mut().remove::(); + + let mut response = match operations::remote_rspc( + state.node.p2p.p2p.clone(), + identity, + request, + ) + .await { - Ok(response) => response.into_response(), + Ok(v) => v, Err(err) => { - error!("Error doing remote rspc query with '{identity}': {err:?}"); - (StatusCode::INTERNAL_SERVER_ERROR, HeaderMap::new()).into_response() + warn!("Error doing remote rspc query with '{identity}': {err:?}"); + return StatusCode::BAD_GATEWAY.into_response(); } + }; + if response.status() == StatusCode::SWITCHING_PROTOCOLS { + if response.headers().get(header::UPGRADE) + != request_upgrade_header.as_ref() + { + return StatusCode::BAD_REQUEST.into_response(); + } + + let Some(request_upgraded) = maybe_client_upgrade else { + return StatusCode::BAD_REQUEST.into_response(); + }; + let Some(response_upgraded) = + response.extensions_mut().remove::() + else { + return StatusCode::BAD_REQUEST.into_response(); + }; + + tokio::spawn(async move { + let Ok(mut request_upgraded) = request_upgraded.await.map_err(|err| { + warn!("Error upgrading websocket request: {err}"); + }) else { + return; + }; + let Ok(mut response_upgraded) = + response_upgraded.await.map_err(|err| { + warn!("Error upgrading websocket response: {err}"); + }) + else { + return; + }; + + copy_bidirectional(&mut request_upgraded, &mut response_upgraded) + .await + .map_err(|err| { + warn!("Error upgrading websocket response: {err}"); + }) + .ok(); + }); } + + response.into_response() }, ), ) + .merge(base_router()) .route_layer(middleware::from_fn(cors_middleware)) - .with_state({ - let file_metadata_cache = Arc::new(Cache::new(150)); - - tokio::spawn({ - let file_metadata_cache = file_metadata_cache.clone(); - let mut tx = node.event_bus.0.subscribe(); - async move { - while let Ok(event) = tx.recv().await { - if let CoreEvent::InvalidateOperation(e) = event { - match e { - InvalidateOperationEvent::Single(event) => { - // TODO: This is inefficent as any change will invalidate who cache. We need the new invalidation system!!! - // TODO: It's also error prone and a fine-grained resource based invalidation system would avoid that. - if event.key == "search.objects" || event.key == "search.paths" - { - file_metadata_cache.invalidate_all(); - } - } - InvalidateOperationEvent::All => { - file_metadata_cache.invalidate_all(); - } - } - } - } - } - }); - - LocalState { - node, - file_metadata_cache, - } - }) + .with_state(with_state(node)) } // TODO: This should possibly be determined from magic bytes when the file is indexed and stored it in the DB on the file path diff --git a/core/src/custom_uri/utils.rs b/core/src/custom_uri/utils.rs index 030173d93..70171e2eb 100644 --- a/core/src/custom_uri/utils.rs +++ b/core/src/custom_uri/utils.rs @@ -61,6 +61,8 @@ pub(crate) async fn cors_middleware(req: Request, next: Next) -> Respon .expect("Invalid static response!"); } + let is_upgrade_request = req.headers().get("Upgrade").is_some(); + let mut response = next.run(req).await; { @@ -73,8 +75,11 @@ pub(crate) async fn cors_middleware(req: Request, next: Next) -> Respon HeaderValue::from_static("*"), ); - // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Connection - headers.insert("Connection", HeaderValue::from_static("Keep-Alive")); + // With websocket requests, setting this causes the browser to loose it's shit. + if !is_upgrade_request { + // https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Connection + headers.insert("Connection", HeaderValue::from_static("Keep-Alive")); + } headers.insert("Server", HeaderValue::from_static("Spacedrive")); } diff --git a/core/src/lib.rs b/core/src/lib.rs index d6d22e458..53ebdd3ca 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -166,13 +166,21 @@ impl Node { jobs_actor.start(node.clone()); start_p2p( node.clone(), - router - .clone() - .endpoint({ - let node = node.clone(); - move |_| node.clone() - }) - .axum::<()>() + axum::Router::new() + .nest( + "/uri", + custom_uri::base_router().with_state(custom_uri::with_state(node.clone())), + ) + .nest( + "/rspc", + router + .clone() + .endpoint({ + let node = node.clone(); + move |_| node.clone() + }) + .axum::<()>(), + ) .into_make_service(), ); diff --git a/core/src/p2p/manager.rs b/core/src/p2p/manager.rs index 9e67ac952..74cfdb149 100644 --- a/core/src/p2p/manager.rs +++ b/core/src/p2p/manager.rs @@ -292,14 +292,6 @@ async fn start( } }; } - Header::File(req) => { - let Err(()) = operations::request_file::receiver(&node, req, stream).await - else { - return; - }; - - error!("Failed to handle file request"); - } Header::Http => { let remote = stream.remote_identity(); let Err(err) = operations::rspc::receiver(stream, &mut service).await else { diff --git a/core/src/p2p/operations/mod.rs b/core/src/p2p/operations/mod.rs index adbaaa809..02d01766d 100644 --- a/core/src/p2p/operations/mod.rs +++ b/core/src/p2p/operations/mod.rs @@ -1,8 +1,6 @@ pub mod ping; -pub mod request_file; pub mod rspc; pub mod spacedrop; -pub use request_file::request_file; pub use rspc::remote_rspc; pub use spacedrop::spacedrop; diff --git a/core/src/p2p/operations/request_file.rs b/core/src/p2p/operations/request_file.rs deleted file mode 100644 index 077d5f412..000000000 --- a/core/src/p2p/operations/request_file.rs +++ /dev/null @@ -1,242 +0,0 @@ -use crate::{ - library::Library, - p2p::{Header, HeaderFile}, - Node, -}; - -use sd_file_path_helper::{file_path_to_handle_p2p_serve_file, IsolatedFilePathData}; -use sd_p2p2::UnicastStream; -use sd_p2p_block::{BlockSize, Range, SpaceblockRequest, SpaceblockRequests, Transfer}; -use sd_prisma::prisma::file_path; -use tokio::{ - fs::File, - io::{AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader}, -}; -use tracing::{debug, warn}; -use uuid::Uuid; - -use std::{ - path::Path, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, -}; - -/// Request a file from the remote machine over P2P. This is used for preview media and quick preview. -/// -/// DO NOT USE THIS WITHOUT `node.files_over_p2p_flag == true` -pub async fn request_file( - mut stream: UnicastStream, - library: &Library, - file_path_id: Uuid, - range: Range, - output: impl AsyncWrite + Unpin, -) -> Result<(), ()> { - let id = Uuid::new_v4(); - // TODO: Tunnel for encryption + authentication - - stream - .write_all( - &Header::File(HeaderFile { - id, - library_id: library.id, - file_path_id, - range: range.clone(), - }) - .to_bytes(), - ) - .await - .map_err(|err| { - warn!("({id}): failed to read `Header::File`: {err:?}"); - - // TODO: UI error - // TODO: Error sent to remote peer - })?; - - let block_size = BlockSize::from_stream(&mut stream).await.map_err(|err| { - warn!("({id}): failed to read block size: {err:?}"); - - // TODO: UI error - // TODO: Error sent to remote peer - })?; - let size = stream.read_u64_le().await.map_err(|err| { - warn!("({id}): failed to read file size: {err:?}"); - - // TODO: UI error - // TODO: Error sent to remote peer - })?; - - Transfer::new( - &SpaceblockRequests { - id, - block_size, - requests: vec![SpaceblockRequest { - // TODO: Removing need for this field in this case - name: "todo".to_string(), - // TODO: Maybe removing need for `size` from this side - size, - range, - }], - }, - |percent| { - debug!( - "P2P receiving file path '{}' - progress {}%", - file_path_id, percent - ); - }, - &Arc::new(AtomicBool::new(false)), - ) - .receive(&mut stream, output) - .await - .map_err(|err| { - warn!("({id}): transfer failed: {err:?}"); - - // TODO: Error in UI - // TODO: Send error to remote peer??? - })?; - - Ok(()) -} - -pub(crate) async fn receiver( - node: &Arc, - HeaderFile { - id, - library_id, - file_path_id, - range, - }: HeaderFile, - mut stream: UnicastStream, -) -> Result<(), ()> { - #[allow(clippy::panic)] // If you've made it this far that's on you. - if !node.files_over_p2p_flag.load(Ordering::Relaxed) { - panic!("Files over P2P is disabled!"); - } - - // TODO: Tunnel and authentication - // TODO: Use BufReader - - let library = node - .libraries - .get_library(&library_id) - .await - .ok_or_else(|| { - warn!("({id}): library not found'{library_id:?}'"); - - // TODO: Error in UI - // TODO: Send error to remote peer??? -> Can we avoid constructing connection until this is done so it's only an error on one side? - })?; - - let file_path = library - .db - .file_path() - .find_unique(file_path::pub_id::equals(file_path_id.as_bytes().to_vec())) - .select(file_path_to_handle_p2p_serve_file::select()) - .exec() - .await - .map_err(|err| { - warn!("({id}): error querying for file_path '{file_path_id:?}': {err:?}",); - - // TODO: Error in UI - // TODO: Send error to remote peer??? -> Can we avoid constructing connection until this is done so it's only an error on one side? - })? - .ok_or_else(|| { - warn!("({id}): file_path not found '{file_path_id:?}'"); - - // TODO: Error in UI - // TODO: Send error to remote peer??? -> Can we avoid constructing connection until this is done so it's only an error on one side? - })?; - - let location = file_path.location.as_ref().ok_or_else(|| { - warn!("({id}): file_path '{file_path_id:?} is missing 'location' property"); - - // TODO: Error in UI - // TODO: Send error to remote peer??? - })?; - let location_path = location.path.as_ref().ok_or_else(|| { - warn!( - "({id}): location '{:?} is missing 'path' property", - location.id - ); - - // TODO: Error in UI - // TODO: Send error to remote peer??? - })?; - let path = Path::new(location_path) - .join(IsolatedFilePathData::try_from((location.id, &file_path)).map_err(|err| { - warn!("({id}): failed to construct 'IsolatedFilePathData' for location '{:?} '{file_path:?}': {err:?}", location.id); - - // TODO: Error in UI - // TODO: Send error to remote peer??? - })?); - - debug!("Serving path '{:?}' over P2P", path); - - let file = File::open(&path).await.map_err(|err| { - warn!("({id}): failed to open file '{path:?}': {err:?}"); - - // TODO: Error in UI - // TODO: Send error to remote peer??? - })?; - - let metadata = file.metadata().await.map_err(|err| { - warn!("({id}): failed to get metadata for file '{path:?}': {err:?}"); - - // TODO: Error in UI - // TODO: Send error to remote peer??? - })?; - let block_size = BlockSize::from_size(metadata.len()); - - stream - .write_all(&block_size.to_bytes()) - .await - .map_err(|err| { - warn!("({id}): failed to write block size: {err:?}"); - - // TODO: Error in UI - // TODO: Send error to remote peer??? - })?; - stream - .write_all(&metadata.len().to_le_bytes()) - .await - .map_err(|err| { - warn!("({id}): failed to write length: {err:?}"); - - // TODO: Error in UI - // TODO: Send error to remote peer??? - })?; - - let file = BufReader::new(file); - Transfer::new( - &SpaceblockRequests { - id, - block_size, - requests: vec![SpaceblockRequest { - // TODO: Removing need for this field in this case - name: "todo".to_string(), - size: metadata.len(), - range, - }], - }, - |percent| { - debug!( - "P2P loading file path '{}' - progress {}%", - file_path_id, percent - ); - }, - &Arc::new(AtomicBool::new(false)), - ) - .send(&mut stream, file) - .await - .map_err(|err| { - warn!("({id}): transfer failed: {err:?}"); - - // TODO: Error in UI - // TODO: Send error to remote peer??? - })?; - - Ok(()) -} - -// TODO: Unit tests diff --git a/core/src/p2p/operations/rspc.rs b/core/src/p2p/operations/rspc.rs index f40db45d3..746d38e60 100644 --- a/core/src/p2p/operations/rspc.rs +++ b/core/src/p2p/operations/rspc.rs @@ -43,6 +43,12 @@ pub(crate) async fn receiver( stream.remote_identity(), ); + // TODO: Authentication + #[allow(clippy::todo)] + if true { + todo!("You wouldn't download a car!"); + } + Http::new() .http1_only(true) .http1_keep_alive(true) diff --git a/core/src/p2p/protocol.rs b/core/src/p2p/protocol.rs index 6948befeb..436151b5c 100644 --- a/core/src/p2p/protocol.rs +++ b/core/src/p2p/protocol.rs @@ -1,18 +1,9 @@ -use sd_p2p_block::{Range, SpaceblockRequests, SpaceblockRequestsError}; +use sd_p2p_block::{SpaceblockRequests, SpaceblockRequestsError}; use sd_p2p_proto::{decode, encode}; use thiserror::Error; use tokio::io::{AsyncRead, AsyncReadExt}; use uuid::Uuid; -#[derive(Debug, PartialEq, Eq)] -pub struct HeaderFile { - // Request ID - pub(crate) id: Uuid, - pub(crate) library_id: Uuid, - pub(crate) file_path_id: Uuid, - pub(crate) range: Range, -} - /// TODO #[derive(Debug, PartialEq, Eq)] pub enum Header { @@ -20,7 +11,6 @@ pub enum Header { Ping, Spacedrop(SpaceblockRequests), Sync(Uuid), - File(HeaderFile), // A HTTP server used for rspc requests and streaming files Http, } @@ -35,10 +25,6 @@ pub enum HeaderError { SpacedropRequest(#[from] SpaceblockRequestsError), #[error("error reading sync request: {0}")] SyncRequest(decode::Error), - #[error("error reading header file: {0}")] - HeaderFile(decode::Error), - #[error("error invalid header file discriminator '{0}'")] - HeaderFileDiscriminatorInvalid(u8), } impl Header { @@ -58,36 +44,6 @@ impl Header { .await .map_err(HeaderError::SyncRequest)?, )), - 4 => Ok(Self::File(HeaderFile { - id: decode::uuid(stream) - .await - .map_err(HeaderError::HeaderFile)?, - library_id: decode::uuid(stream) - .await - .map_err(HeaderError::HeaderFile)?, - file_path_id: decode::uuid(stream) - .await - .map_err(HeaderError::HeaderFile)?, - range: match stream - .read_u8() - .await - .map_err(|err| HeaderError::HeaderFile(err.into()))? - { - 0 => Range::Full, - 1 => { - let start = stream - .read_u64_le() - .await - .map_err(|err| HeaderError::HeaderFile(err.into()))?; - let end = stream - .read_u64_le() - .await - .map_err(|err| HeaderError::HeaderFile(err.into()))?; - Range::Partial(start..end) - } - i => return Err(HeaderError::HeaderFileDiscriminatorInvalid(i)), - }, - })), 5 => Ok(Self::Http), d => Err(HeaderError::DiscriminatorInvalid(d)), } @@ -106,19 +62,6 @@ impl Header { encode::uuid(&mut bytes, uuid); bytes } - Self::File(HeaderFile { - id, - library_id, - file_path_id, - range, - }) => { - let mut buf = vec![4]; - encode::uuid(&mut buf, id); - encode::uuid(&mut buf, library_id); - encode::uuid(&mut buf, file_path_id); - buf.extend_from_slice(&range.to_bytes()); - buf - } Self::Http => vec![5], } } diff --git a/crates/p2p2/src/quic/transport.rs b/crates/p2p2/src/quic/transport.rs index c2dd3a601..05abd509a 100644 --- a/crates/p2p2/src/quic/transport.rs +++ b/crates/p2p2/src/quic/transport.rs @@ -188,8 +188,6 @@ async fn start( tokio::select! { Ok(event) = rx.recv_async() => match event { HookEvent::PeerExpiredBy(_, identity) => { - println!("CHECKING {:?}", identity); // TODO - let Some(peer) = p2p.peers.read().unwrap_or_else(PoisonError::into_inner).get(&identity).map(Clone::clone) else { continue; }; diff --git a/interface/app/$libraryId/debug/p2p-rspc.tsx b/interface/app/$libraryId/debug/p2p-rspc.tsx index 36e7aa43c..657c9f28f 100644 --- a/interface/app/$libraryId/debug/p2p-rspc.tsx +++ b/interface/app/$libraryId/debug/p2p-rspc.tsx @@ -1,26 +1,19 @@ -import { httpLink, initRspc, type AlphaClient } from '@oscartbeaumont-sd/rspc-client/v2'; -import { useEffect, useRef, useState } from 'react'; -import { useDiscoveredPeers, type Procedures } from '@sd/client'; +import { useNavigate } from 'react-router'; +import { useCache, useDiscoveredPeers, useLibraryQuery, useNodes } from '@sd/client'; import { Button } from '@sd/ui'; -import { usePlatform } from '~/util/Platform'; export const Component = () => { // TODO: Handle if P2P is disabled - const [activePeer, setActivePeer] = useState(null); - return (
- {activePeer ? ( - - ) : ( - - )} +
); }; -function PeerSelector({ setActivePeer }: { setActivePeer: (peer: string) => void }) { +function PeerSelector() { const peers = useDiscoveredPeers(); + const navigate = useNavigate(); return ( <> @@ -32,7 +25,9 @@ function PeerSelector({ setActivePeer }: { setActivePeer: (peer: string) => void {[...peers.entries()].map(([id, _node]) => (
  • {id} - +
  • ))} @@ -40,43 +35,3 @@ function PeerSelector({ setActivePeer }: { setActivePeer: (peer: string) => void ); } - -function P2PInfo({ peer }: { peer: string }) { - const platform = usePlatform(); - const ref = useRef>(); - const [result, setResult] = useState(''); - useEffect(() => { - // TODO: Cleanup when URL changed - const endpoint = platform.getRemoteRspcEndpoint(peer); - ref.current = initRspc({ - links: [ - httpLink({ - url: endpoint.url, - headers: endpoint.headers - }) - ] - }); - }, [peer]); - - useEffect(() => { - if (!ref.current) return; - ref.current.query(['nodeState']).then((data) => setResult(JSON.stringify(data, null, 2))); - }, [ref, result]); - - return ( -
    -

    Connected with: {peer}

    - - -
    {result}
    -
    - ); -} diff --git a/interface/app/$libraryId/ephemeral.tsx b/interface/app/$libraryId/ephemeral.tsx index 6af18f4b3..70faf5030 100644 --- a/interface/app/$libraryId/ephemeral.tsx +++ b/interface/app/$libraryId/ephemeral.tsx @@ -1,3 +1,4 @@ +import { type AlphaClient } from '@oscartbeaumont-sd/rspc-client/v2'; import { ArrowLeft, ArrowRight, Info } from '@phosphor-icons/react'; import * as Dialog from '@radix-ui/react-dialog'; import { iconNames } from '@sd/assets/util'; diff --git a/interface/app/index.tsx b/interface/app/index.tsx index 9af1bdb6e..7980ce298 100644 --- a/interface/app/index.tsx +++ b/interface/app/index.tsx @@ -1,18 +1,41 @@ -import { useMemo } from 'react'; -import { Navigate, Outlet, redirect, useMatches, type RouteObject } from 'react-router-dom'; +import { initRspc, wsBatchLink, type AlphaClient } from '@oscartbeaumont-sd/rspc-client/v2'; +import { QueryClient, QueryClientProvider } from '@tanstack/react-query'; +import { useEffect, useMemo, useState } from 'react'; import { + Link, + Navigate, + Outlet, + redirect, + useMatches, + useNavigate, + type RouteObject +} from 'react-router-dom'; +import { + CacheProvider, + ClientContextProvider, + context, + context2, + createCache, currentLibraryCache, getCachedLibraries, + LibraryContextProvider, + nonLibraryClient, NormalisedCache, + Procedures, + useBridgeQuery, + useCache, useCachedLibraries, useFeatureFlag, - WithSolid + useNodes, + WithSolid, + type LibraryProceduresDef, + type NonLibraryProceduresDef } from '@sd/client'; -import { Dialogs, Toaster } from '@sd/ui'; +import { Button, Dialogs, Toaster, z } from '@sd/ui'; import { RouterErrorBoundary } from '~/ErrorFallback'; import { useRoutingContext } from '~/RoutingContext'; -import { Platform } from '..'; +import { Platform, PlatformProvider, usePlatform } from '..'; import libraryRoutes from './$libraryId'; import { DragAndDropDebug } from './$libraryId/debug/dnd'; import { Demo, Demo2 } from './demo.solid'; @@ -21,10 +44,14 @@ import { RootContext } from './RootContext'; import './style.scss'; +import { useZodRouteParams } from '~/hooks'; + // NOTE: all route `Layout`s below should contain // the `usePlausiblePageViewMonitor` hook, as early as possible (ideally within the layout itself). // the hook should only be included if there's a valid `ClientContext` (so not onboarding) +const LibraryIdParamsSchema = z.object({ libraryId: z.string() }); + export const createRoutes = (platform: Platform, cache: NormalisedCache) => [ { @@ -67,7 +94,7 @@ export const createRoutes = (platform: Platform, cache: NormalisedCache) => return ; }, loader: async () => { - const libraries = await getCachedLibraries(cache); + const libraries = await getCachedLibraries(cache, nonLibraryClient); const currentLibrary = libraries.find( (l) => l.uuid === currentLibraryCache.id @@ -86,11 +113,61 @@ export const createRoutes = (platform: Platform, cache: NormalisedCache) => lazy: () => import('./onboarding/Layout'), children: onboardingRoutes }, + { + path: 'remote/:node', + Component: (props) => , + children: [ + { + path: 'browse', + Component: BrowsePage + }, + { + path: ':libraryId', + Component: () => { + const params = useZodRouteParams(LibraryIdParamsSchema); + const result = useBridgeQuery(['library.list']); + useNodes(result.data?.nodes); + const libraries = useCache(result.data?.items); + + const library = libraries?.find((l) => l.uuid === params.libraryId); + + useEffect(() => { + if (!result.data) return; + + if (!library) { + alert('Library not found'); + // TODO: Redirect + } + }); + + if (!library) return <>; // TODO: Using suspense for loading + + return ( + + +
    + YOUR ON A REMOTE NODE Go Back +
    + +
    +
    + ); + }, + children: [ + { + path: '*', + lazy: () => import('./$libraryId/Layout'), + children: libraryRoutes(platform) + } + ] + } + ] + }, { path: ':libraryId', lazy: () => import('./$libraryId/Layout'), loader: async ({ params: { libraryId } }) => { - const libraries = await getCachedLibraries(cache); + const libraries = await getCachedLibraries(cache, nonLibraryClient); const library = libraries.find((l) => l.uuid === libraryId); if (!library) { @@ -109,6 +186,132 @@ export const createRoutes = (platform: Platform, cache: NormalisedCache) => } ] satisfies RouteObject[]; +const ParamsSchema = z.object({ node: z.string() }); + +function RemoteLayout() { + const platform = usePlatform(); + const params = useZodRouteParams(ParamsSchema); + + // TODO: The caches should instead be prefixed by the remote node ID, instead of completely being recreated but that's too hard to do right now. + const [rspcClient, setRspcClient] = + useState< + [ + AlphaClient, + AlphaClient, + QueryClient, + NormalisedCache + ] + >(); + useEffect(() => { + const endpoint = platform.getRemoteRspcEndpoint(params.node); + + const links = [ + wsBatchLink({ + url: endpoint.url + }) + ]; + + const client = initRspc({ + links + }).dangerouslyHookIntoInternals(); + const libraryClient = initRspc({ + links + }).dangerouslyHookIntoInternals({ + mapQueryKey: (keyAndInput) => { + const libraryId = currentLibraryCache.id; + if (libraryId === null) + throw new Error('Attempted to do library operation with no library set!'); + return [keyAndInput[0], { library_id: libraryId, arg: keyAndInput[1] ?? null }]; + } + }); + const cache = createCache(); + setRspcClient([client, libraryClient, new QueryClient(), cache]); + + return () => { + // TODO: We *really* need to cleanup `client` so we aren't leaking all the resources. + }; + }, [params.node, platform]); + + // TODO: Detect if the remote node if offline and render something to show that + + const newPlatform = useMemo( + () => + ({ + ...platform, + getThumbnailUrlByThumbKey: (thumbKey) => + platform.constructRemoteRspcPath( + params.node, + `thumbnail/${thumbKey.map((i) => encodeURIComponent(i)).join('/')}.webp` + ), + getFileUrl: (libraryId, locationLocalId, filePathId) => + platform.constructRemoteRspcPath( + params.node, + `file/${encodeURIComponent(libraryId)}/${encodeURIComponent( + locationLocalId + )}/${encodeURIComponent(filePathId)}` + ), + getFileUrlByPath: (path) => + platform.constructRemoteRspcPath( + params.node, + `local-file-by-path/${encodeURIComponent(path)}` + ) + }) satisfies Platform, + [platform, params.node] + ); + + return ( + + {/* TODO: Maybe library context too? */} + {rspcClient && ( + + + + + + + + + + )} + + ); +} + +function BrowsePage() { + const navigate = useNavigate(); + const result = useBridgeQuery(['library.list']); + useNodes(result.data?.nodes); + const libraries = useCache(result.data?.items); + + return ( +
    +

    Browse Libraries On Remote Node:

    + {libraries?.map((l) => ( + + ))} +
    + ); +} + /** * Combines the `path` segments of the current route into a single string. * This is useful for things like analytics, where we want the route path diff --git a/interface/util/Platform.tsx b/interface/util/Platform.tsx index 71f1b6227..2c10c50b8 100644 --- a/interface/util/Platform.tsx +++ b/interface/util/Platform.tsx @@ -21,6 +21,7 @@ export type Platform = { url: string; headers?: Record; }; + constructRemoteRspcPath: (remote_identity: string, path: string) => string; openLink: (url: string) => void; // Tauri patches `window.confirm` to return `Promise` not `bool` confirm(msg: string, cb: (result: boolean) => void): void; diff --git a/packages/client/src/cache.tsx b/packages/client/src/cache.tsx index ec6dc3d64..b95b71144 100644 --- a/packages/client/src/cache.tsx +++ b/packages/client/src/cache.tsx @@ -34,11 +34,11 @@ export function createCache() { const cache = proxy(defaultStore()); return { cache, - withNodes(data: CacheNode[] | undefined) { - updateNodes(cache, data); + withNodes(data: CacheNode[] | undefined, suffix?: string) { + updateNodes(cache, data, suffix); }, - withCache(data: T | undefined): UseCacheResult { - return restore(cache, new Map(), data) as any; + withCache(data: T | undefined, suffix?: string): UseCacheResult { + return restore(cache, new Map(), data, suffix) as any; } }; } @@ -123,7 +123,12 @@ function scanDataForKeys(cache: Store, keys: StableSet<[string, string]>, item: } } -function restore(cache: Store, subscribed: Map>, item: unknown): unknown { +function restore( + cache: Store, + subscribed: Map>, + item: unknown, + suffix?: string +): unknown { if (item === undefined || item === null) { return item; } else if (Array.isArray(item)) { @@ -132,15 +137,16 @@ function restore(cache: Store, subscribed: Map>, item: unkn if ('__type' in item && '__id' in item) { if (typeof item.__type !== 'string') throw new Error('Invalid `__type`'); if (typeof item.__id !== 'string') throw new Error('Invalid `__id`'); - const result = cache.nodes?.[item.__type]?.[item.__id]; - if (!result) - throw new Error(`Missing node for id '${item.__id}' of type '${item.__type}'`); + const ty = suffix ? `${suffix}:${item.__type}` : item.__type; - const v = subscribed.get(item.__type); + const result = cache.nodes?.[ty]?.[item.__id]; + if (!result) throw new Error(`Missing node for id '${item.__id}' of type '${ty}'`); + + const v = subscribed.get(ty); if (v) { v.add(item.__id); } else { - subscribed.set(item.__type, new Set([item.__id])); + subscribed.set(ty, new Set([item.__id])); } // We call restore again for arrays and objects to deal with nested relations. @@ -180,24 +186,25 @@ export function useNormalisedCache() { }; } -function updateNodes(cache: Store, data: CacheNode[] | undefined) { +function updateNodes(cache: Store, data: CacheNode[] | undefined, suffix?: string) { if (!data) return; for (const item of data) { if (!('__type' in item && '__id' in item)) throw new Error('Missing `__type` or `__id`'); if (typeof item.__type !== 'string') throw new Error('Invalid `__type`'); if (typeof item.__id !== 'string') throw new Error('Invalid `__id`'); + const ty = suffix ? `${suffix}:${item.__type}` : item.__type; const copy = { ...item } as any; delete copy.__type; delete copy.__id; - const original = cache.nodes?.[item.__type]?.[item.__id]; + const original = cache.nodes?.[ty]?.[item.__id]; specialMerge(copy, original); - if (!cache.nodes[item.__type]) cache.nodes[item.__type] = {}; + if (!cache.nodes[ty]) cache.nodes[ty] = {}; // TODO: This should be a deepmerge but that would break stuff like `size_in_bytes` or `inode` as the arrays are joined. - cache.nodes[item.__type]![item.__id] = copy; + cache.nodes[ty]![item.__id] = copy; } } diff --git a/packages/client/src/hooks/useClientContext.tsx b/packages/client/src/hooks/useClientContext.tsx index 97acbb11a..fc14a19a0 100644 --- a/packages/client/src/hooks/useClientContext.tsx +++ b/packages/client/src/hooks/useClientContext.tsx @@ -1,7 +1,8 @@ +import { AlphaClient } from '@oscartbeaumont-sd/rspc-client/v2'; import { createContext, PropsWithChildren, useContext, useEffect, useMemo } from 'react'; import { NormalisedCache, useCache, useNodes } from '../cache'; -import { LibraryConfigWrapped } from '../core'; +import { LibraryConfigWrapped, Procedures } from '../core'; import { valtioPersist } from '../lib'; import { nonLibraryClient, useBridgeQuery } from '../rspc'; @@ -38,7 +39,7 @@ export const useCachedLibraries = () => { }; }; -export async function getCachedLibraries(cache: NormalisedCache) { +export async function getCachedLibraries(cache: NormalisedCache, client: AlphaClient) { const cachedData = localStorage.getItem(libraryCacheLocalStorageKey); if (cachedData) { @@ -52,7 +53,7 @@ export async function getCachedLibraries(cache: NormalisedCache) { } } - const result = await nonLibraryClient.query(['library.list']); + const result = await client.query(['library.list']); cache.withNodes(result.nodes); const libraries = cache.withCache(result.items); diff --git a/packages/client/src/rspc.tsx b/packages/client/src/rspc.tsx index 23dbe9eab..43944edb2 100644 --- a/packages/client/src/rspc.tsx +++ b/packages/client/src/rspc.tsx @@ -1,7 +1,11 @@ -import { ProcedureDef } from '@oscartbeaumont-sd/rspc-client'; +import { + inferMutationInput, + inferMutationResult, + ProcedureDef +} from '@oscartbeaumont-sd/rspc-client'; import { AlphaRSPCError, initRspc } from '@oscartbeaumont-sd/rspc-client/v2'; -import { Context, createReactQueryHooks } from '@oscartbeaumont-sd/rspc-react/v2'; -import { QueryClient } from '@tanstack/react-query'; +import { BaseOptions, Context, createReactQueryHooks } from '@oscartbeaumont-sd/rspc-react/v2'; +import { QueryClient, useMutation, UseMutationOptions, useQuery } from '@tanstack/react-query'; import { createContext, PropsWithChildren, useContext } from 'react'; import { match, P } from 'ts-pattern'; @@ -26,11 +30,11 @@ type StripLibraryArgsFromInput< key: T['key']; input: NeverOverNull extends true ? (E extends null ? never : E) : E; result: T['result']; - } + } : never : never; -type NonLibraryProceduresDef = { +export type NonLibraryProceduresDef = { queries: NonLibraryProcedure<'queries'>; mutations: NonLibraryProcedure<'mutations'>; subscriptions: NonLibraryProcedure<'subscriptions'>; @@ -42,8 +46,8 @@ export type LibraryProceduresDef = { subscriptions: StripLibraryArgsFromInput, true>; }; -const context = createContext>(undefined!); -const context2 = createContext>(undefined!); +export const context = createContext>(undefined!); +export const context2 = createContext>(undefined!); export const useRspcContext = () => useContext(context); export const useRspcLibraryContext = () => useContext(context2);