Everything over P2P (#2135)

* Fix websockets over P2P + multi-node cache

* cleanup features

* Explorer over P2P demo

* wip

* wip

* working for ephemeral stuff

* Button to get out of remote viewer

* remove

* Cleanup

* more cleanup

* Fix cross-node library queries

* Files over P2P

* Drop `Header::File`

* Fix Clippy

* Don't let noobs get hacked
This commit is contained in:
Oscar Beaumont
2024-02-29 15:46:30 +08:00
committed by GitHub
parent 95f83ef275
commit ef12e5d1c0
21 changed files with 409 additions and 509 deletions

BIN
Cargo.lock generated
View File

Binary file not shown.

View File

@@ -53,7 +53,7 @@ swift-rs = { version = "1.0.6" }
anyhow = "1.0.75"
async-channel = "2.0.0"
async-trait = "0.1.77"
axum = "0.6.20"
axum = "=0.6.20"
base64 = "0.21.5"
blake3 = "1.5.0"
chrono = "0.4.31"

View File

@@ -54,11 +54,16 @@ export const platform = {
getFileUrlByPath: (path) =>
constructServerUrl(`/local-file-by-path/${encodeURIComponent(path)}`),
getRemoteRspcEndpoint: (remote_identity) => ({
url: `${customUriServerUrl?.[0]}/remote/${encodeURIComponent(remote_identity)}/rspc`,
headers: {
authorization: `Bearer ${customUriAuthToken}`
}
url: `${customUriServerUrl?.[0]
?.replace('https', 'wss')
?.replace('http', 'ws')}/remote/${encodeURIComponent(
remote_identity
)}/rspc/ws?token=${customUriAuthToken}`
}),
constructRemoteRspcPath: (remote_identity, path) =>
constructServerUrl(
`/remote/${encodeURIComponent(remote_identity)}/uri/${path}?token=${customUriAuthToken}`
),
openLink: shell.open,
getOs,
openDirectoryPickerDialog: (opts) => {

View File

@@ -50,8 +50,12 @@ const platform: Platform = {
)}/${encodeURIComponent(filePathId)}`,
getFileUrlByPath: (path) => `${spacedriveURL}/local-file-by-path/${encodeURIComponent(path)}`,
getRemoteRspcEndpoint: (remote_identity) => ({
url: `${spacedriveURL}/remote/${encodeURIComponent(remote_identity)}/rspc`
url: `${spacedriveURL
.replace('https', 'wss')
.replace('http', 'ws')}/remote/${encodeURIComponent(remote_identity)}/rspc/ws`
}),
constructRemoteRspcPath: (remote_identity, path) =>
`${spacedriveURL}/remote/${encodeURIComponent(remote_identity)}/uri/${path}`,
openLink: (url) => window.open(url, '_blank')?.focus(),
confirm: (message, cb) => cb(window.confirm(message)),
auth: {
@@ -151,7 +155,7 @@ function useRouter() {
event.historyAction === 'PUSH'
? currentIndex
: // sometimes the max index is 0 when the current index is > 0, like when reloading the page -_-
Math.max(router.maxIndex, currentIndex)
Math.max(router.maxIndex, currentIndex)
};
});
});

View File

@@ -52,7 +52,7 @@ sd-cloud-api = { version = "0.1.0", path = "../crates/cloud-api" }
# Workspace dependencies
async-channel = { workspace = true }
async-trait = { workspace = true }
axum = { workspace = true }
axum = { workspace = true, features = ["ws"] }
base64 = { workspace = true }
blake3 = { workspace = true }
chrono = { workspace = true, features = ["serde"] }

View File

@@ -7,10 +7,10 @@ use crate::{
Node,
};
use hyper::{header, upgrade::OnUpgrade};
use sd_file_ext::text::is_text;
use sd_file_path_helper::{file_path_to_handle_custom_uri, IsolatedFilePathData};
use sd_p2p2::{IdentityOrRemoteIdentity, RemoteIdentity};
use sd_p2p_block::Range;
use sd_prisma::prisma::{file_path, location};
use sd_utils::db::maybe_missing;
@@ -21,12 +21,11 @@ use std::{
fs::Metadata,
path::{Path, PathBuf},
str::FromStr,
sync::{atomic::Ordering, Arc},
sync::Arc,
};
use async_stream::stream;
use axum::{
body::{self, Body, BoxBody, Full, StreamBody},
body::{self, Body, BoxBody, Full},
extract::{self, State},
http::{HeaderMap, HeaderValue, Request, Response, StatusCode},
middleware,
@@ -34,17 +33,15 @@ use axum::{
routing::get,
Router,
};
use bytes::Bytes;
use mini_moka::sync::Cache;
use tokio::{
fs::{self, File},
io::{self, AsyncReadExt, AsyncSeekExt, SeekFrom},
io::{self, copy_bidirectional, AsyncReadExt, AsyncSeekExt, SeekFrom},
};
use tokio_util::sync::PollSender;
use tracing::error;
use tracing::{error, warn};
use uuid::Uuid;
use self::{mpsc_to_async_write::MpscToAsyncWrite, serve_file::serve_file, utils::*};
use self::{serve_file::serve_file, utils::*};
mod async_read_body;
mod mpsc_to_async_write;
@@ -72,7 +69,7 @@ pub enum ServeFrom {
}
#[derive(Clone)]
struct LocalState {
pub struct LocalState {
node: Arc<Node>,
// This LRU cache allows us to avoid doing a DB lookup on every request.
@@ -148,8 +145,7 @@ async fn get_or_init_lru_entry(
}
}
// We are using Axum on all platforms because Tauri's custom URI protocols can't be async!
pub fn router(node: Arc<Node>) -> Router<()> {
pub fn base_router() -> Router<LocalState> {
Router::new()
.route(
"/thumbnail/*path",
@@ -196,11 +192,11 @@ pub fn router(node: Arc<Node>) -> Router<()> {
CacheValue {
name: file_path_full_path,
ext: extension,
file_path_pub_id,
file_path_pub_id: _file_path_pub_id,
serve_from,
..
},
library,
_library,
) = get_or_init_lru_entry(&state, path).await?;
match serve_from {
@@ -236,49 +232,8 @@ pub fn router(node: Arc<Node>) -> Router<()> {
serve_file(file, Ok(metadata), request.into_parts().0, resp).await
}
ServeFrom::Remote(identity) => {
if !state.node.files_over_p2p_flag.load(Ordering::Relaxed) {
return Ok(not_found(()));
}
// TODO: Support `Range` requests and `ETag` headers
let stream = state
.node
.p2p
.get_instance(&library.id, identity)
.ok_or_else(|| {
not_found(format!("Error connecting to {identity}: no connection method available"))
})?
.new_stream()
.await
.map_err(|err| {
not_found(format!("Error connecting to {identity}: {err:?}"))
})?;
let (tx, mut rx) = tokio::sync::mpsc::channel::<io::Result<Bytes>>(150);
// TODO: We only start a thread because of stupid `ManagerStreamAction2` and libp2p's `!Send/!Sync` bounds on a stream.
tokio::spawn(async move {
let Ok(()) = operations::request_file(
stream,
&library,
file_path_pub_id,
Range::Full,
MpscToAsyncWrite::new(PollSender::new(tx)),
)
.await
else {
return;
};
});
// TODO: Content Type
Ok(InfallibleResponse::builder().status(StatusCode::OK).body(
body::boxed(StreamBody::new(stream! {
while let Some(item) = rx.recv().await {
yield item;
}
})),
))
ServeFrom::Remote(_identity) => {
Err(not_implemented("Can't serve file from remote node")) // TODO: Reimplement this
}
}
},
@@ -321,8 +276,45 @@ pub fn router(node: Arc<Node>) -> Router<()> {
},
),
)
}
pub fn with_state(node: Arc<Node>) -> LocalState {
let file_metadata_cache = Arc::new(Cache::new(150));
tokio::spawn({
let file_metadata_cache = file_metadata_cache.clone();
let mut tx = node.event_bus.0.subscribe();
async move {
while let Ok(event) = tx.recv().await {
if let CoreEvent::InvalidateOperation(e) = event {
match e {
InvalidateOperationEvent::Single(event) => {
// TODO: This is inefficent as any change will invalidate who cache. We need the new invalidation system!!!
// TODO: It's also error prone and a fine-grained resource based invalidation system would avoid that.
if event.key == "search.objects" || event.key == "search.paths" {
file_metadata_cache.invalidate_all();
}
}
InvalidateOperationEvent::All => {
file_metadata_cache.invalidate_all();
}
}
}
}
}
});
LocalState {
node,
file_metadata_cache,
}
}
// We are using Axum on all platforms because Tauri's custom URI protocols can't be async!
pub fn router(node: Arc<Node>) -> Router<()> {
Router::new()
.route(
"/remote/:identity/rspc/*path",
"/remote/:identity/*path",
get(
|State(state): State<LocalState>,
extract::Path((identity, rest)): extract::Path<(String, String)>,
@@ -330,60 +322,79 @@ pub fn router(node: Arc<Node>) -> Router<()> {
let identity = match RemoteIdentity::from_str(&identity) {
Ok(identity) => identity,
Err(err) => {
error!("Error parsing identity '{}': {}", identity, err);
warn!("Error parsing identity '{}': {}", identity, err);
return (StatusCode::BAD_REQUEST, HeaderMap::new(), vec![])
.into_response();
}
};
*request.uri_mut() = format!("/{rest}")
.parse()
.expect("url was validated by Axum");
match operations::remote_rspc(state.node.p2p.p2p.clone(), identity, request)
.await
let request_upgrade_header =
request.headers().get(header::UPGRADE).map(Clone::clone);
let maybe_client_upgrade = request.extensions_mut().remove::<OnUpgrade>();
let mut response = match operations::remote_rspc(
state.node.p2p.p2p.clone(),
identity,
request,
)
.await
{
Ok(response) => response.into_response(),
Ok(v) => v,
Err(err) => {
error!("Error doing remote rspc query with '{identity}': {err:?}");
(StatusCode::INTERNAL_SERVER_ERROR, HeaderMap::new()).into_response()
warn!("Error doing remote rspc query with '{identity}': {err:?}");
return StatusCode::BAD_GATEWAY.into_response();
}
};
if response.status() == StatusCode::SWITCHING_PROTOCOLS {
if response.headers().get(header::UPGRADE)
!= request_upgrade_header.as_ref()
{
return StatusCode::BAD_REQUEST.into_response();
}
let Some(request_upgraded) = maybe_client_upgrade else {
return StatusCode::BAD_REQUEST.into_response();
};
let Some(response_upgraded) =
response.extensions_mut().remove::<OnUpgrade>()
else {
return StatusCode::BAD_REQUEST.into_response();
};
tokio::spawn(async move {
let Ok(mut request_upgraded) = request_upgraded.await.map_err(|err| {
warn!("Error upgrading websocket request: {err}");
}) else {
return;
};
let Ok(mut response_upgraded) =
response_upgraded.await.map_err(|err| {
warn!("Error upgrading websocket response: {err}");
})
else {
return;
};
copy_bidirectional(&mut request_upgraded, &mut response_upgraded)
.await
.map_err(|err| {
warn!("Error upgrading websocket response: {err}");
})
.ok();
});
}
response.into_response()
},
),
)
.merge(base_router())
.route_layer(middleware::from_fn(cors_middleware))
.with_state({
let file_metadata_cache = Arc::new(Cache::new(150));
tokio::spawn({
let file_metadata_cache = file_metadata_cache.clone();
let mut tx = node.event_bus.0.subscribe();
async move {
while let Ok(event) = tx.recv().await {
if let CoreEvent::InvalidateOperation(e) = event {
match e {
InvalidateOperationEvent::Single(event) => {
// TODO: This is inefficent as any change will invalidate who cache. We need the new invalidation system!!!
// TODO: It's also error prone and a fine-grained resource based invalidation system would avoid that.
if event.key == "search.objects" || event.key == "search.paths"
{
file_metadata_cache.invalidate_all();
}
}
InvalidateOperationEvent::All => {
file_metadata_cache.invalidate_all();
}
}
}
}
}
});
LocalState {
node,
file_metadata_cache,
}
})
.with_state(with_state(node))
}
// TODO: This should possibly be determined from magic bytes when the file is indexed and stored it in the DB on the file path

View File

@@ -61,6 +61,8 @@ pub(crate) async fn cors_middleware<B>(req: Request<B>, next: Next<B>) -> Respon
.expect("Invalid static response!");
}
let is_upgrade_request = req.headers().get("Upgrade").is_some();
let mut response = next.run(req).await;
{
@@ -73,8 +75,11 @@ pub(crate) async fn cors_middleware<B>(req: Request<B>, next: Next<B>) -> Respon
HeaderValue::from_static("*"),
);
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Connection
headers.insert("Connection", HeaderValue::from_static("Keep-Alive"));
// With websocket requests, setting this causes the browser to loose it's shit.
if !is_upgrade_request {
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Connection
headers.insert("Connection", HeaderValue::from_static("Keep-Alive"));
}
headers.insert("Server", HeaderValue::from_static("Spacedrive"));
}

View File

@@ -166,13 +166,21 @@ impl Node {
jobs_actor.start(node.clone());
start_p2p(
node.clone(),
router
.clone()
.endpoint({
let node = node.clone();
move |_| node.clone()
})
.axum::<()>()
axum::Router::new()
.nest(
"/uri",
custom_uri::base_router().with_state(custom_uri::with_state(node.clone())),
)
.nest(
"/rspc",
router
.clone()
.endpoint({
let node = node.clone();
move |_| node.clone()
})
.axum::<()>(),
)
.into_make_service(),
);

View File

@@ -292,14 +292,6 @@ async fn start(
}
};
}
Header::File(req) => {
let Err(()) = operations::request_file::receiver(&node, req, stream).await
else {
return;
};
error!("Failed to handle file request");
}
Header::Http => {
let remote = stream.remote_identity();
let Err(err) = operations::rspc::receiver(stream, &mut service).await else {

View File

@@ -1,8 +1,6 @@
pub mod ping;
pub mod request_file;
pub mod rspc;
pub mod spacedrop;
pub use request_file::request_file;
pub use rspc::remote_rspc;
pub use spacedrop::spacedrop;

View File

@@ -1,242 +0,0 @@
use crate::{
library::Library,
p2p::{Header, HeaderFile},
Node,
};
use sd_file_path_helper::{file_path_to_handle_p2p_serve_file, IsolatedFilePathData};
use sd_p2p2::UnicastStream;
use sd_p2p_block::{BlockSize, Range, SpaceblockRequest, SpaceblockRequests, Transfer};
use sd_prisma::prisma::file_path;
use tokio::{
fs::File,
io::{AsyncReadExt, AsyncWrite, AsyncWriteExt, BufReader},
};
use tracing::{debug, warn};
use uuid::Uuid;
use std::{
path::Path,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
/// Request a file from the remote machine over P2P. This is used for preview media and quick preview.
///
/// DO NOT USE THIS WITHOUT `node.files_over_p2p_flag == true`
pub async fn request_file(
mut stream: UnicastStream,
library: &Library,
file_path_id: Uuid,
range: Range,
output: impl AsyncWrite + Unpin,
) -> Result<(), ()> {
let id = Uuid::new_v4();
// TODO: Tunnel for encryption + authentication
stream
.write_all(
&Header::File(HeaderFile {
id,
library_id: library.id,
file_path_id,
range: range.clone(),
})
.to_bytes(),
)
.await
.map_err(|err| {
warn!("({id}): failed to read `Header::File`: {err:?}");
// TODO: UI error
// TODO: Error sent to remote peer
})?;
let block_size = BlockSize::from_stream(&mut stream).await.map_err(|err| {
warn!("({id}): failed to read block size: {err:?}");
// TODO: UI error
// TODO: Error sent to remote peer
})?;
let size = stream.read_u64_le().await.map_err(|err| {
warn!("({id}): failed to read file size: {err:?}");
// TODO: UI error
// TODO: Error sent to remote peer
})?;
Transfer::new(
&SpaceblockRequests {
id,
block_size,
requests: vec![SpaceblockRequest {
// TODO: Removing need for this field in this case
name: "todo".to_string(),
// TODO: Maybe removing need for `size` from this side
size,
range,
}],
},
|percent| {
debug!(
"P2P receiving file path '{}' - progress {}%",
file_path_id, percent
);
},
&Arc::new(AtomicBool::new(false)),
)
.receive(&mut stream, output)
.await
.map_err(|err| {
warn!("({id}): transfer failed: {err:?}");
// TODO: Error in UI
// TODO: Send error to remote peer???
})?;
Ok(())
}
pub(crate) async fn receiver(
node: &Arc<Node>,
HeaderFile {
id,
library_id,
file_path_id,
range,
}: HeaderFile,
mut stream: UnicastStream,
) -> Result<(), ()> {
#[allow(clippy::panic)] // If you've made it this far that's on you.
if !node.files_over_p2p_flag.load(Ordering::Relaxed) {
panic!("Files over P2P is disabled!");
}
// TODO: Tunnel and authentication
// TODO: Use BufReader
let library = node
.libraries
.get_library(&library_id)
.await
.ok_or_else(|| {
warn!("({id}): library not found'{library_id:?}'");
// TODO: Error in UI
// TODO: Send error to remote peer??? -> Can we avoid constructing connection until this is done so it's only an error on one side?
})?;
let file_path = library
.db
.file_path()
.find_unique(file_path::pub_id::equals(file_path_id.as_bytes().to_vec()))
.select(file_path_to_handle_p2p_serve_file::select())
.exec()
.await
.map_err(|err| {
warn!("({id}): error querying for file_path '{file_path_id:?}': {err:?}",);
// TODO: Error in UI
// TODO: Send error to remote peer??? -> Can we avoid constructing connection until this is done so it's only an error on one side?
})?
.ok_or_else(|| {
warn!("({id}): file_path not found '{file_path_id:?}'");
// TODO: Error in UI
// TODO: Send error to remote peer??? -> Can we avoid constructing connection until this is done so it's only an error on one side?
})?;
let location = file_path.location.as_ref().ok_or_else(|| {
warn!("({id}): file_path '{file_path_id:?} is missing 'location' property");
// TODO: Error in UI
// TODO: Send error to remote peer???
})?;
let location_path = location.path.as_ref().ok_or_else(|| {
warn!(
"({id}): location '{:?} is missing 'path' property",
location.id
);
// TODO: Error in UI
// TODO: Send error to remote peer???
})?;
let path = Path::new(location_path)
.join(IsolatedFilePathData::try_from((location.id, &file_path)).map_err(|err| {
warn!("({id}): failed to construct 'IsolatedFilePathData' for location '{:?} '{file_path:?}': {err:?}", location.id);
// TODO: Error in UI
// TODO: Send error to remote peer???
})?);
debug!("Serving path '{:?}' over P2P", path);
let file = File::open(&path).await.map_err(|err| {
warn!("({id}): failed to open file '{path:?}': {err:?}");
// TODO: Error in UI
// TODO: Send error to remote peer???
})?;
let metadata = file.metadata().await.map_err(|err| {
warn!("({id}): failed to get metadata for file '{path:?}': {err:?}");
// TODO: Error in UI
// TODO: Send error to remote peer???
})?;
let block_size = BlockSize::from_size(metadata.len());
stream
.write_all(&block_size.to_bytes())
.await
.map_err(|err| {
warn!("({id}): failed to write block size: {err:?}");
// TODO: Error in UI
// TODO: Send error to remote peer???
})?;
stream
.write_all(&metadata.len().to_le_bytes())
.await
.map_err(|err| {
warn!("({id}): failed to write length: {err:?}");
// TODO: Error in UI
// TODO: Send error to remote peer???
})?;
let file = BufReader::new(file);
Transfer::new(
&SpaceblockRequests {
id,
block_size,
requests: vec![SpaceblockRequest {
// TODO: Removing need for this field in this case
name: "todo".to_string(),
size: metadata.len(),
range,
}],
},
|percent| {
debug!(
"P2P loading file path '{}' - progress {}%",
file_path_id, percent
);
},
&Arc::new(AtomicBool::new(false)),
)
.send(&mut stream, file)
.await
.map_err(|err| {
warn!("({id}): transfer failed: {err:?}");
// TODO: Error in UI
// TODO: Send error to remote peer???
})?;
Ok(())
}
// TODO: Unit tests

View File

@@ -43,6 +43,12 @@ pub(crate) async fn receiver(
stream.remote_identity(),
);
// TODO: Authentication
#[allow(clippy::todo)]
if true {
todo!("You wouldn't download a car!");
}
Http::new()
.http1_only(true)
.http1_keep_alive(true)

View File

@@ -1,18 +1,9 @@
use sd_p2p_block::{Range, SpaceblockRequests, SpaceblockRequestsError};
use sd_p2p_block::{SpaceblockRequests, SpaceblockRequestsError};
use sd_p2p_proto::{decode, encode};
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncReadExt};
use uuid::Uuid;
#[derive(Debug, PartialEq, Eq)]
pub struct HeaderFile {
// Request ID
pub(crate) id: Uuid,
pub(crate) library_id: Uuid,
pub(crate) file_path_id: Uuid,
pub(crate) range: Range,
}
/// TODO
#[derive(Debug, PartialEq, Eq)]
pub enum Header {
@@ -20,7 +11,6 @@ pub enum Header {
Ping,
Spacedrop(SpaceblockRequests),
Sync(Uuid),
File(HeaderFile),
// A HTTP server used for rspc requests and streaming files
Http,
}
@@ -35,10 +25,6 @@ pub enum HeaderError {
SpacedropRequest(#[from] SpaceblockRequestsError),
#[error("error reading sync request: {0}")]
SyncRequest(decode::Error),
#[error("error reading header file: {0}")]
HeaderFile(decode::Error),
#[error("error invalid header file discriminator '{0}'")]
HeaderFileDiscriminatorInvalid(u8),
}
impl Header {
@@ -58,36 +44,6 @@ impl Header {
.await
.map_err(HeaderError::SyncRequest)?,
)),
4 => Ok(Self::File(HeaderFile {
id: decode::uuid(stream)
.await
.map_err(HeaderError::HeaderFile)?,
library_id: decode::uuid(stream)
.await
.map_err(HeaderError::HeaderFile)?,
file_path_id: decode::uuid(stream)
.await
.map_err(HeaderError::HeaderFile)?,
range: match stream
.read_u8()
.await
.map_err(|err| HeaderError::HeaderFile(err.into()))?
{
0 => Range::Full,
1 => {
let start = stream
.read_u64_le()
.await
.map_err(|err| HeaderError::HeaderFile(err.into()))?;
let end = stream
.read_u64_le()
.await
.map_err(|err| HeaderError::HeaderFile(err.into()))?;
Range::Partial(start..end)
}
i => return Err(HeaderError::HeaderFileDiscriminatorInvalid(i)),
},
})),
5 => Ok(Self::Http),
d => Err(HeaderError::DiscriminatorInvalid(d)),
}
@@ -106,19 +62,6 @@ impl Header {
encode::uuid(&mut bytes, uuid);
bytes
}
Self::File(HeaderFile {
id,
library_id,
file_path_id,
range,
}) => {
let mut buf = vec![4];
encode::uuid(&mut buf, id);
encode::uuid(&mut buf, library_id);
encode::uuid(&mut buf, file_path_id);
buf.extend_from_slice(&range.to_bytes());
buf
}
Self::Http => vec![5],
}
}

View File

@@ -188,8 +188,6 @@ async fn start(
tokio::select! {
Ok(event) = rx.recv_async() => match event {
HookEvent::PeerExpiredBy(_, identity) => {
println!("CHECKING {:?}", identity); // TODO
let Some(peer) = p2p.peers.read().unwrap_or_else(PoisonError::into_inner).get(&identity).map(Clone::clone) else {
continue;
};

View File

@@ -1,26 +1,19 @@
import { httpLink, initRspc, type AlphaClient } from '@oscartbeaumont-sd/rspc-client/v2';
import { useEffect, useRef, useState } from 'react';
import { useDiscoveredPeers, type Procedures } from '@sd/client';
import { useNavigate } from 'react-router';
import { useCache, useDiscoveredPeers, useLibraryQuery, useNodes } from '@sd/client';
import { Button } from '@sd/ui';
import { usePlatform } from '~/util/Platform';
export const Component = () => {
// TODO: Handle if P2P is disabled
const [activePeer, setActivePeer] = useState<string | null>(null);
return (
<div className="p-4">
{activePeer ? (
<P2PInfo peer={activePeer} />
) : (
<PeerSelector setActivePeer={setActivePeer} />
)}
<PeerSelector />
</div>
);
};
function PeerSelector({ setActivePeer }: { setActivePeer: (peer: string) => void }) {
function PeerSelector() {
const peers = useDiscoveredPeers();
const navigate = useNavigate();
return (
<>
@@ -32,7 +25,9 @@ function PeerSelector({ setActivePeer }: { setActivePeer: (peer: string) => void
{[...peers.entries()].map(([id, _node]) => (
<li key={id}>
{id}
<Button onClick={() => setActivePeer(id)}>Connect</Button>
<Button onClick={() => navigate(`/remote/${id}/browse`)}>
Open Library Browser
</Button>
</li>
))}
</ul>
@@ -40,43 +35,3 @@ function PeerSelector({ setActivePeer }: { setActivePeer: (peer: string) => void
</>
);
}
function P2PInfo({ peer }: { peer: string }) {
const platform = usePlatform();
const ref = useRef<AlphaClient<Procedures>>();
const [result, setResult] = useState('');
useEffect(() => {
// TODO: Cleanup when URL changed
const endpoint = platform.getRemoteRspcEndpoint(peer);
ref.current = initRspc<Procedures>({
links: [
httpLink({
url: endpoint.url,
headers: endpoint.headers
})
]
});
}, [peer]);
useEffect(() => {
if (!ref.current) return;
ref.current.query(['nodeState']).then((data) => setResult(JSON.stringify(data, null, 2)));
}, [ref, result]);
return (
<div className="flex flex-col">
<h1>Connected with: {peer}</h1>
<Button
onClick={() => {
ref.current
?.query(['nodeState'])
.then((data) => setResult(JSON.stringify(data, null, 2)));
}}
>
Refetch
</Button>
<pre>{result}</pre>
</div>
);
}

View File

@@ -1,3 +1,4 @@
import { type AlphaClient } from '@oscartbeaumont-sd/rspc-client/v2';
import { ArrowLeft, ArrowRight, Info } from '@phosphor-icons/react';
import * as Dialog from '@radix-ui/react-dialog';
import { iconNames } from '@sd/assets/util';

View File

@@ -1,18 +1,41 @@
import { useMemo } from 'react';
import { Navigate, Outlet, redirect, useMatches, type RouteObject } from 'react-router-dom';
import { initRspc, wsBatchLink, type AlphaClient } from '@oscartbeaumont-sd/rspc-client/v2';
import { QueryClient, QueryClientProvider } from '@tanstack/react-query';
import { useEffect, useMemo, useState } from 'react';
import {
Link,
Navigate,
Outlet,
redirect,
useMatches,
useNavigate,
type RouteObject
} from 'react-router-dom';
import {
CacheProvider,
ClientContextProvider,
context,
context2,
createCache,
currentLibraryCache,
getCachedLibraries,
LibraryContextProvider,
nonLibraryClient,
NormalisedCache,
Procedures,
useBridgeQuery,
useCache,
useCachedLibraries,
useFeatureFlag,
WithSolid
useNodes,
WithSolid,
type LibraryProceduresDef,
type NonLibraryProceduresDef
} from '@sd/client';
import { Dialogs, Toaster } from '@sd/ui';
import { Button, Dialogs, Toaster, z } from '@sd/ui';
import { RouterErrorBoundary } from '~/ErrorFallback';
import { useRoutingContext } from '~/RoutingContext';
import { Platform } from '..';
import { Platform, PlatformProvider, usePlatform } from '..';
import libraryRoutes from './$libraryId';
import { DragAndDropDebug } from './$libraryId/debug/dnd';
import { Demo, Demo2 } from './demo.solid';
@@ -21,10 +44,14 @@ import { RootContext } from './RootContext';
import './style.scss';
import { useZodRouteParams } from '~/hooks';
// NOTE: all route `Layout`s below should contain
// the `usePlausiblePageViewMonitor` hook, as early as possible (ideally within the layout itself).
// the hook should only be included if there's a valid `ClientContext` (so not onboarding)
const LibraryIdParamsSchema = z.object({ libraryId: z.string() });
export const createRoutes = (platform: Platform, cache: NormalisedCache) =>
[
{
@@ -67,7 +94,7 @@ export const createRoutes = (platform: Platform, cache: NormalisedCache) =>
return <Navigate to={`${libraryId}`} replace />;
},
loader: async () => {
const libraries = await getCachedLibraries(cache);
const libraries = await getCachedLibraries(cache, nonLibraryClient);
const currentLibrary = libraries.find(
(l) => l.uuid === currentLibraryCache.id
@@ -86,11 +113,61 @@ export const createRoutes = (platform: Platform, cache: NormalisedCache) =>
lazy: () => import('./onboarding/Layout'),
children: onboardingRoutes
},
{
path: 'remote/:node',
Component: (props) => <RemoteLayout {...props} />,
children: [
{
path: 'browse',
Component: BrowsePage
},
{
path: ':libraryId',
Component: () => {
const params = useZodRouteParams(LibraryIdParamsSchema);
const result = useBridgeQuery(['library.list']);
useNodes(result.data?.nodes);
const libraries = useCache(result.data?.items);
const library = libraries?.find((l) => l.uuid === params.libraryId);
useEffect(() => {
if (!result.data) return;
if (!library) {
alert('Library not found');
// TODO: Redirect
}
});
if (!library) return <></>; // TODO: Using suspense for loading
return (
<ClientContextProvider currentLibraryId={params.libraryId}>
<LibraryContextProvider library={library}>
<div className="w-full bg-orange-500 text-center text-white">
YOUR ON A REMOTE NODE <Link to="/">Go Back</Link>
</div>
<Outlet />
</LibraryContextProvider>
</ClientContextProvider>
);
},
children: [
{
path: '*',
lazy: () => import('./$libraryId/Layout'),
children: libraryRoutes(platform)
}
]
}
]
},
{
path: ':libraryId',
lazy: () => import('./$libraryId/Layout'),
loader: async ({ params: { libraryId } }) => {
const libraries = await getCachedLibraries(cache);
const libraries = await getCachedLibraries(cache, nonLibraryClient);
const library = libraries.find((l) => l.uuid === libraryId);
if (!library) {
@@ -109,6 +186,132 @@ export const createRoutes = (platform: Platform, cache: NormalisedCache) =>
}
] satisfies RouteObject[];
const ParamsSchema = z.object({ node: z.string() });
function RemoteLayout() {
const platform = usePlatform();
const params = useZodRouteParams(ParamsSchema);
// TODO: The caches should instead be prefixed by the remote node ID, instead of completely being recreated but that's too hard to do right now.
const [rspcClient, setRspcClient] =
useState<
[
AlphaClient<NonLibraryProceduresDef>,
AlphaClient<LibraryProceduresDef>,
QueryClient,
NormalisedCache
]
>();
useEffect(() => {
const endpoint = platform.getRemoteRspcEndpoint(params.node);
const links = [
wsBatchLink({
url: endpoint.url
})
];
const client = initRspc<Procedures>({
links
}).dangerouslyHookIntoInternals<NonLibraryProceduresDef>();
const libraryClient = initRspc<Procedures>({
links
}).dangerouslyHookIntoInternals<LibraryProceduresDef>({
mapQueryKey: (keyAndInput) => {
const libraryId = currentLibraryCache.id;
if (libraryId === null)
throw new Error('Attempted to do library operation with no library set!');
return [keyAndInput[0], { library_id: libraryId, arg: keyAndInput[1] ?? null }];
}
});
const cache = createCache();
setRspcClient([client, libraryClient, new QueryClient(), cache]);
return () => {
// TODO: We *really* need to cleanup `client` so we aren't leaking all the resources.
};
}, [params.node, platform]);
// TODO: Detect if the remote node if offline and render something to show that
const newPlatform = useMemo(
() =>
({
...platform,
getThumbnailUrlByThumbKey: (thumbKey) =>
platform.constructRemoteRspcPath(
params.node,
`thumbnail/${thumbKey.map((i) => encodeURIComponent(i)).join('/')}.webp`
),
getFileUrl: (libraryId, locationLocalId, filePathId) =>
platform.constructRemoteRspcPath(
params.node,
`file/${encodeURIComponent(libraryId)}/${encodeURIComponent(
locationLocalId
)}/${encodeURIComponent(filePathId)}`
),
getFileUrlByPath: (path) =>
platform.constructRemoteRspcPath(
params.node,
`local-file-by-path/${encodeURIComponent(path)}`
)
}) satisfies Platform,
[platform, params.node]
);
return (
<PlatformProvider platform={newPlatform}>
{/* TODO: Maybe library context too? */}
{rspcClient && (
<QueryClientProvider client={rspcClient[2]}>
<CacheProvider cache={rspcClient[3]}>
<context.Provider
value={{
// @ts-expect-error
client: rspcClient[0],
queryClient: rspcClient[2]
}}
>
<context2.Provider
value={{
// @ts-expect-error
client: rspcClient[1],
queryClient: rspcClient[2]
}}
>
<Outlet />
</context2.Provider>
</context.Provider>
</CacheProvider>
</QueryClientProvider>
)}
</PlatformProvider>
);
}
function BrowsePage() {
const navigate = useNavigate();
const result = useBridgeQuery(['library.list']);
useNodes(result.data?.nodes);
const libraries = useCache(result.data?.items);
return (
<div className="flex flex-col">
<h1>Browse Libraries On Remote Node:</h1>
{libraries?.map((l) => (
<Button
key={l.uuid}
variant="accent"
// TODO: Take into account Windows vs Mac vs Linux with the default `path`
onClick={() => navigate(`../${l.uuid}/ephemeral/0-0?path=/System/Volumes/Data`)}
>
{l.config.name}
</Button>
))}
</div>
);
}
/**
* Combines the `path` segments of the current route into a single string.
* This is useful for things like analytics, where we want the route path

View File

@@ -21,6 +21,7 @@ export type Platform = {
url: string;
headers?: Record<string, string>;
};
constructRemoteRspcPath: (remote_identity: string, path: string) => string;
openLink: (url: string) => void;
// Tauri patches `window.confirm` to return `Promise` not `bool`
confirm(msg: string, cb: (result: boolean) => void): void;

View File

@@ -34,11 +34,11 @@ export function createCache() {
const cache = proxy(defaultStore());
return {
cache,
withNodes(data: CacheNode[] | undefined) {
updateNodes(cache, data);
withNodes(data: CacheNode[] | undefined, suffix?: string) {
updateNodes(cache, data, suffix);
},
withCache<T>(data: T | undefined): UseCacheResult<T> {
return restore(cache, new Map(), data) as any;
withCache<T>(data: T | undefined, suffix?: string): UseCacheResult<T> {
return restore(cache, new Map(), data, suffix) as any;
}
};
}
@@ -123,7 +123,12 @@ function scanDataForKeys(cache: Store, keys: StableSet<[string, string]>, item:
}
}
function restore(cache: Store, subscribed: Map<string, Set<unknown>>, item: unknown): unknown {
function restore(
cache: Store,
subscribed: Map<string, Set<unknown>>,
item: unknown,
suffix?: string
): unknown {
if (item === undefined || item === null) {
return item;
} else if (Array.isArray(item)) {
@@ -132,15 +137,16 @@ function restore(cache: Store, subscribed: Map<string, Set<unknown>>, item: unkn
if ('__type' in item && '__id' in item) {
if (typeof item.__type !== 'string') throw new Error('Invalid `__type`');
if (typeof item.__id !== 'string') throw new Error('Invalid `__id`');
const result = cache.nodes?.[item.__type]?.[item.__id];
if (!result)
throw new Error(`Missing node for id '${item.__id}' of type '${item.__type}'`);
const ty = suffix ? `${suffix}:${item.__type}` : item.__type;
const v = subscribed.get(item.__type);
const result = cache.nodes?.[ty]?.[item.__id];
if (!result) throw new Error(`Missing node for id '${item.__id}' of type '${ty}'`);
const v = subscribed.get(ty);
if (v) {
v.add(item.__id);
} else {
subscribed.set(item.__type, new Set([item.__id]));
subscribed.set(ty, new Set([item.__id]));
}
// We call restore again for arrays and objects to deal with nested relations.
@@ -180,24 +186,25 @@ export function useNormalisedCache() {
};
}
function updateNodes(cache: Store, data: CacheNode[] | undefined) {
function updateNodes(cache: Store, data: CacheNode[] | undefined, suffix?: string) {
if (!data) return;
for (const item of data) {
if (!('__type' in item && '__id' in item)) throw new Error('Missing `__type` or `__id`');
if (typeof item.__type !== 'string') throw new Error('Invalid `__type`');
if (typeof item.__id !== 'string') throw new Error('Invalid `__id`');
const ty = suffix ? `${suffix}:${item.__type}` : item.__type;
const copy = { ...item } as any;
delete copy.__type;
delete copy.__id;
const original = cache.nodes?.[item.__type]?.[item.__id];
const original = cache.nodes?.[ty]?.[item.__id];
specialMerge(copy, original);
if (!cache.nodes[item.__type]) cache.nodes[item.__type] = {};
if (!cache.nodes[ty]) cache.nodes[ty] = {};
// TODO: This should be a deepmerge but that would break stuff like `size_in_bytes` or `inode` as the arrays are joined.
cache.nodes[item.__type]![item.__id] = copy;
cache.nodes[ty]![item.__id] = copy;
}
}

View File

@@ -1,7 +1,8 @@
import { AlphaClient } from '@oscartbeaumont-sd/rspc-client/v2';
import { createContext, PropsWithChildren, useContext, useEffect, useMemo } from 'react';
import { NormalisedCache, useCache, useNodes } from '../cache';
import { LibraryConfigWrapped } from '../core';
import { LibraryConfigWrapped, Procedures } from '../core';
import { valtioPersist } from '../lib';
import { nonLibraryClient, useBridgeQuery } from '../rspc';
@@ -38,7 +39,7 @@ export const useCachedLibraries = () => {
};
};
export async function getCachedLibraries(cache: NormalisedCache) {
export async function getCachedLibraries(cache: NormalisedCache, client: AlphaClient<Procedures>) {
const cachedData = localStorage.getItem(libraryCacheLocalStorageKey);
if (cachedData) {
@@ -52,7 +53,7 @@ export async function getCachedLibraries(cache: NormalisedCache) {
}
}
const result = await nonLibraryClient.query(['library.list']);
const result = await client.query(['library.list']);
cache.withNodes(result.nodes);
const libraries = cache.withCache(result.items);

View File

@@ -1,7 +1,11 @@
import { ProcedureDef } from '@oscartbeaumont-sd/rspc-client';
import {
inferMutationInput,
inferMutationResult,
ProcedureDef
} from '@oscartbeaumont-sd/rspc-client';
import { AlphaRSPCError, initRspc } from '@oscartbeaumont-sd/rspc-client/v2';
import { Context, createReactQueryHooks } from '@oscartbeaumont-sd/rspc-react/v2';
import { QueryClient } from '@tanstack/react-query';
import { BaseOptions, Context, createReactQueryHooks } from '@oscartbeaumont-sd/rspc-react/v2';
import { QueryClient, useMutation, UseMutationOptions, useQuery } from '@tanstack/react-query';
import { createContext, PropsWithChildren, useContext } from 'react';
import { match, P } from 'ts-pattern';
@@ -26,11 +30,11 @@ type StripLibraryArgsFromInput<
key: T['key'];
input: NeverOverNull extends true ? (E extends null ? never : E) : E;
result: T['result'];
}
}
: never
: never;
type NonLibraryProceduresDef = {
export type NonLibraryProceduresDef = {
queries: NonLibraryProcedure<'queries'>;
mutations: NonLibraryProcedure<'mutations'>;
subscriptions: NonLibraryProcedure<'subscriptions'>;
@@ -42,8 +46,8 @@ export type LibraryProceduresDef = {
subscriptions: StripLibraryArgsFromInput<LibraryProcedures<'subscriptions'>, true>;
};
const context = createContext<Context<Procedures>>(undefined!);
const context2 = createContext<Context<LibraryProceduresDef>>(undefined!);
export const context = createContext<Context<Procedures>>(undefined!);
export const context2 = createContext<Context<LibraryProceduresDef>>(undefined!);
export const useRspcContext = () => useContext(context);
export const useRspcLibraryContext = () => useContext(context2);