Working Sync & Thumbnail Sync (#2875)

* First draft on new p2p design

* Test sync

* Sync works

* Update version

* Fix merge conflicts

* More stuff for thumbnail sync

* Get the thumbnail

* Update mod.rs

* Working thumbnail sync

* Format

* Update thumbnails.rs

* Update runner.rs

* Update runner.rs

---------

Co-authored-by: Ericson Soares <ericson.ds999@gmail.com>
This commit is contained in:
Arnab Chakraborty
2025-03-20 04:34:59 +00:00
committed by GitHub
parent 2ee62186a2
commit 4693f31e90
83 changed files with 1561 additions and 176 deletions

BIN
Cargo.lock generated
View File

Binary file not shown.

View File

@@ -20,50 +20,53 @@ rust-version = "1.81"
[workspace.dependencies]
# First party dependencies
sd-cloud-schema = { git = "https://github.com/spacedriveapp/cloud-services-schema", rev = "4e4565bee4" }
sd-cloud-schema = { git = "https://github.com/spacedriveapp/cloud-services-schema", rev = "515ba740ea" }
# Third party dependencies used by one or more of our crates
anyhow = "1.0.94"
async-channel = "2.3"
async-stream = "0.3.6"
async-trait = "0.1.83"
axum = "0.7.7"
axum-extra = "0.9.4"
axum = "0.7.9"
axum-extra = "0.9.6"
base64 = "0.22.1"
blake3 = "1.5.4"
bytes = "1.7.1" # Update blocked by hyper
chrono = "0.4.38"
blake3 = "1.5.5"
bytes = "1.9.0"
chrono = "0.4.39"
ed25519-dalek = "2.1"
flume = "0.11.0"
futures = "0.3.31"
futures-concurrency = "7.6"
futures-concurrency = "7.6.2"
globset = "0.4.15"
http = "1.1"
hyper = "1.5"
image = "0.25.4"
http = "1.2.0"
hyper = "1.5.2"
image = "0.25.5"
iroh = "0.29.0"
itertools = "0.13.0"
lending-stream = "1.0"
libc = "0.2.159"
libc = "0.2.169"
mimalloc = "0.1.43"
normpath = "1.3"
pin-project-lite = "0.2.14"
pin-project-lite = "0.2.15"
quic-rpc = "0.17.3"
rand = "0.9.0-alpha.2"
regex = "1.11"
reqwest = { version = "0.12.8", default-features = false }
regex = "1.11.1"
reqwest = { version = "0.12.9", default-features = false }
rmp = "0.8.14"
rmp-serde = "1.3"
rmpv = { version = "1.3", features = ["with-serde"] }
serde = "1.0"
serde_json = "1.0"
serde = "1.0.216"
serde_json = "1.0.133"
specta = "=2.0.0-rc.20"
strum = "0.26"
strum_macros = "0.26"
tempfile = "3.13"
thiserror = "1.0"
tokio = "1.40"
tokio-stream = "0.1.16"
tokio-util = "0.7.12"
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
tempfile = "3.14.0"
thiserror = "2.0.8"
tokio = "1.42.0"
tokio-stream = "0.1.17"
tokio-util = "0.7.13"
tracing = "0.1.41"
tracing-subscriber = "0.3.19"
tracing-test = "0.2.5"
uhlc = "0.8.0" # Must follow version used by specta
uuid = "1.10" # Must follow version used by specta

View File

@@ -1,6 +1,6 @@
[package]
name = "sd-desktop"
version = "0.4.3"
version = "0.5.0"
authors = ["Spacedrive Technology Inc <support@spacedrive.com>"]
default-run = "sd-desktop"
@@ -69,9 +69,9 @@ sd-desktop-linux = { path = "../crates/linux" }
# WARNING: dbus must NOT be vendored, as that breaks the app on Linux,X11,Nvidia
dbus = { version = "0.9.7", features = ["stdfd"] }
# https://github.com/tauri-apps/tauri/blob/tauri-v2.0.0/crates/tauri/Cargo.toml#L101
gtk = { version = "0.18", features = ["v3_24"] }
tao = { version = "0.31.1", features = ["serde"] }
webkit2gtk = { version = "=2.0.1", features = ["v2_40"] }
gtk = { version = "0.18", features = ["v3_24"] }
tao = { version = "0.31.1", features = ["serde"] }
[target.'cfg(target_os = "macos")'.dependencies]

View File

@@ -8,6 +8,7 @@ import {
libraryClient,
RspcProvider,
useBridgeMutation,
useLibraryMutation,
useSelector
} from '@sd/client';
import {
@@ -61,21 +62,21 @@ declare global {
}
// Disabling until sync is ready.
// SuperTokens.init({
// appInfo: {
// apiDomain: AUTH_SERVER_URL,
// apiBasePath: '/api/auth',
// appName: 'Spacedrive Auth Service'
// },
// cookieHandler: getCookieHandler,
// windowHandler: getWindowHandler,
// recipeList: [
// Session.init({ tokenTransferMethod: 'header' }),
// EmailPassword.init(),
// ThirdParty.init(),
// Passwordless.init()
// ]
// });
SuperTokens.init({
appInfo: {
apiDomain: AUTH_SERVER_URL,
apiBasePath: '/api/auth',
appName: 'Spacedrive Auth Service'
},
cookieHandler: getCookieHandler,
windowHandler: getWindowHandler,
recipeList: [
Session.init({ tokenTransferMethod: 'header' }),
EmailPassword.init()
// ThirdParty.init(),
// Passwordless.init()
]
});
const startupError = (window as any).__SD_ERROR__ as string | undefined;
@@ -232,7 +233,7 @@ type RedirectPath = { pathname: string; search: string | undefined };
function AppInner() {
const [tabs, setTabs] = useState(() => [createTab()]);
const [selectedTabIndex, setSelectedTabIndex] = useState(0);
const cloudBootstrap = useBridgeMutation('cloud.bootstrap');
const cloudBootstrap = useLibraryMutation('cloud.bootstrap');
useEffect(() => {
(async () => {

View File

@@ -49,6 +49,16 @@ export const commands = {
else return { status: 'error', error: e as any };
}
},
/**
* Initiates a drag and drop operation with cursor position tracking
*
* # Arguments
* * `window` - The Tauri window instance
* * `_state` - Current drag state (unused)
* * `files` - Vector of file paths to be dragged
* * `image` - Base64 encoded image to be used as drag icon
* * `on_event` - Channel for communicating drag operation events back to the frontend
*/
async startDrag(
files: string[],
image: string,

View File

@@ -1,6 +1,6 @@
[package]
name = "sd-core"
version = "0.4.3"
version = "0.5.0"
authors = ["Spacedrive Technology Inc <support@spacedrive.com>"]
description = "Virtual distributed filesystem engine that powers Spacedrive."
@@ -35,10 +35,10 @@ sd-ffmpeg = { path = "../crates/ffmpeg", optional = true }
sd-file-ext = { path = "../crates/file-ext" }
sd-images = { path = "../crates/images", features = ["rspc", "serde", "specta"] }
sd-media-metadata = { path = "../crates/media-metadata" }
sd-p2p = { path = "../crates/p2p", features = ["specta"] }
sd-p2p-block = { path = "../crates/p2p/crates/block" }
sd-p2p-proto = { path = "../crates/p2p/crates/proto" }
sd-p2p-tunnel = { path = "../crates/p2p/crates/tunnel" }
sd-old-p2p = { path = "../crates/old-p2p", features = ["specta"] }
sd-old-p2p-block = { path = "../crates/old-p2p/crates/block" }
sd-old-p2p-proto = { path = "../crates/old-p2p/crates/proto" }
sd-old-p2p-tunnel = { path = "../crates/old-p2p/crates/tunnel" }
sd-prisma = { path = "../crates/prisma" }
sd-sync = { path = "../crates/sync" }
sd-task-system = { path = "../crates/task-system" }

View File

@@ -6,7 +6,9 @@ edition = "2021"
[dependencies]
# Core Spacedrive Sub-crates
sd-core-sync = { path = "../sync" }
sd-core-heavy-lifting = { path = "../heavy-lifting" }
sd-core-prisma-helpers = { path = "../prisma-helpers" }
sd-core-sync = { path = "../sync" }
# Spacedrive Sub-crates
sd-actors = { path = "../../../crates/actors" }
@@ -16,6 +18,7 @@ sd-prisma = { path = "../../../crates/prisma" }
sd-utils = { path = "../../../crates/utils" }
# Workspace dependencies
anyhow = { workspace = true }
async-stream = { workspace = true }
base64 = { workspace = true }
blake3 = { workspace = true }
@@ -23,6 +26,8 @@ chrono = { workspace = true, features = ["serde"] }
flume = { workspace = true }
futures = { workspace = true }
futures-concurrency = { workspace = true }
iroh = { workspace = true, features = ["discovery-local-network"] }
quic-rpc = { workspace = true, features = ["iroh-transport", "quinn-transport"] }
rmp-serde = { workspace = true }
rspc = { workspace = true }
serde = { workspace = true, features = ["derive"] }
@@ -37,12 +42,9 @@ uuid = { workspace = true, features = ["serde"] }
zeroize = { workspace = true }
# External dependencies
anyhow = "1.0.86"
dashmap = "6.1.0"
iroh = { version = "0.29.0", features = ["discovery-local-network"] }
paste = "=1.0.15"
quic-rpc = { version = "0.17.1", features = ["iroh-transport", "quinn-transport"] }
quinn = { package = "iroh-quinn", version = "0.12" }
dashmap = "6.1.0"
paste = "=1.0.15"
quinn = { package = "iroh-quinn", version = "0.12" }
# Using whatever version of reqwest that reqwest-middleware uses, just putting here to enable some features
reqwest = { version = "0.12", features = ["json", "native-tls-vendored", "stream"] }
reqwest-middleware = { version = "0.4", features = ["json"] }

View File

@@ -1,15 +1,16 @@
use crate::p2p::{NotifyUser, UserResponse};
use sd_cloud_schema::{Client, Request, Response, ServicesALPN};
use sd_cloud_schema::{Client, Service, ServicesALPN};
use std::{net::SocketAddr, sync::Arc, time::Duration};
use futures::Stream;
use iroh::relay::RelayUrl;
use quic_rpc::{transport::quinn::QuinnConnector, RpcClient, RpcMessage};
use quic_rpc::{client::QuinnConnector, RpcClient};
use quinn::{crypto::rustls::QuicClientConfig, ClientConfig, Endpoint};
use reqwest::{IntoUrl, Url};
use reqwest_middleware::{reqwest, ClientBuilder, ClientWithMiddleware};
// use reqwest_retry::{policies::ExponentialBackoff, RetryTransientMiddleware};
use tokio::sync::{Mutex, RwLock};
use tracing::warn;
@@ -17,11 +18,13 @@ use super::{
error::Error, key_manager::KeyManager, p2p::CloudP2P, token_refresher::TokenRefresher,
};
pub type CloudServicesClient = Client<QuinnConnector<Service>>;
#[derive(Debug, Default, Clone)]
enum ClientState<In: RpcMessage, Out: RpcMessage> {
enum ClientState {
#[default]
NotConnected,
Connected(Client<QuinnConnector<In, Out>>),
Connected(CloudServicesClient),
}
/// Cloud services are a optional feature that allows you to interact with the cloud services
@@ -34,7 +37,7 @@ enum ClientState<In: RpcMessage, Out: RpcMessage> {
/// that core can always operate without the cloud services.
#[derive(Debug)]
pub struct CloudServices {
client_state: Arc<RwLock<ClientState<Response, Request>>>,
client_state: Arc<RwLock<ClientState>>,
get_cloud_api_address: Url,
http_client: ClientWithMiddleware,
domain_name: String,
@@ -157,7 +160,7 @@ impl CloudServices {
http_client: &ClientWithMiddleware,
get_cloud_api_address: Url,
domain_name: String,
) -> Result<Client<QuinnConnector<Response, Request>>, Error> {
) -> Result<CloudServicesClient, Error> {
let cloud_api_address = http_client
.get(get_cloud_api_address)
.send()
@@ -256,7 +259,7 @@ impl CloudServices {
.map_err(Error::FailedToCreateEndpoint)?;
endpoint.set_default_client_config(client_config);
Ok(Client::new(RpcClient::new(QuinnConnector::new(
Ok(Client::new(RpcClient::new(QuinnConnector::<Service>::new(
endpoint,
cloud_api_address,
domain_name,
@@ -268,7 +271,7 @@ impl CloudServices {
/// If the client is not connected, it will try to connect to the cloud services.
/// Available routes documented in
/// [`sd_cloud_schema::Service`](https://github.com/spacedriveapp/cloud-services-schema).
pub async fn client(&self) -> Result<Client<QuinnConnector<Response, Request>>, Error> {
pub async fn client(&self) -> Result<CloudServicesClient, Error> {
if let ClientState::Connected(client) = { &*self.client_state.read().await } {
return Ok(client.clone());
}

View File

@@ -177,10 +177,22 @@ pub enum Error {
ReadNonceStreamDecryption(io::Error),
#[error("Incomplete download bytes sync messages")]
IncompleteDownloadBytesSyncMessages,
#[error("Timed out while waiting to recive thumbnail data")]
ThumbnailRequestTimeout,
// Temporary errors
#[error("Device missing secret key for decrypting sync messages")]
MissingKeyHash,
#[error("Not Implemented yet")]
NotImplemented,
#[error("Device not found")]
DeviceNotFound,
#[error("Invalid CAS ID")]
InvalidCasId,
#[error("Internal Error")]
InternalError,
#[error("Remote Device Error")]
RemoteDeviceError,
}
#[derive(thiserror::Error, Debug)]

View File

@@ -8,8 +8,9 @@ use sd_cloud_schema::{
SecretKey as IrohSecretKey,
};
use sd_crypto::{CryptoRng, SeedableRng};
use sd_prisma::prisma::file_path::cas_id;
use std::{sync::Arc, time::Duration};
use std::{path::PathBuf, sync::Arc, time::Duration};
use iroh::{
discovery::{
@@ -35,6 +36,12 @@ pub struct JoinedLibraryCreateArgs {
pub description: Option<String>,
}
#[derive(Debug)]
pub struct RecivedGetThumbnailArgs {
pub cas_id: cas_id::Type,
pub error: Option<Error>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, specta::Type)]
#[serde(transparent)]
#[repr(transparent)]
@@ -108,6 +115,7 @@ impl CloudP2P {
dns_origin_domain: String,
dns_pkarr_url: Url,
relay_url: RelayUrl,
data_directory: PathBuf,
) -> Result<Self, Error> {
let dht_discovery = DhtDiscovery::builder()
.secret_key(iroh_secret_key.clone())
@@ -156,6 +164,7 @@ impl CloudP2P {
cloud_services,
msgs_tx.clone(),
endpoint,
data_directory,
)
.await?;
let user_response_rx = cloud_services.user_response_rx.clone();
@@ -232,6 +241,28 @@ impl CloudP2P {
.await
.expect("Channel closed");
}
/// Requests the binary of a thumbnail from a specific device endpoint
///
/// # Panics
/// Will panic if the actor channel is closed, which should never happen
pub async fn request_thumbnail_data(
&self,
device_pub_id: devices::PubId,
cas_id: cas_id::Type,
library_pub_id: libraries::PubId,
tx: oneshot::Sender<RecivedGetThumbnailArgs>,
) {
self.msgs_tx
.send_async(runner::Message::Request(runner::Request::GetThumbnail {
device_pub_id,
cas_id,
library_pub_id,
tx,
}))
.await
.expect("Channel closed");
}
}
impl Drop for CloudP2P {

View File

@@ -9,12 +9,15 @@ use sd_cloud_schema::{
CloudP2PError, Service,
},
devices::{self, Device},
libraries::{self},
sync::groups,
};
use sd_crypto::{CryptoRng, SeedableRng};
use sd_prisma::prisma::file_path::cas_id;
use std::{
collections::HashMap,
path::PathBuf,
pin::pin,
sync::{
atomic::{AtomicU64, Ordering},
@@ -27,7 +30,7 @@ use dashmap::DashMap;
use flume::SendError;
use futures::StreamExt;
use futures_concurrency::stream::Merge;
use iroh::{Endpoint, NodeId};
use iroh::{key::PublicKey, Endpoint, NodeId};
use quic_rpc::{
server::{Accepting, RpcChannel, RpcServerError},
transport::quinn::{QuinnConnector, QuinnListener},
@@ -44,7 +47,7 @@ use tracing::{debug, error, warn};
use super::{
new_sync_messages_notifier::dispatch_notifier, BasicLibraryCreationArgs, JoinSyncGroupResponse,
JoinedLibraryCreateArgs, NotifyUser, Ticket, UserResponse,
JoinedLibraryCreateArgs, NotifyUser, RecivedGetThumbnailArgs, Ticket, UserResponse,
};
const TEN_SECONDS: Duration = Duration::from_secs(10);
@@ -65,6 +68,12 @@ pub enum Request {
devices_in_group: Vec<(devices::PubId, NodeId)>,
tx: oneshot::Sender<JoinedLibraryCreateArgs>,
},
GetThumbnail {
device_pub_id: devices::PubId,
cas_id: cas_id::Type,
library_pub_id: libraries::PubId,
tx: oneshot::Sender<RecivedGetThumbnailArgs>,
},
}
/// We use internal mutability here, but don't worry because there will always be a single
@@ -85,6 +94,7 @@ pub struct Runner {
pending_sync_group_join_requests: Arc<Mutex<HashMap<Ticket, PendingSyncGroupJoin>>>,
cached_devices_per_group: HashMap<groups::PubId, (Instant, Vec<(devices::PubId, NodeId)>)>,
timeout_checker_buffer: Vec<(Ticket, PendingSyncGroupJoin)>,
data_directory: PathBuf,
}
impl Clone for Runner {
@@ -106,6 +116,7 @@ impl Clone for Runner {
cached_devices_per_group: HashMap::new(),
// This one is a temporary buffer only used for timeout checker
timeout_checker_buffer: vec![],
data_directory: self.data_directory.clone(),
}
}
}
@@ -125,6 +136,7 @@ impl Runner {
cloud_services: &CloudServices,
msgs_tx: flume::Sender<Message>,
endpoint: Endpoint,
data_directory: PathBuf,
) -> Result<Self, Error> {
Ok(Self {
current_device_pub_id,
@@ -139,6 +151,7 @@ impl Runner {
pending_sync_group_join_requests: Arc::default(),
cached_devices_per_group: HashMap::new(),
timeout_checker_buffer: vec![],
data_directory,
})
}
@@ -202,6 +215,13 @@ impl Runner {
tx,
})) => self.dispatch_join_requests(req, devices_in_group, &mut rng, tx),
StreamMessage::Message(Message::Request(Request::GetThumbnail {
device_pub_id,
cas_id,
library_pub_id,
tx,
})) => self.dispatch_get_thumbnail(device_pub_id, cas_id, library_pub_id, tx),
StreamMessage::Message(Message::RegisterSyncMessageNotifier((
group_pub_id,
notifier,
@@ -356,6 +376,161 @@ impl Runner {
});
}
#[allow(clippy::too_many_lines)]
fn dispatch_get_thumbnail(
&self,
device_pub_id: devices::PubId,
cas_id: cas_id::Type,
library_pub_id: libraries::PubId,
tx: oneshot::Sender<RecivedGetThumbnailArgs>,
) {
debug!(?device_pub_id, ?cas_id, "Received request for thumbnail");
let current_device_pub_id = self.current_device_pub_id;
let cas_id_clone = cas_id.clone();
// Put tx in an Arc to allow multiple references to it
let tx = Arc::new(Mutex::new(Some(tx)));
let device_connection = self
.cached_devices_per_group
.values()
.find(|(_, devices)| devices.iter().any(|(pub_id, _)| pub_id == &device_pub_id))
.and_then(|(_, devices)| devices.iter().find(|(pub_id, _)| pub_id == &device_pub_id))
.ok_or_else(|| {
error!("Failed to find device in the cached devices list");
// Use a clone of the channel to send the error response
let tx_clone = tx.clone();
spawn(async move {
if let Some(tx) = tx_clone.lock().await.take() {
if tx
.send(RecivedGetThumbnailArgs {
cas_id: cas_id_clone.clone(),
error: Some(Error::DeviceNotFound),
})
.is_err()
{
error!("Failed to send response to user;");
}
}
});
})
.expect("Device must be in the cached devices list");
let (_, device_connection_id) = device_connection;
debug!("Device Connection ID: {:?}", device_connection_id);
let data_dir_clone = self.data_directory.clone();
// Spawn a separate task to avoid blocking the runner
spawn({
let endpoint = self.endpoint.clone();
let device_connection_id = *device_connection_id;
let tx = tx.clone();
let cas_id_clone_clone = cas_id.clone();
async move {
// Connect to the device
let client =
match connect_to_specific_client(&endpoint, &device_connection_id).await {
Ok(client) => client,
Err(e) => {
error!(?e, "Failed to connect to device");
// Send the error through the channel
if let Some(tx) = tx.lock().await.take() {
if tx
.send(RecivedGetThumbnailArgs {
cas_id: cas_id_clone_clone,
error: Some(Error::DeviceNotFound),
})
.is_err()
{
error!("Failed to send response to user;");
}
}
return;
}
};
// Create the request
let request = cloud_p2p::get_thumbnail::Request {
cas_id: cas_id_clone_clone.clone().unwrap_or_default(),
device_pub_id: current_device_pub_id,
library_pub_id,
};
// Send the request
match client.get_thumbnail(request).await {
Ok(Ok(cloud_p2p::get_thumbnail::Response { thumbnail })) => {
debug!(?cas_id, "Successfully received thumbnail");
// Convert cas_id to a string
let cas_id_str = cas_id_clone_clone.clone().unwrap_or_default();
// If we received a thumbnail, try to save it locally
if let Some(thumbnail_data) = &thumbnail {
// Try to save the thumbnail, but don't fail if saving fails
if let Err(e) = save_remote_thumbnail(
&cas_id_str,
thumbnail_data,
data_dir_clone,
library_pub_id,
)
.await
{
error!(?e, "Failed to save remote thumbnail locally, but continuing with response");
}
}
// Send the response via the oneshot channel
if let Some(tx) = tx.lock().await.take() {
if tx
.send(RecivedGetThumbnailArgs {
cas_id: cas_id_clone_clone.clone(),
error: None,
})
.is_err()
{
error!("Failed to send thumbnail response to user");
}
}
}
Ok(Err(e)) => {
error!(?e, "Remote device returned error for thumbnail request");
// Send the error through the channel
if let Some(tx) = tx.lock().await.take() {
if tx
.send(RecivedGetThumbnailArgs {
cas_id: cas_id_clone_clone.clone(),
error: Some(Error::RemoteDeviceError),
})
.is_err()
{
error!("Failed to send response to user;");
}
}
}
Err(e) => {
error!(?e, "Failed to send thumbnail request to remote device");
// Send the error through the channel
if let Some(tx) = tx.lock().await.take() {
if tx
.send(RecivedGetThumbnailArgs {
cas_id: cas_id_clone_clone.clone(),
error: Some(Error::InternalError),
})
.is_err()
{
error!("Failed to send response to user;");
}
}
}
}
}
});
}
#[allow(clippy::too_many_lines)]
async fn handle_request(
&self,
request: cloud_p2p::Request,
@@ -432,6 +607,52 @@ impl Runner {
);
}
}
cloud_p2p::Request::GetThumbnail(req) => {
if let Err(e) = channel
.rpc(
req,
(),
|(),
cloud_p2p::get_thumbnail::Request {
cas_id,
device_pub_id,
library_pub_id,
}| async move {
debug!(
?cas_id,
"Received thumbnail request from device {:?}", device_pub_id
);
match fetch_local_thumbnail(
Some(cas_id.clone()),
self.data_directory.clone(),
library_pub_id,
)
.await
{
Ok(Some(thumbnail_data)) => {
debug!(?cas_id, "Found thumbnail locally");
Ok(cloud_p2p::get_thumbnail::Response {
thumbnail: Some(thumbnail_data),
})
}
Ok(None) => {
debug!(?cas_id, "Thumbnail not found locally");
Err(CloudP2PError::Rejected)
}
Err(e) => {
error!(?e, ?cas_id, "Error fetching thumbnail");
Err(CloudP2PError::Rejected)
}
}
},
)
.await
{
error!(?e, "Failed to send get thumbnail response;");
}
}
}
}
@@ -615,6 +836,24 @@ async fn connect_to_first_available_client(
Err(CloudP2PError::UnableToConnect)
}
async fn connect_to_specific_client(
endpoint: &Endpoint,
device_connection_id: &PublicKey,
) -> Result<Client<QuinnConnector<cloud_p2p::Response, cloud_p2p::Request>>, CloudP2PError> {
// Get the connection id by fetching using the device pub id
let connection = endpoint
.connect(*device_connection_id, CloudP2PALPN::LATEST)
.await
.map_err(|e| {
error!(?e, "Failed to connect to authorizor device candidate");
CloudP2PError::UnableToConnect
})?;
debug!(%device_connection_id, "Connected to authorizor device candidate");
Ok(Client::new(RpcClient::new(
QuinnConnector::from_connection(connection),
)))
}
fn setup_server_endpoint(
endpoint: Endpoint,
) -> (RpcServer<Service, P2PServerEndpoint>, JoinHandle<()>) {
@@ -645,3 +884,226 @@ fn setup_server_endpoint(
}),
)
}
async fn fetch_local_thumbnail(
cas_id: cas_id::Type,
data_directory: PathBuf,
library_pub_id: libraries::PubId,
) -> Result<Option<Vec<u8>>, Error> {
use tokio::fs;
use tracing::{debug, error};
debug!(?cas_id, "Fetching thumbnail from local storage");
// Convert cas_id to a string
let cas_id = cas_id.unwrap_or_default();
let cas_id = sd_core_prisma_helpers::CasId::from(cas_id);
let thumbnails_directory =
sd_core_heavy_lifting::media_processor::get_thumbnails_directory(data_directory);
// Get the shard hex for the cas_id
let shard_hex = sd_core_heavy_lifting::media_processor::get_shard_hex(&cas_id);
// First try to find the thumbnail in the specific library folder
let library_path = thumbnails_directory.join(library_pub_id.to_string());
let shard_path = library_path.join(shard_hex);
let thumbnail_path = shard_path.join(format!("{}.webp", cas_id.as_str()));
debug!("Checking for thumbnail at {:?}", thumbnail_path);
// If the thumbnail exists in the specific library folder, read it
if fs::metadata(&thumbnail_path).await.is_ok() {
match fs::read(&thumbnail_path).await {
Ok(data) => {
debug!("Found thumbnail at {:?}", thumbnail_path);
return Ok(Some(data));
}
Err(e) => {
error!(?e, "Failed to read thumbnail file");
return Err(Error::InternalError);
}
}
}
// If not found in the specific library, try the ephemeral directory
let ephemeral_dir = thumbnails_directory.join("ephemeral");
let ephemeral_shard_path = ephemeral_dir.join(shard_hex);
let ephemeral_thumbnail_path = ephemeral_shard_path.join(format!("{}.webp", cas_id.as_str()));
debug!(
"Checking for thumbnail in ephemeral at {:?}",
ephemeral_thumbnail_path
);
// If the thumbnail exists in ephemeral, read it
if fs::metadata(&ephemeral_thumbnail_path).await.is_ok() {
match fs::read(&ephemeral_thumbnail_path).await {
Ok(data) => {
debug!("Found thumbnail at {:?}", ephemeral_thumbnail_path);
return Ok(Some(data));
}
Err(e) => {
error!(?e, "Failed to read thumbnail file");
return Err(Error::InternalError);
}
}
}
// If we still don't have the thumbnail, search all library folders as a fallback
// This is to handle cases where the library ID might have changed
let Ok(mut directories) = fs::read_dir(&thumbnails_directory).await else {
debug!("No thumbnails directory found");
return Ok(None);
};
// Try to find the thumbnail in any other library directories
while let Ok(Some(entry)) = directories.next_entry().await {
let dir_path = entry.path();
// Skip files and already checked directories
if !dir_path.is_dir() || dir_path == library_path || dir_path == ephemeral_dir {
continue;
}
// Check if thumbnail exists in this directory
let other_shard_path = dir_path.join(shard_hex);
let other_thumbnail_path = other_shard_path.join(format!("{}.webp", cas_id.as_str()));
debug!("Checking for thumbnail at {:?}", other_thumbnail_path);
if fs::metadata(&other_thumbnail_path).await.is_ok() {
match fs::read(&other_thumbnail_path).await {
Ok(data) => {
debug!("Found thumbnail at {:?}", other_thumbnail_path);
return Ok(Some(data));
}
Err(e) => {
error!(?e, "Failed to read thumbnail file");
return Err(Error::InternalError);
}
}
}
}
// If we get here, the thumbnail doesn't exist anywhere
debug!("Thumbnail not found for {}", cas_id.as_str());
Ok(None)
}
async fn save_remote_thumbnail(
cas_id: &str,
thumbnail_data: &[u8],
data_directory: PathBuf,
library_pub_id: libraries::PubId,
) -> Result<PathBuf, Error> {
use tokio::fs;
use tracing::{debug, error};
debug!(?cas_id, "Saving remote thumbnail to local storage");
// Convert to CasId for path computation
let cas_id = sd_core_prisma_helpers::CasId::from(cas_id);
// Get the thumbnails directory
let thumbnails_directory =
sd_core_heavy_lifting::media_processor::get_thumbnails_directory(data_directory);
let library_dir = thumbnails_directory.join(library_pub_id.to_string());
// Get the shard hex for organizing thumbnails
let shard_hex = sd_core_heavy_lifting::media_processor::get_shard_hex(&cas_id);
// Create the full directory path
let shard_dir = library_dir.join(shard_hex);
// Create the directories if they don't exist
if let Err(e) = fs::create_dir_all(&shard_dir).await {
error!(?e, "Failed to create thumbnail directory structure in library folder, falling back to ephemeral");
// If we can't create in library folder, fall back to ephemeral
let ephemeral_dir = thumbnails_directory.join("ephemeral");
let ephemeral_shard_dir = ephemeral_dir.join(shard_hex);
if let Err(e) = fs::create_dir_all(&ephemeral_shard_dir).await {
error!(
?e,
"Failed to create thumbnail directory structure in ephemeral folder"
);
return Err(Error::InternalError);
}
// Create the full path for the thumbnail in ephemeral
let thumbnail_path = ephemeral_shard_dir.join(format!("{}.webp", cas_id.as_str()));
// Write the thumbnail data to disk
match fs::write(&thumbnail_path, thumbnail_data).await {
Ok(()) => {
debug!(
"Successfully saved remote thumbnail to ephemeral: {:?}",
thumbnail_path
);
return Ok(thumbnail_path);
}
Err(e) => {
error!(
?e,
"Failed to write remote thumbnail to disk in ephemeral folder"
);
return Err(Error::InternalError);
}
}
}
// Create the full path for the thumbnail in the library folder
let thumbnail_path = shard_dir.join(format!("{}.webp", cas_id.as_str()));
// Write the thumbnail data to disk
match fs::write(&thumbnail_path, thumbnail_data).await {
Ok(()) => {
debug!(
"Successfully saved remote thumbnail to library folder: {:?}",
thumbnail_path
);
Ok(thumbnail_path)
}
Err(e) => {
error!(
?e,
"Failed to write remote thumbnail to disk in library folder"
);
// If writing to library folder fails, try ephemeral as a fallback
let ephemeral_dir = thumbnails_directory.join("ephemeral");
let ephemeral_shard_dir = ephemeral_dir.join(shard_hex);
if let Err(e) = fs::create_dir_all(&ephemeral_shard_dir).await {
error!(
?e,
"Failed to create thumbnail directory structure in ephemeral folder"
);
return Err(Error::InternalError);
}
let ephemeral_thumbnail_path =
ephemeral_shard_dir.join(format!("{}.webp", cas_id.as_str()));
match fs::write(&ephemeral_thumbnail_path, thumbnail_data).await {
Ok(()) => {
debug!(
"Successfully saved remote thumbnail to ephemeral fallback: {:?}",
ephemeral_thumbnail_path
);
Ok(ephemeral_thumbnail_path)
}
Err(e) => {
error!(
?e,
"Failed to write remote thumbnail to disk in ephemeral fallback folder"
);
Err(Error::InternalError)
}
}
}
}
}

View File

@@ -36,7 +36,7 @@ use tracing::{debug, instrument, trace, Level};
#[derive(thiserror::Error, Debug, Serialize, Deserialize, Type, Clone)]
pub enum NonCriticalMediaDataExtractorError {
#[error("failed to extract media data from <file='{}'>: {1}", .0.display())]
#[error("failed to extract media data from <file='{path}'>: {1}", path = .0.display())]
FailedToExtractImageMediaData(PathBuf, String),
#[error("file path missing object id: <file_path_id='{0}'>")]
FilePathMissingObjectId(file_path::id::Type),

View File

@@ -211,17 +211,17 @@ pub enum NonCriticalThumbnailerError {
MissingCasId(file_path::id::Type),
#[error("failed to extract isolated file path data from file path <id='{0}'>: {1}")]
FailedToExtractIsolatedFilePathData(file_path::id::Type, String),
#[error("failed to generate video file thumbnail <path='{}'>: {1}", .0.display())]
#[error("failed to generate video file thumbnail <path='{path}'>: {1}", path = .0.display())]
VideoThumbnailGenerationFailed(PathBuf, String),
#[error("failed to format image <path='{}'>: {1}", .0.display())]
#[error("failed to format image <path='{path}'>: {1}", path = .0.display())]
FormatImage(PathBuf, String),
#[error("failed to encode webp image <path='{}'>: {1}", .0.display())]
#[error("failed to encode webp image <path='{path}'>: {1}", path = .0.display())]
WebPEncoding(PathBuf, String),
#[error("processing thread panicked while generating thumbnail from <path='{}'>: {1}", .0.display())]
#[error("processing thread panicked while generating thumbnail from <path='{path}'>: {1}", path = .0.display())]
PanicWhileGeneratingThumbnail(PathBuf, String),
#[error("failed to create shard directory for thumbnail: {0}")]
CreateShardDirectory(String),
#[error("failed to save thumbnail <path='{}'>: {1}", .0.display())]
#[error("failed to save thumbnail <path='{path}'>: {1}", path = .0.display())]
SaveThumbnail(PathBuf, String),
#[error("task timed out: {0}")]
TaskTimeout(TaskId),

View File

@@ -0,0 +1,70 @@
[package]
edition = "2021"
name = "sd-core-p2p"
version = "0.1.0"
[lints.rust]
# Warns
deprecated = "warn"
rust_2018_idioms = { level = "warn", priority = -1 }
trivial_casts = "warn"
trivial_numeric_casts = "warn"
unused_allocation = "warn"
unused_qualifications = "warn"
# Forbids
deprecated_in_future = "forbid"
[lints.clippy]
# Warns
all = { level = "warn", priority = -1 }
cast_lossless = "warn"
cast_possible_truncation = "warn"
cast_possible_wrap = "warn"
cast_precision_loss = "warn"
cast_sign_loss = "warn"
complexity = { level = "warn", priority = -1 }
correctness = { level = "warn", priority = -1 }
dbg_macro = "warn"
deprecated_cfg_attr = "warn"
nursery = { level = "warn", priority = -1 }
pedantic = { level = "warn", priority = -1 }
perf = { level = "warn", priority = -1 }
separated_literal_suffix = "warn"
style = { level = "warn", priority = -1 }
suspicious = { level = "warn", priority = -1 }
unnecessary_cast = "warn"
unwrap_used = "warn"
# Allows
missing_errors_doc = "allow"
module_name_repetitions = "allow"
[dependencies]
# Core Spacedrive Sub-crates
sd-core-cloud-services = { path = "../cloud-services" }
# Spacedrive Sub-crates
sd-cloud-schema = { workspace = true }
sd-crypto = { path = "../../../crates/crypto" }
# Workspace dependencies
anyhow = { workspace = true }
async-stream = { workspace = true }
flume = { workspace = true }
futures = { workspace = true }
futures-concurrency = { workspace = true }
iroh = { workspace = true, features = ["discovery-local-network"] }
quic-rpc = { workspace = true, features = ["iroh-transport"] }
serde = { workspace = true, features = ["derive"] }
specta = { workspace = true, features = ["chrono", "uuid"] }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["sync"] }
tokio-util = { workspace = true }
tracing = { workspace = true }
# External dependencies
derive_more = { version = "1.0", features = ["display", "from", "try_into"] }
nested_enum_utils = "0.1"
postcard = { version = "1.1", features = ["use-std"] }
quic-rpc-derive = "0.17"
url = "2.5"

View File

@@ -0,0 +1,24 @@
use std::io;
#[derive(Debug, thiserror::Error)]
pub enum Error {
// Network setup errors
#[error("Setup iroh endpoint: {0}")]
SetupEndpoint(anyhow::Error),
#[error("Setup iroh listener: {0}")]
SetupListener(io::Error),
#[error("Initialize LocalSwarmDiscovery: {0}")]
LocalSwarmDiscoveryInit(anyhow::Error),
#[error("Initialize DhtDiscovery: {0}")]
DhtDiscoveryInit(anyhow::Error),
// Known hosts loading errors
#[error("Serialize known devices: {0}")]
SerializeKnownDevices(postcard::Error),
#[error("Deserialize known devices: {0}")]
DeserializeKnownDevices(postcard::Error),
#[error("Load known devices from file: {0}")]
LoadKnownDevices(io::Error),
#[error("Save known devices to file: {0}")]
SaveKnownDevices(io::Error),
}

162
core/crates/p2p/src/lib.rs Normal file
View File

@@ -0,0 +1,162 @@
use sd_core_cloud_services::CloudServices;
use sd_cloud_schema::devices;
use sd_crypto::CryptoRng;
use std::{path::Path, sync::Arc};
use iroh::{
discovery::{
dns::DnsDiscovery, local_swarm_discovery::LocalSwarmDiscovery, pkarr::dht::DhtDiscovery,
ConcurrentDiscovery,
},
key::SecretKey,
Endpoint, NodeId, RelayMap, RelayMode, RelayUrl,
};
use quic_rpc::{server::IrohListener, RpcServer};
use tokio::{
fs, io,
sync::{oneshot, RwLock},
};
use url::Url;
mod error;
mod schema;
mod server;
use server::Server;
pub use error::Error;
const KNOWN_DEVICES_FILE_NAME: &str = "known_devices.bin";
#[derive(Debug, Clone)]
pub struct P2P {
current_device_pub_id: devices::PubId,
known_devices_file_path: Arc<Box<Path>>,
endpoint: Endpoint,
cloud_services: Arc<RwLock<Option<CloudServices>>>,
known_devices: Arc<RwLock<Vec<NodeId>>>,
cancel_tx: flume::Sender<oneshot::Sender<()>>,
}
impl P2P {
pub async fn new(
data_directory: impl AsRef<Path> + Send,
current_device_pub_id: devices::PubId,
rng: CryptoRng,
iroh_secret_key: SecretKey,
dns_origin_domain: String,
dns_pkarr_url: Url,
relay_url: RelayUrl,
) -> Result<Self, Error> {
async fn inner(
data_directory: &Path,
current_device_pub_id: devices::PubId,
rng: CryptoRng,
iroh_secret_key: SecretKey,
dns_origin_domain: String,
dns_pkarr_url: Url,
relay_url: RelayUrl,
) -> Result<P2P, Error> {
let endpoint = Endpoint::builder()
.alpns(vec![schema::ALPN::LATEST.to_vec()])
.discovery(Box::new(ConcurrentDiscovery::from_services(vec![
Box::new(DnsDiscovery::new(dns_origin_domain)),
Box::new(
LocalSwarmDiscovery::new(iroh_secret_key.public())
.map_err(Error::LocalSwarmDiscoveryInit)?,
),
Box::new(
DhtDiscovery::builder()
.secret_key(iroh_secret_key.clone())
.pkarr_relay(dns_pkarr_url)
.build()
.map_err(Error::DhtDiscoveryInit)?,
),
])))
.secret_key(iroh_secret_key)
.relay_mode(RelayMode::Custom(RelayMap::from_url(relay_url)))
.bind()
.await
.map_err(Error::SetupEndpoint)?;
let (cancel_tx, cancel_rx) = flume::bounded(1);
let known_devices_file_path = data_directory
.join(KNOWN_DEVICES_FILE_NAME)
.into_boxed_path();
let known_devices = Arc::new(RwLock::new(
P2P::load_known_devices(&known_devices_file_path).await?,
));
let cloud_services = Arc::default();
Server::new(
current_device_pub_id,
Arc::clone(&cloud_services),
Arc::clone(&known_devices),
)
.dispatch(
RpcServer::new(
IrohListener::<schema::Service>::new(endpoint.clone())
.map_err(Error::SetupListener)?,
),
cancel_rx,
);
Ok(P2P {
current_device_pub_id,
endpoint,
cloud_services,
known_devices,
known_devices_file_path: Arc::new(known_devices_file_path),
cancel_tx,
})
}
inner(
data_directory.as_ref(),
current_device_pub_id,
rng,
iroh_secret_key,
dns_origin_domain,
dns_pkarr_url,
relay_url,
)
.await
}
async fn load_known_devices(
known_devices_file_path: impl AsRef<Path> + Send,
) -> Result<Vec<NodeId>, Error> {
async fn inner(known_devices_file_path: &Path) -> Result<Vec<NodeId>, Error> {
match fs::read(known_devices_file_path).await {
Ok(data) => postcard::from_bytes(&data).map_err(Error::DeserializeKnownDevices),
Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(Vec::new()),
Err(e) => Err(Error::LoadKnownDevices(e)),
}
}
inner(known_devices_file_path.as_ref()).await
}
pub async fn set_cloud_services(&self, cloud_services: CloudServices) {
self.cloud_services.write().await.replace(cloud_services);
}
pub async fn shutdown(&self) -> Result<(), Error> {
let (tx, rx) = oneshot::channel();
self.cancel_tx.send_async(tx).await.unwrap();
rx.await.unwrap();
fs::write(
self.known_devices_file_path.as_ref(),
&postcard::to_stdvec(&*self.known_devices.read().await)
.map_err(Error::SerializeKnownDevices)?,
)
.await
.map_err(Error::SaveKnownDevices)
}
}

View File

@@ -0,0 +1,22 @@
use sd_cloud_schema::{devices, libraries, sync::groups};
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Request {
pub sync_group: groups::GroupWithDevices,
pub asking_device: devices::Device,
}
#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum Response {
Accepted {
authorizor_device: devices::Device,
keys: Vec<Vec<u8>>,
library_pub_id: libraries::PubId,
library_name: String,
library_description: Option<String>,
},
Rejected,
TimedOut,
}

View File

@@ -0,0 +1,25 @@
use crate::schema::Service;
use nested_enum_utils::enum_conversions;
use serde::{Deserialize, Serialize};
pub mod authorize_new_device_in_sync_group;
pub mod notify_new_sync_messages;
#[allow(clippy::large_enum_variant)]
#[nested_enum_utils::enum_conversions(super::Request)]
#[derive(Debug, Serialize, Deserialize)]
#[quic_rpc_derive::rpc_requests(Service)]
pub enum Request {
#[rpc(response = authorize_new_device_in_sync_group::Response)]
AuthorizeNewDeviceInSyncGroup(authorize_new_device_in_sync_group::Request),
#[rpc(response = notify_new_sync_messages::Response)]
NotifyNewSyncMessages(notify_new_sync_messages::Request),
}
#[derive(Debug, Serialize, Deserialize)]
#[enum_conversions(super::Response)]
pub enum Response {
AuthorizeNewDeviceInSyncGroup(authorize_new_device_in_sync_group::Response),
NotifyNewSyncMessages(notify_new_sync_messages::Response),
}

View File

@@ -0,0 +1,12 @@
use sd_cloud_schema::{devices, sync::groups};
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Request {
pub sync_group_pub_id: groups::PubId,
pub device_pub_id: devices::PubId,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct Response;

View File

@@ -0,0 +1,36 @@
use serde::{Deserialize, Serialize};
pub mod cloud_services;
/// ALPN for the Spacedrive P2P protocol
///
/// P2P with associated constants for each existing version and an alias for the latest version.
/// This application layer protocol is used when a cloud service needs to devices communicating
/// with each other, like for sending sync keys, or other strictly P2P features, like Spacedrop.
pub struct ALPN;
impl ALPN {
pub const LATEST: &'static [u8] = Self::V1;
pub const V1: &'static [u8] = b"sd-p2p/v1";
}
#[derive(Debug, Clone)]
pub struct Service;
impl quic_rpc::Service for Service {
type Req = Request;
type Res = Response;
}
#[nested_enum_utils::enum_conversions]
#[derive(Debug, Serialize, Deserialize)]
pub enum Request {
CloudServices(cloud_services::Request),
}
#[nested_enum_utils::enum_conversions]
#[derive(Debug, Serialize, Deserialize)]
pub enum Response {
CloudServices(cloud_services::Response),
}

View File

@@ -0,0 +1,191 @@
use sd_core_cloud_services::CloudServices;
use sd_cloud_schema::devices;
use std::{pin::pin, sync::Arc, time::Duration};
use anyhow::Context as _;
use futures::StreamExt as _;
use futures_concurrency::stream::Merge as _;
use iroh::NodeId;
use quic_rpc::{
server::{Accepting, RpcServerError},
Listener, RpcServer,
};
use tokio::{
spawn,
sync::{oneshot, RwLock},
task::JoinError,
time::timeout,
};
use tracing::{error, info, warn};
use super::schema;
mod router;
const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(20);
#[derive(Debug, Clone)]
pub struct Server {
current_device_pub_id: devices::PubId,
cloud_services: Arc<RwLock<Option<CloudServices>>>,
known_devices: Arc<RwLock<Vec<NodeId>>>,
}
impl Server {
pub fn new(
current_device_pub_id: devices::PubId,
cloud_services: Arc<RwLock<Option<CloudServices>>>,
known_devices: Arc<RwLock<Vec<NodeId>>>,
) -> Self {
Self {
current_device_pub_id,
cloud_services,
known_devices,
}
}
async fn handle_single_request(
self,
accepting: Accepting<schema::Service, impl Listener<schema::Service>>,
out_tx: flume::Sender<Result<anyhow::Result<()>, JoinError>>,
) {
async fn inner(
server: Server,
accepting: Accepting<schema::Service, impl Listener<schema::Service>>,
) -> anyhow::Result<()> {
let (req, chan) = accepting
.read_first()
.await
.context("Failed to receive request")?;
router::handle(server, req, chan).await
}
// Running on a detached task to avoid panicking the main task
let res = spawn(inner(self, accepting)).await;
out_tx.send_async(res).await.expect("channel never closes");
}
pub fn dispatch(
self,
rpc_server: RpcServer<schema::Service, impl Listener<schema::Service>>,
cancel_rx: flume::Receiver<oneshot::Sender<()>>,
) {
spawn({
async move {
loop {
info!("Starting P2P Server");
if let Err(e) =
spawn(self.clone().run_loop(rpc_server.clone(), cancel_rx.clone())).await
{
if e.is_panic() {
error!(?e, "P2P Server crashed, restarting...");
} else {
break;
}
}
}
}
});
}
async fn run_loop(
self,
rpc_server: RpcServer<schema::Service, impl Listener<schema::Service>>,
cancel_rx: flume::Receiver<oneshot::Sender<()>>,
) {
enum StreamMessage<Listener: quic_rpc::Listener<schema::Service>> {
AcceptResult(Result<Accepting<schema::Service, Listener>, RpcServerError<Listener>>),
RequestOutcome(Result<anyhow::Result<()>, JoinError>),
Shutdown(oneshot::Sender<()>),
}
let (out_tx, out_rx) = flume::bounded(32);
let mut msg_stream = pin!((
async_stream::stream! {
loop {
yield StreamMessage::AcceptResult(rpc_server.accept().await);
}
},
cancel_rx.stream().map(StreamMessage::Shutdown),
out_rx.stream().map(StreamMessage::RequestOutcome)
)
.merge());
let mut inflight_count = 0u32;
info!("P2P listening for connections...");
while let Some(msg) = msg_stream.next().await {
match msg {
StreamMessage::AcceptResult(Ok(accepting)) => {
spawn(
self.clone()
.handle_single_request(accepting, out_tx.clone()),
);
inflight_count += 1;
}
StreamMessage::AcceptResult(Err(e)) => {
error!(?e, "Failed to accept request;");
}
StreamMessage::RequestOutcome(out) => {
process_request_outcome(out);
inflight_count -= 1;
}
StreamMessage::Shutdown(tx) => {
// Received an Interrupt signal, which means the user wants to stop the server,
// so we wait for all inflight requests to finish before exiting
// this way we're doing a graceful shutdown
let wait_all_to_finish = async {
while inflight_count > 0 {
process_request_outcome(
// SAFETY: channel never closes
out_rx.recv_async().await.expect("channel never closes"),
);
inflight_count -= 1;
}
};
if let Err(elapsed) = timeout(SHUTDOWN_TIMEOUT, wait_all_to_finish).await {
warn!(?elapsed, %inflight_count, "Server graceful shutdown timed out");
} else {
info!("Server graceful shutdown complete!");
}
if tx.send(()).is_err() {
warn!("Failed to send P2P shutdown completion response;");
}
break;
}
}
}
}
}
fn process_request_outcome(out: Result<anyhow::Result<()>, JoinError>) {
match out {
Ok(Err(e)) => {
error!(?e, "Failed to handle request;");
}
Err(e) if e.is_panic() => {
if let Some(msg) = e.into_panic().downcast_ref::<&str>() {
error!(?msg, "Panic in request handler!");
} else {
error!("Some unknown panic in request handler!");
}
}
Ok(Ok(())) | Err(_) => {
// The request was handled successfully, or the JoinHandle was aborted,
// which can't happen because we don't even have the handle, so...
// ...
// Everything is Awesome!
}
}
}

View File

@@ -0,0 +1,47 @@
use crate::{
schema::{
self,
cloud_services::{self, authorize_new_device_in_sync_group, notify_new_sync_messages},
},
server::Server,
};
use anyhow::Context as _;
use quic_rpc::{server::RpcChannel, Listener};
pub async fn router(
server: Server,
request: cloud_services::Request,
chan: RpcChannel<schema::Service, impl Listener<schema::Service>>,
) -> anyhow::Result<()> {
match request {
cloud_services::Request::AuthorizeNewDeviceInSyncGroup(req) => {
chan.rpc(req, server, authorize_new_device_in_sync_group)
.await
}
cloud_services::Request::NotifyNewSyncMessages(req) => {
chan.rpc(req, server, notify_new_sync_messages).await
}
}
.context("Failed to handle cloud services request")
}
async fn authorize_new_device_in_sync_group(
server: Server,
authorize_new_device_in_sync_group::Request {
sync_group,
asking_device,
}: authorize_new_device_in_sync_group::Request,
) -> authorize_new_device_in_sync_group::Response {
todo!()
}
async fn notify_new_sync_messages(
server: Server,
notify_new_sync_messages::Request {
sync_group_pub_id,
device_pub_id,
}: notify_new_sync_messages::Request,
) -> notify_new_sync_messages::Response {
todo!()
}

View File

@@ -0,0 +1,19 @@
use anyhow::Context;
use quic_rpc::{server::RpcChannel, Listener};
use crate::schema;
use super::Server;
mod cloud_services;
pub async fn handle(
server: Server,
request: schema::Request,
chan: RpcChannel<schema::Service, impl Listener<schema::Service>>,
) -> anyhow::Result<()> {
match request {
schema::Request::CloudServices(req) => cloud_services::router(server, req, chan).await,
}
.context("Failed to handle p2p request")
}

View File

@@ -1,5 +1,4 @@
use crate::{
invalidate_query,
library::LibraryManagerError,
node::{config::NodeConfig, HardwareModel},
Node,
@@ -14,6 +13,7 @@ use sd_cloud_schema::{
users, Client, Request, Response, SecretKey as IrohSecretKey,
};
use sd_crypto::{CryptoRng, SeedableRng};
use sd_prisma::prisma::{location, SortOrder};
use sd_utils::error::report_error;
use std::pin::pin;
@@ -24,12 +24,13 @@ use futures_concurrency::future::TryJoin;
use rspc::alpha::AlphaRouter;
use tracing::{debug, error, instrument};
use super::{Ctx, R};
use super::{utils::library, Ctx, R};
mod devices;
mod libraries;
mod locations;
mod sync_groups;
mod thumbnails;
async fn try_get_cloud_services_client(
node: &Node,
@@ -46,9 +47,11 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
.merge("locations.", locations::mount())
.merge("devices.", devices::mount())
.merge("syncGroups.", sync_groups::mount())
.merge("thumbnails.", thumbnails::mount())
.procedure("bootstrap", {
R.mutation(
|node, (access_token, refresh_token): (auth::AccessToken, auth::RefreshToken)| async move {
R.with2(library()).mutation(
|(node, library),
(access_token, refresh_token): (auth::AccessToken, auth::RefreshToken)| async move {
use sd_cloud_schema::devices;
// Only allow a single bootstrap request in flight at a time
@@ -133,7 +136,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
debug!("Device hello successful");
KeyManager::load(master_key, data_directory).await?
KeyManager::load(master_key, data_directory.clone()).await?
}
Err(Error::Client(ClientSideError::NotFound(_))) => {
// Device not registered, we execute a device register flow
@@ -162,7 +165,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
debug!("Device registered successfully");
KeyManager::new(master_key, iroh_secret_key, data_directory, &mut rng)
KeyManager::new(master_key, iroh_secret_key, data_directory.clone(), &mut rng)
.await?
}
Err(e) => return Err(e.into()),
@@ -182,6 +185,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
node.cloud_services.cloud_p2p_dns_origin_name.clone(),
node.cloud_services.cloud_p2p_dns_pkarr_url.clone(),
node.cloud_services.cloud_p2p_relay_url.clone(),
data_directory.clone(),
)
.await?,
)
@@ -231,6 +235,76 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
.try_join()
.await?;
// If locations are not saved in the cloud, we need to save them
// Get locations from library db
let locations = library
.db
.location()
.find_many(vec![])
.order_by(location::date_created::order(SortOrder::Desc))
.exec()
.await?;
let library_pub_id = sd_cloud_schema::libraries::PubId(library.id);
// Fetch locations from cloud
let sd_cloud_schema::locations::list::Response(cloud_locations) = handle_comm_error(
client
.locations()
.list(sd_cloud_schema::locations::list::Request {
access_token: node
.cloud_services
.token_refresher
.get_access_token()
.await?,
library_pub_id,
with_library: true,
with_device: true,
})
.await,
"Failed to list locations on bootstrap",
)??;
// Save locations that are not in the cloud
for location in locations {
let location_uuid = uuid::Uuid::from_slice(&location.pub_id).unwrap();
debug!(
location_id = %location_uuid,
"Processing location during bootstrap"
);
if !cloud_locations.iter().any(|l| l.pub_id.0 == location_uuid) {
debug!(
location_id = %location_uuid,
location_name = %location.name.clone().unwrap_or_else(|| format!("Location {}", location.id)),
"Creating location in cloud during bootstrap"
);
handle_comm_error(
client
.locations()
.create(sd_cloud_schema::locations::create::Request {
access_token: node
.cloud_services
.token_refresher
.get_access_token()
.await?,
pub_id: sd_cloud_schema::locations::PubId(location_uuid),
name: location.name.clone().unwrap_or_else(|| format!("Location {}", location.id)),
library_pub_id,
device_pub_id: node.config.get().await.id.into(),
})
.await,
"Failed to create location on bootstrap",
)?;
} else {
debug!(
location_id = %location_uuid,
"Location already exists in cloud, skipping creation"
);
}
}
*has_bootstrapped_lock = true;
Ok(())

View File

@@ -0,0 +1,61 @@
use crate::api::{Ctx, R};
use sd_cloud_schema::{devices, libraries};
use sd_prisma::prisma::file_path::cas_id;
use futures::FutureExt;
use futures_concurrency::future::TryJoin;
use rspc::alpha::AlphaRouter;
use serde::Deserialize;
use tokio::sync::oneshot;
use tracing::{debug, error};
pub fn mount() -> AlphaRouter<Ctx> {
R.router().procedure("get", {
#[derive(Deserialize, specta::Type)]
struct CloudThumbnailRequestArgs {
device_pub_id: devices::PubId,
library_pub_id: libraries::PubId,
cas_id: cas_id::Type,
}
R.mutation(
|node,
CloudThumbnailRequestArgs {
device_pub_id,
library_pub_id,
cas_id,
}: CloudThumbnailRequestArgs| async move {
let ((client, access_token), cloud_p2p) = (
super::get_client_and_access_token(&node),
node.cloud_services
.cloud_p2p()
.map(|res| res.map_err(Into::into)),
)
.try_join()
.await?;
let (tx, rx) = oneshot::channel();
cloud_p2p
.request_thumbnail_data(device_pub_id, cas_id, library_pub_id, tx)
.await;
// Log rx output
let out = rx.await;
let out = out.map_err(|e| {
error!(?e, "Failed to receive thumbnail data");
rspc::Error::new(
rspc::ErrorCode::InternalServerError,
String::from("Failed to receive thumbnail data"),
)
})?;
debug!(?out, "Received thumbnail data");
Ok(())
},
)
})
}

View File

@@ -4,7 +4,7 @@ use once_cell::sync::Lazy;
use rspc::{alpha::AlphaRouter, ErrorCode};
use sd_crypto::cookie::CookieCipher;
use serde_json::{json, Map, Value};
use std::path::{Path, PathBuf};
use std::path::Path;
use std::sync::Arc;
use tokio::io::AsyncWriteExt;
use tokio::sync::RwLock;
@@ -136,6 +136,12 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
}
let base_dir = node.config.data_directory();
// Remove /dev from Path
let base_dir = if base_dir.ends_with("dev") {
base_dir.parent().unwrap_or(&base_dir).to_path_buf()
} else {
base_dir
};
// let path = sanitize_path(&base_dir, Path::new(".sdks"))?;
let path = base_dir.join(".sdks");
let data = read_file(&path).await?;

View File

@@ -10,7 +10,7 @@ use crate::{
use sd_core_heavy_lifting::JobId;
use sd_file_ext::kind::ObjectKind;
use sd_p2p::RemoteIdentity;
use sd_old_p2p::RemoteIdentity;
use sd_prisma::prisma::{file_path, indexer_rule, object, object_kind_statistics, statistics};
use sd_utils::{db::size_in_bytes_from_db, u64_to_frontend};

View File

@@ -5,7 +5,7 @@ use crate::{
relink_location, scan_location, scan_location_sub_path, LocationCreateArgs, LocationError,
LocationUpdateArgs, ScanState,
},
p2p::PeerMetadata,
old_p2p::PeerMetadata,
util::AbortOnDrop,
};

View File

@@ -13,7 +13,7 @@ use sd_core_heavy_lifting::media_processor::ThumbKey;
use sd_core_sync::DevicePubId;
use sd_cloud_schema::devices::DeviceOS;
use sd_p2p::RemoteIdentity;
use sd_old_p2p::RemoteIdentity;
use sd_prisma::prisma::file_path;
use std::sync::Arc;

View File

@@ -1,6 +1,9 @@
use crate::p2p::{operations, ConnectionMethod, DiscoveryMethod, Header, P2PEvent, PeerMetadata};
//! DEPRICATED FOR NEW SYSTEM. DO NOT USE THIS API
use crate::old_p2p::{
operations, ConnectionMethod, DiscoveryMethod, Header, P2PEvent, PeerMetadata,
};
use sd_p2p::{PeerConnectionCandidate, RemoteIdentity};
use sd_old_p2p::{PeerConnectionCandidate, RemoteIdentity};
use rspc::{alpha::AlphaRouter, ErrorCode};
use serde::Deserialize;

View File

@@ -1,7 +1,7 @@
use crate::{
api::{utils::InvalidateOperationEvent, CoreEvent},
library::Library,
p2p::operations::{self, request_file},
old_p2p::operations::{self, request_file},
util::InfallibleResponse,
Node,
};
@@ -11,8 +11,8 @@ use sd_core_heavy_lifting::media_processor::WEBP_EXTENSION;
use sd_core_prisma_helpers::file_path_to_handle_custom_uri;
use sd_file_ext::text::is_text;
use sd_p2p::{RemoteIdentity, P2P};
use sd_p2p_block::Range;
use sd_old_p2p::{RemoteIdentity, P2P};
use sd_old_p2p_block::Range;
use sd_prisma::prisma::{file_path, location};
use sd_utils::db::maybe_missing;
use tokio_util::sync::PollSender;

View File

@@ -46,7 +46,7 @@ pub(crate) mod node;
pub(crate) mod notifications;
pub(crate) mod object;
pub(crate) mod old_job;
pub(crate) mod p2p;
pub(crate) mod old_p2p;
pub(crate) mod preferences;
#[doc(hidden)] // TODO(@Oscar): Make this private when breaking out `utils` into `sd-utils`
pub mod util;
@@ -66,7 +66,7 @@ pub struct Node {
pub libraries: Arc<library::Libraries>,
pub volumes: Arc<volume::Volumes>,
pub locations: location::Locations,
pub p2p: Arc<p2p::P2PManager>,
pub p2p: Arc<old_p2p::P2PManager>,
pub event_bus: (broadcast::Sender<CoreEvent>, broadcast::Receiver<CoreEvent>),
pub notifications: Notifications,
pub task_system: TaskSystem<sd_core_heavy_lifting::Error>,
@@ -151,7 +151,7 @@ impl Node {
let task_system = TaskSystem::new();
let (p2p, start_p2p) = p2p::P2PManager::new(config.clone(), libraries.clone())
let (p2p, start_p2p) = old_p2p::P2PManager::new(config.clone(), libraries.clone())
.await
.map_err(NodeError::P2PManager)?;
@@ -208,7 +208,7 @@ impl Node {
.map_err(|e| {
FileIOError::from((sdks_file.clone(), e, "Failed to create .sdks file"))
})
.map_err(|e| NodeError::FileIO(e))?;
.map_err(NodeError::FileIO)?;
}
let router = api::mount();

View File

@@ -3,7 +3,7 @@ use crate::{
util::version_manager::{Kind, ManagedVersion, VersionManager, VersionManagerError},
};
use sd_p2p::{Identity, RemoteIdentity};
use sd_old_p2p::{Identity, RemoteIdentity};
use sd_prisma::prisma::{file_path, indexer_rule, instance, location, PrismaClient};
use sd_utils::{db::maybe_missing, error::FileIOError};

View File

@@ -9,7 +9,7 @@ use sd_core_sync::{backfill::backfill_operations, SyncManager};
use sd_actors::ActorsCollection;
use sd_cloud_schema::sync::groups;
use sd_crypto::{CryptoRng, SeedableRng};
use sd_p2p::Identity;
use sd_old_p2p::Identity;
use sd_prisma::prisma::{file_path, location, PrismaClient};
use sd_utils::{db::maybe_missing, error::FileIOError};

View File

@@ -3,7 +3,7 @@ use crate::{library::LibraryConfigError, location::LocationManagerError, volume}
use sd_core_indexer_rules::seed::SeederError;
use sd_core_sync::DevicePubId;
use sd_p2p::IdentityErr;
use sd_old_p2p::IdentityErr;
use sd_utils::{
db::{self, MissingFieldError},
error::{FileIOError, NonUtf8PathError},

View File

@@ -3,14 +3,14 @@ use crate::{
invalidate_query,
location::metadata::{LocationMetadataError, SpacedriveLocationMetadataFile},
object::tag,
p2p,
old_p2p,
util::{mpscrr, MaybeUndefined},
Node,
};
use sd_core_sync::{DevicePubId, SyncEvent, SyncManager};
use sd_p2p::{Identity, RemoteIdentity};
use sd_old_p2p::{Identity, RemoteIdentity};
use sd_prisma::{
prisma::{self, device, instance, location, PrismaClient},
prisma_sync,
@@ -500,7 +500,10 @@ impl Libraries {
device::hardware_model::set(Some(node_config.hardware_model as i32)),
device::date_created::set(Some(Utc::now().fixed_offset())),
],
}.to_query(&db).exec().await?;
}
.to_query(&db)
.exec()
.await?;
}
let identity = match instance.identity.as_ref() {
@@ -639,7 +642,7 @@ async fn sync_rx_actor(
InvalidateOperationEvent::all(),
)),
SyncEvent::Created => {
p2p::sync::originator(library.clone(), &library.sync, &node.p2p).await
old_p2p::sync::originator(library.clone(), &library.sync, &node.p2p).await
}
}
}

View File

@@ -6,9 +6,8 @@ use crate::{
use sd_cloud_schema::devices::DeviceOS;
use sd_core_sync::DevicePubId;
use sd_p2p::Identity;
use sd_prisma::prisma::device;
use sd_utils::{db, error::FileIOError};
use sd_old_p2p::Identity;
use sd_utils::error::FileIOError;
use std::{
collections::HashSet,
@@ -150,7 +149,7 @@ pub struct NodeConfig {
}
mod identity_serde {
use sd_p2p::Identity;
use sd_old_p2p::Identity;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
pub fn serialize<S>(identity: &Identity, serializer: S) -> Result<S::Ok, S::Error>
@@ -215,6 +214,19 @@ impl ManagedVersion<NodeConfigVersion> for NodeConfig {
HardwareModel::Other
});
// Create .sdks file in the data directory if it doesn't exist
let data_directory = Path::new(NODE_STATE_CONFIG_NAME)
.parent()
.expect("Config path must have a parent directory");
let sdks_file = data_directory.join(".sdks");
if !sdks_file.exists() {
std::fs::write(&sdks_file, b"")
.map_err(|e| {
FileIOError::from((sdks_file.clone(), e, "Failed to create .sdks file"))
})
.expect("Panicked to initialize .sdks file");
}
Some(Self {
id: Uuid::now_v7().into(),
name,

View File

@@ -1,6 +1,6 @@
use std::{collections::HashSet, net::SocketAddr, sync::Arc};
use sd_p2p::{
use sd_old_p2p::{
flume::bounded, hooks::QuicHandle, HookEvent, PeerConnectionCandidate, RemoteIdentity, P2P,
};
use serde::Serialize;

View File

@@ -3,7 +3,7 @@ use std::{
sync::{Arc, Mutex, PoisonError},
};
use sd_p2p::{hooks::QuicHandle, RemoteIdentity, P2P};
use sd_old_p2p::{hooks::QuicHandle, RemoteIdentity, P2P};
use tracing::error;
use crate::library::{Libraries, LibraryManagerEvent};

View File

@@ -3,7 +3,7 @@ use crate::{
config::{self, P2PDiscoveryState},
HardwareModel,
},
p2p::{
old_p2p::{
libraries::libraries_hook, operations, sync::SyncMessage, Header, OperatingSystem,
SPACEDRIVE_APP_ID,
},
@@ -12,12 +12,12 @@ use crate::{
use axum::routing::IntoMakeService;
use sd_p2p::{
use sd_old_p2p::{
flume::{bounded, Receiver},
hooks::{Libp2pPeerId, Mdns, QuicHandle, QuicTransport, RelayServerEntry},
Peer, RemoteIdentity, UnicastStream, P2P,
};
use sd_p2p_tunnel::Tunnel;
use sd_old_p2p_tunnel::Tunnel;
use serde::Serialize;
use serde_json::json;
use specta::Type;

View File

@@ -6,8 +6,8 @@ use std::{
use sd_core_file_path_helper::IsolatedFilePathData;
use sd_core_prisma_helpers::file_path_to_handle_p2p_serve_file;
use sd_p2p::{Identity, RemoteIdentity, UnicastStream, P2P};
use sd_p2p_block::{BlockSize, Range, SpaceblockRequest, SpaceblockRequests, Transfer};
use sd_old_p2p::{Identity, RemoteIdentity, UnicastStream, P2P};
use sd_old_p2p_block::{BlockSize, Range, SpaceblockRequest, SpaceblockRequests, Transfer};
use sd_prisma::prisma::file_path;
use tokio::{
fs::File,
@@ -16,7 +16,7 @@ use tokio::{
use tracing::debug;
use uuid::Uuid;
use crate::{p2p::Header, Node};
use crate::{old_p2p::Header, Node};
/// Request a file from a remote library
#[allow(unused)]
@@ -41,7 +41,7 @@ pub async fn request_file(
)
.await?;
let mut stream = sd_p2p_tunnel::Tunnel::initiator(stream, library_identity).await?;
let mut stream = sd_old_p2p_tunnel::Tunnel::initiator(stream, library_identity).await?;
let block_size = BlockSize::from_stream(&mut stream).await?;
let size = stream.read_u64_le().await?;
@@ -77,7 +77,7 @@ pub(crate) async fn receiver(
);
// The tunnel takes care of authentication and encrypts all traffic to the library to be certain we are talking to a node with the library.
let mut stream = sd_p2p_tunnel::Tunnel::responder(stream).await?;
let mut stream = sd_old_p2p_tunnel::Tunnel::responder(stream).await?;
let library = node
.libraries

View File

@@ -1,10 +1,10 @@
use std::{error::Error, sync::Arc};
use sd_p2p::{RemoteIdentity, UnicastStream, P2P};
use sd_old_p2p::{RemoteIdentity, UnicastStream, P2P};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tracing::debug;
use crate::p2p::Header;
use crate::old_p2p::Header;
/// Send a ping to all peers we are connected to
#[allow(unused)]

View File

@@ -3,12 +3,12 @@ use std::{error::Error, sync::Arc};
use axum::{extract::Request, http, Router};
use hyper::{body::Incoming, client::conn::http1::handshake, server::conn::http1, Response};
use hyper_util::rt::TokioIo;
use sd_p2p::{RemoteIdentity, UnicastStream, P2P};
use sd_old_p2p::{RemoteIdentity, UnicastStream, P2P};
use tokio::io::AsyncWriteExt;
use tower_service::Service;
use tracing::debug;
use crate::{p2p::Header, Node};
use crate::{old_p2p::Header, Node};
/// Transfer an rspc query to a remote node.
pub async fn remote_rspc(

View File

@@ -8,10 +8,10 @@ use std::{
time::Duration,
};
use crate::p2p::{Header, P2PEvent, P2PManager};
use crate::old_p2p::{Header, P2PEvent, P2PManager};
use futures::future::join_all;
use sd_p2p::{RemoteIdentity, UnicastStream};
use sd_p2p_block::{BlockSize, Range, SpaceblockRequest, SpaceblockRequests, Transfer};
use sd_old_p2p::{RemoteIdentity, UnicastStream};
use sd_old_p2p_block::{BlockSize, Range, SpaceblockRequest, SpaceblockRequests, Transfer};
use thiserror::Error;
use tokio::{
fs::{create_dir_all, File},
@@ -32,7 +32,7 @@ pub enum SpacedropError {
#[error("error connecting to peer")]
FailedPeerConnection,
#[error("error creating stream: {0}")]
FailedNewStream(#[from] sd_p2p::NewStreamError),
FailedNewStream(#[from] sd_old_p2p::NewStreamError),
#[error("error opening file: {0}")]
FailedFileOpen(#[from] std::io::Error),
}

View File

@@ -1,5 +1,5 @@
use sd_p2p_block::{Range, SpaceblockRequests, SpaceblockRequestsError};
use sd_p2p_proto::{decode, encode};
use sd_old_p2p_block::{Range, SpaceblockRequests, SpaceblockRequestsError};
use sd_old_p2p_proto::{decode, encode};
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncReadExt};
use uuid::Uuid;

View File

@@ -1,4 +1,4 @@
use sd_p2p_proto::decode;
use sd_old_p2p_proto::decode;
use tokio::io::{AsyncRead, AsyncReadExt};
// will probs have more variants in future

View File

@@ -27,7 +27,7 @@ pub struct LabelerOutput {
pub enum ImageLabelerError {
#[error("model executor failed: {0}")]
ModelExecutorFailed(#[from] ort::Error),
#[error("image load failed <path='{}'>: {0}", .1.display())]
#[error("image load failed <path='{path}'>: {0}", path = .1.display())]
ImageLoadFailed(image::ImageError, Box<Path>),
#[error("failed to get isolated file path data: {0}")]
IsolateFilePathData(#[from] MissingFieldError),

View File

@@ -4,7 +4,7 @@ pub type Result<T> = std::result::Result<T, Error>;
#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("there was an i/o at path '{}' error: {0}", .1.display())]
#[error("there was an i/o at path '{path}' error: {0}", path = .1.display())]
Io(std::io::Error, Box<Path>),
#[error("the image provided is unsupported")]

View File

@@ -1,5 +1,5 @@
[package]
name = "sd-p2p"
name = "sd-old-p2p"
version = "0.2.0"
authors = ["Oscar Beaumont <oscar@otbeaumont.me>"]

View File

@@ -1,5 +1,5 @@
[package]
name = "sd-p2p-block"
name = "sd-old-p2p-block"
version = "0.1.0"
authors = ["Oscar Beaumont <oscar@otbeaumont.me>"]
@@ -9,7 +9,7 @@ repository.workspace = true
[dependencies]
# Spacedrive Sub-crates
sd-p2p-proto = { path = "../proto" }
sd-old-p2p-proto = { path = "../proto" }
# Workspace dependencies
thiserror = { workspace = true }

View File

@@ -4,7 +4,7 @@ use thiserror::Error;
use tokio::io::{AsyncRead, AsyncReadExt};
use uuid::Uuid;
use sd_p2p_proto::{decode, encode};
use sd_old_p2p_proto::{decode, encode};
use super::BlockSize;

View File

@@ -1,5 +1,5 @@
[package]
name = "sd-p2p-proto"
name = "sd-old-p2p-proto"
version = "0.1.0"
authors = ["Oscar Beaumont <oscar@otbeaumont.me>"]

View File

@@ -1,5 +1,5 @@
[package]
name = "sd-p2p-tunnel"
name = "sd-old-p2p-tunnel"
version = "0.1.0"
authors = ["Oscar Beaumont <oscar@otbeaumont.me>"]
@@ -9,8 +9,8 @@ repository.workspace = true
[dependencies]
# Spacedrive Sub-crates
sd-p2p = { path = "../../" }
sd-p2p-proto = { path = "../proto" }
sd-old-p2p = { path = "../../" }
sd-old-p2p-proto = { path = "../proto" }
# Workspace dependencies
thiserror = { workspace = true }

View File

@@ -6,12 +6,12 @@ use std::{
task::{Context, Poll},
};
use sd_p2p_proto::{decode, encode};
use sd_old_p2p_proto::{decode, encode};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
use thiserror::Error;
use sd_p2p::{Identity, IdentityErr, RemoteIdentity, UnicastStream};
use sd_old_p2p::{Identity, IdentityErr, RemoteIdentity, UnicastStream};
#[derive(Debug, Error)]
pub enum TunnelError {

View File

@@ -269,45 +269,61 @@ pub async fn get_response(
let cipher = CookieCipher::new(&key).unwrap();
// Read .sdks file
let sdks_path = data_dir.join(".sdks");
let sdks_path = NODE_DATA_DIR
.get()
.unwrap()
.clone()
.join("spacedrive")
.join(".sdks");
debug!("Reading .sdks file: {:?}", sdks_path);
let data = std::fs::read(sdks_path.clone()).unwrap();
// If the .sdks file doesn't exist, create it
if !sdks_path.exists() {
std::fs::write(sdks_path.clone(), "").unwrap();
}
let data_str = String::from_utf8(data)
.map_err(|e| {
error!("Failed to convert data to string: {:?}", e.to_string());
})
.unwrap();
let data = CookieCipher::base64_decode(&data_str)
.map_err(|e| {
error!("Failed to decode data: {:?}", e.to_string());
})
.unwrap();
let de_data = cipher
.decrypt(&data)
.map_err(|e| {
error!("Failed to decrypt data: {:?}", e.to_string());
})
.unwrap();
let de_data = String::from_utf8(de_data)
.map_err(|e| {
error!("Failed to convert data to string: {:?}", e.to_string());
})
.unwrap();
debug!("Decrypted Data: {:?}", de_data);
let mut de_data: Vec<String> = Vec::new();
// Try to read existing cookies if file exists and has content
if !data.is_empty() {
let data_str = String::from_utf8(data)
.map_err(|e| {
error!("Failed to convert data to string: {:?}", e.to_string());
})
.unwrap();
let data = CookieCipher::base64_decode(&data_str)
.map_err(|e| {
error!("Failed to decode data: {:?}", e.to_string());
})
.unwrap();
let decrypted = cipher
.decrypt(&data)
.map_err(|e| {
error!("Failed to decrypt data: {:?}", e.to_string());
})
.unwrap();
let decrypted_str = String::from_utf8(decrypted)
.map_err(|e| {
error!("Failed to convert data to string: {:?}", e.to_string());
})
.unwrap();
de_data = serde_json::from_str(&decrypted_str).unwrap();
}
debug!("\nCookies:");
for (name, value) in &cookie_store {
debug!(" {} = {}", name, value);
}
let mut de_data: Vec<String> = serde_json::from_str(&de_data).unwrap();
for cookie in &mut de_data {
for (name, value) in &cookie_store {
if cookie.starts_with(name) {
*cookie = format!("{}={};expires=Fri, 31 Dec 9999 23:59:59 GMT;path=/;samesite=lax", name, value);
}
}
// Update or add new cookies
for (name, value) in &cookie_store {
let cookie_str = format!(
"{}={};expires=Fri, 31 Dec 9999 23:59:59 GMT;path=/;samesite=lax",
name, value
);
// Remove existing cookie if present
de_data.retain(|c| !c.starts_with(name));
// Add new cookie
de_data.push(cookie_str);
}
debug!("Updated Cookies: {:?}", de_data);

View File

@@ -113,7 +113,15 @@ const state = {
isContextMenuOpen: false,
quickRescanLastRun: Date.now() - 200,
// Map = { hotkey: '0'...'9', tagId: 1234 }
tagBulkAssignHotkeys: [] as Array<{ hotkey: string; tagId: number }>
tagBulkAssignHotkeys: [] as Array<{ hotkey: string; tagId: number }>,
// Used to check if location is remote or not:
currentLocation: {
id: 0,
device_id: 0,
device_pub_id: '',
device_name: '',
name: ''
}
};
export function flattenThumbnailKey(thumbKey: ThumbKey) {

View File

@@ -1,10 +1,13 @@
import { useCallback, useEffect, useMemo, useRef, useState } from 'react';
import { useEffect, useMemo, useRef, useState } from 'react';
import { subscribe } from 'valtio';
import {
compareHumanizedSizes,
getExplorerItemData,
humanizeSize,
ThumbKey,
useBridgeMutation,
useBridgeQuery,
useClientContext,
type ExplorerItem
} from '@sd/client';
import { usePlatform } from '~/util/Platform';
@@ -25,6 +28,18 @@ export function useExplorerItemData(explorerItem: ExplorerItem) {
const platform = usePlatform();
const cachedSize = useRef<ReturnType<typeof humanizeSize> | null>(null);
const [newThumbnails, setNewThumbnails] = useState<Map<string, string | null>>(new Map());
const { currentLibraryId } = useClientContext();
const currentLocation = useMemo(
() => explorerStore.currentLocation,
[explorerStore.currentLocation]
);
// Move the hook call to the top level
const thumbnailGet = useBridgeMutation('cloud.thumbnails.get');
const currentDevice = useBridgeQuery(['cloud.devices.get_current_device']);
// Keep track of which thumbnails we've already requested to avoid duplicates
const requestedThumbnails = useRef(new Set<string>());
let thumbnails: ThumbKey | ThumbKey[] | null = null;
switch (explorerItem.type) {
@@ -50,7 +65,31 @@ export function useExplorerItemData(explorerItem: ExplorerItem) {
const thumbs = thumbnailKeys.reduce<Map<string, string | null>>((acc, thumbKey) => {
const url = platform.getThumbnailUrlByThumbKey(thumbKey);
const thumbId = flattenThumbnailKey(thumbKey);
acc.set(url, explorerStore.newThumbnails.has(thumbId) ? thumbId : null);
// Check if we already have a thumbnail locally
const hasLocalThumb = explorerStore.newThumbnails.has(thumbId);
// If no local thumbnail and we have the required info, fetch from remote device && check that device_id is not 1
if (
!hasLocalThumb &&
currentDevice?.data?.pub_id !== currentLocation?.device_pub_id &&
thumbKey.cas_id &&
currentLocation?.device_pub_id &&
currentLibraryId &&
!requestedThumbnails.current.has(thumbKey.cas_id)
) {
// Mark as requested to avoid duplicate requests
requestedThumbnails.current.add(thumbKey.cas_id);
// Initiate the thumbnail fetch
thumbnailGet.mutate({
cas_id: thumbKey.cas_id,
library_pub_id: currentLibraryId,
device_pub_id: currentLocation.device_pub_id
});
}
acc.set(url, hasLocalThumb ? thumbId : null);
return acc;
}, new Map());
@@ -66,12 +105,12 @@ export function useExplorerItemData(explorerItem: ExplorerItem) {
updateThumbnails();
return subscribe(explorerStore, updateThumbnails);
}, [thumbnails, platform]);
}, [thumbnails, platform, currentLocation, currentLibraryId, thumbnailGet]);
return useMemo(() => {
const explorerItemData = getExplorerItemData(explorerItem);
// Avoid unecessary re-renders
// Avoid unnecessary re-renders
if (
cachedSize.current == null ||
!compareHumanizedSizes(cachedSize.current, explorerItemData.size)

View File

@@ -7,6 +7,7 @@ import {
FilePathOrder,
filePathOrderingKeysSchema,
Location,
useClientContext,
useLibraryQuery,
useLibrarySubscription,
useOnlineLocations
@@ -134,6 +135,28 @@ const LocationExplorer = ({ location }: { location: Location; path?: string }) =
const { t } = useLocale();
// Get list of devices and map the public_id to the device_ids
const devices = useLibraryQuery(['devices.list'], {
placeholderData: keepPreviousData
});
const deviceId = devices.data?.find((device) => device.id === location.device_id)
?.pub_id as any;
// Get UUID of the device from the pub_id, as CorePubId = { Uuid: string } | { Vec: number[] }
// and we need to convert it to a string
const deviceIdString = deviceId?.Uuid ?? (deviceId?.Vec ? stringify(deviceId.Vec) : '');
const deviceName = devices.data?.find((device) => device.id === location.device_id)?.name;
// Set state to current location
useEffect(() => {
explorerStore.currentLocation = {
id: location.id,
device_id: location.device_id ?? -1,
device_pub_id: deviceIdString ?? '',
device_name: deviceName ?? '',
name: location.name ?? 'Unknown'
};
}, [location, deviceIdString, deviceName]);
return (
<ExplorerContextProvider explorer={explorer}>
<SearchContextProvider search={search}>

View File

@@ -66,10 +66,10 @@ export default () => {
{t('general')}
</SidebarLink>
{/* Disabling for now until sync is ready. */}
{/* <SidebarLink to="client/account">
<SidebarLink to="client/account">
<Icon component={User} />
{t('account')}
</SidebarLink> */}
</SidebarLink>
<SidebarLink to="node/libraries">
<Icon component={Books} />
{t('libraries')}

View File

@@ -6,6 +6,7 @@ import {
useBridgeMutation,
useBridgeQuery,
useBridgeSubscription,
useClientContext,
useLibraryMutation,
useLibrarySubscription
} from '@sd/client';
@@ -44,7 +45,7 @@ const Profile = ({
})();
}, []);
const cloudBootstrap = useBridgeMutation('cloud.bootstrap');
const cloudBootstrap = useLibraryMutation('cloud.bootstrap');
const devices = useBridgeQuery(['cloud.devices.list']);
const addLibraryToCloud = useLibraryMutation('cloud.libraries.create');
const [syncStatus, setSyncStatus] = useState<SyncStatus | null>(null);
@@ -85,7 +86,9 @@ const Profile = ({
<div className="flex flex-col gap-3">
<div>
<p className="font-medium">Joined on</p>
<p className="text-ink-dull">{new Date(user.timejoined).toLocaleDateString()}</p>
<p className="text-ink-dull">
{new Date(user.timejoined).toLocaleDateString()}
</p>
</div>
<div>
<p className="font-medium">User ID</p>
@@ -104,7 +107,9 @@ const Profile = ({
<div
className={clsx(
'mr-2 size-[15px] rounded-full bg-app-box',
syncStatus?.[status as keyof SyncStatus] ? 'bg-accent' : 'bg-app-input'
syncStatus?.[status as keyof SyncStatus]
? 'bg-accent'
: 'bg-app-input'
)}
/>
<h3 className="text-sm font-semibold">{status}</h3>

View File

@@ -70,7 +70,7 @@ export type Procedures = {
{ key: "backups.backup", input: LibraryArgs<null>, result: string } |
{ key: "backups.delete", input: string, result: null } |
{ key: "backups.restore", input: string, result: null } |
{ key: "cloud.bootstrap", input: [AccessToken, RefreshToken], result: null } |
{ key: "cloud.bootstrap", input: LibraryArgs<[AccessToken, RefreshToken]>, result: null } |
{ key: "cloud.devices.delete", input: CloudDevicePubId, result: null } |
{ key: "cloud.devices.update", input: CloudUpdateDeviceArgs, result: null } |
{ key: "cloud.libraries.create", input: LibraryArgs<null>, result: null } |
@@ -81,6 +81,7 @@ export type Procedures = {
{ key: "cloud.syncGroups.create", input: LibraryArgs<null>, result: null } |
{ key: "cloud.syncGroups.delete", input: CloudSyncGroupPubId, result: null } |
{ key: "cloud.syncGroups.request_join", input: SyncGroupsRequestJoinArgs, result: null } |
{ key: "cloud.thumbnails.get", input: CloudThumbnailRequestArgs, result: null } |
{ key: "cloud.userResponse", input: CloudP2PUserResponse, result: null } |
{ key: "ephemeralFiles.copyFiles", input: LibraryArgs<EphemeralFileSystemOps>, result: null } |
{ key: "ephemeralFiles.createFile", input: LibraryArgs<CreateEphemeralFileArgs>, result: string } |
@@ -236,6 +237,8 @@ export type CloudSyncGroupsRemoveDeviceArgs = { group_pub_id: CloudSyncGroupPubI
export type CloudSyncKeyHash = string
export type CloudThumbnailRequestArgs = { device_pub_id: CloudDevicePubId; library_pub_id: CloudLibraryPubId; cas_id: string | null }
export type CloudUpdateDeviceArgs = { pub_id: CloudDevicePubId; name: string }
export type Codec = { kind: string | null; sub_kind: string | null; tag: string | null; name: string | null; profile: string | null; bit_rate: number; props: Props | null }