From 0f7a669e5a03749bbbb7ae87f4d9cda9875cce5b Mon Sep 17 00:00:00 2001 From: Oscar Beaumont Date: Mon, 19 Jun 2023 07:13:30 +0200 Subject: [PATCH] [ENG-732] Pairing by library (#943) * library keypair * allow opening stream to non-connected node + more * library identity * fix types * fix maybe undefined * don't forgor migrations file * library manager inside p2p manager * rename * `NodeInformation` struct * node info exchange * fill in info * streamify tunnel * use tunnel for p2p events * libp2p is annoying + stop leaking private key's * Clippy cleanup --- Cargo.lock | Bin 241435 -> 245429 bytes apps/desktop/src-tauri/src/main.rs | 2 +- .../library/LibraryGeneralSettings.tsx | 5 +- .../20230619032753_p2p/migration.sql | 17 ++ core/prisma/schema.prisma | 9 +- core/src/api/jobs.rs | 9 +- core/src/api/libraries.rs | 11 +- core/src/api/nodes.rs | 1 + core/src/api/p2p.rs | 6 +- core/src/api/search.rs | 2 +- core/src/lib.rs | 38 +-- core/src/library/config.rs | 89 +++++- core/src/library/library.rs | 3 + core/src/library/manager.rs | 191 +++++++++--- core/src/location/indexer/indexer_job.rs | 12 +- core/src/node/config.rs | 22 +- core/src/node/mod.rs | 52 ++-- core/src/node/peer_request.rs | 154 ---------- core/src/object/preview/thumbnail/mod.rs | 2 +- core/src/p2p/mod.rs | 2 + core/src/p2p/p2p_manager.rs | 285 ++++++++++++++++-- core/src/p2p/protocol.rs | 218 +++++++++++--- core/src/sync/manager.rs | 16 +- core/src/util/db.rs | 2 +- core/src/util/debug_initializer.rs | 14 +- core/src/util/maybe_undefined.rs | 58 ++++ core/src/util/migrator.rs | 14 +- core/src/util/mod.rs | 2 + crates/p2p/Cargo.toml | 3 + crates/p2p/src/lib.rs | 1 + crates/p2p/src/manager.rs | 10 +- crates/p2p/src/manager_stream.rs | 72 ++++- crates/p2p/src/mdns.rs | 2 +- crates/p2p/src/spaceblock/mod.rs | 40 +-- crates/p2p/src/spacetunnel/identity.rs | 54 ++++ crates/p2p/src/spacetunnel/mod.rs | 7 + crates/p2p/src/spacetunnel/tunnel.rs | 64 ++++ crates/p2p/src/utils/keypair.rs | 9 +- crates/p2p/src/utils/peer_id.rs | 6 + .../Layout/Sidebar/LibrarySection.tsx | 17 +- interface/app/$libraryId/settings/Sidebar.tsx | 4 +- .../$libraryId/settings/library/general.tsx | 16 +- .../app/$libraryId/settings/library/nodes.tsx | 33 ++ packages/client/src/core.ts | 29 +- 44 files changed, 1163 insertions(+), 440 deletions(-) create mode 100644 core/prisma/migrations/20230619032753_p2p/migration.sql delete mode 100644 core/src/node/peer_request.rs create mode 100644 core/src/util/maybe_undefined.rs create mode 100644 crates/p2p/src/spacetunnel/identity.rs create mode 100644 crates/p2p/src/spacetunnel/mod.rs create mode 100644 crates/p2p/src/spacetunnel/tunnel.rs diff --git a/Cargo.lock b/Cargo.lock index ba38eded2c55453741f1abb2d436d1c4267dbc6f..401edd831c0fb6501bb14fe77af3292e3816b87a 100644 GIT binary patch delta 2081 zcmaJ?TZmOv7-sJ?noiR+IpZ0nsg6cL>1@{h(nCk_LFl22CPhuyy2>=>Of%|R(L7aj zLDOnimffBWh>M80?0-byaErMo{jG&@_?-?z82o8t6)?Ryk-}?UV|JK2C z{m1_5KXOTxR1Kz0H#BZcbDOd)ZRTV|#Is-okHlfcDN;H{uRM`TT5p}8K9Gn!(LTU1 zD(8GO2>zKXpZ|1CIVF2Km(<}M<#VrY?R@guZ)=ygP$3Fb9+gY@Qd1owVU)@caJlekvwY+IdT+V3ci#uY zonPL+r+3BVo|(O)ljFYA>O9!Dh>p`c7qY<(zyy5 zIFCw6j1gDHGg994Nb`ChrU6eDV*SQ6|6aa2y?Y_+7Zod7@M~DH0$sz36!^;A)Wmoy zw&$&~d-BVkt8_c*)QNmWwOw_h^yXN8^Ae<}Co0I5F;)v;uXO(9e7JAd^yI#&rJ3@B z)2#t`hHBN&Qxk4Rl@Gi%oaR2xSM<&Vhv6*+y%$cC@}p%=aC!gf>nh%)0xn17=n_kSx`t&1 z4lIXZG1UEEmN3Tfefs9(eC-vYvLKy>)4vX-!xwYg2D>s$7_*XSU}tgQl0|G;w31>b zbcCPO4nv8-e_l$UE1WloNzGWf**4RGt$AOqMf$csYjw_@*?C7cRG48+mPliWGDas_242m ziKG%h2+%4O40DJI6p*0sXdGv_qVBl!+)}5t@e)O91GmtX1PW-IeZlz|Q*jZ6thqF+ z!4+3eO^-hvCa1yvU7$AGpB}iJtx5B*gD^^2s=+wXMMoeIz)YZug1BMUGJ%WFFrVF$j<3uI%d;!D zx1;9B5^y0Pxd6w25VIT$4uRwxHawtcVA9Zlql}z`J|5hyNI*u~DtcD^r;(R{>UQkD z-J8$#F00R)zB|4({TO?vXhd{y_;)%nJ)>HOpQ zP}=zw$3jT zxRkB${y3IDdv`iEm90wlh5X*zrl?|zgo!qf|^5wt)=R%xb8q;LT-1riY-0;`-Fw5G_? z5Ue*gASRMAd7UT~Zk;GlqN@d~aqG5#mylz)TfCA#)k=px1j+s!%^q&^fTdC)c!C97 zevndBLWmO|9RtB46~SuOc&Tw(~ z))Dxh(|wlT+Wq->{{5k=VerM{jfd(1hJ0MNKO0JSjWxDZ(pOQkd#r(%S2sw((yVK2 L=zcxc*uL!_w{>n+ delta 344 zcmdn`mv8nxz73x(HgC3E5IVVFqT=NGKDN!X`oBs}4#-iP{9(2D<_oK=xTYU+WH#LF zxmA*9x?wi6u$Yn#mx5AJTC$m?xh0rgoSB}NIQgG~++?Q1$0jo=N^O2|cpnp!lFsxC z=}gkhDXB%%=Wb*aXG%+(UNDnMoHe~DzqDZT*%Jy}VC@Cj$;B3v{raVtQoD$Y$pIv8vPMt(nEQH?uG%3jh@;P2RvMzFkF(apMiJOQth^V6>ZV@Rc!WdfR)( zD7cvRcDIj=VSHQbncRgrAucS)&YUjzfl+$<6A`9+T-!G(Fy**!=gwrR)|x(JGLzl* zUz3?ESvbIILCys_QwHb~!|g|AGG!@j*VxTuA~5~ZUMA`7>1UYk&YSL?%^WzL(Tz!T Z`i^X7r)fFNhSQ~snYFhk(err: NodeError) -> TauriPlugin { tauri::plugin::Builder::new("spacedrive") .js_init_script(format!( r#"window.__SD_ERROR__ = `{}`;"#, - err.to_string().replace("`", "\"") + err.to_string().replace('`', "\"") )) .build() } diff --git a/apps/mobile/src/screens/settings/library/LibraryGeneralSettings.tsx b/apps/mobile/src/screens/settings/library/LibraryGeneralSettings.tsx index f67653cbf..062d96c17 100644 --- a/apps/mobile/src/screens/settings/library/LibraryGeneralSettings.tsx +++ b/apps/mobile/src/screens/settings/library/LibraryGeneralSettings.tsx @@ -24,7 +24,10 @@ const LibraryGeneralSettingsScreen = ({ const form = useZodForm({ schema, - defaultValues: { name: library.config.name, description: library.config.description } + defaultValues: { + name: library.config.name, + description: library.config.description || undefined + } }); const { mutate: editLibrary } = useBridgeMutation('library.edit'); diff --git a/core/prisma/migrations/20230619032753_p2p/migration.sql b/core/prisma/migrations/20230619032753_p2p/migration.sql new file mode 100644 index 000000000..a6bb4ebaa --- /dev/null +++ b/core/prisma/migrations/20230619032753_p2p/migration.sql @@ -0,0 +1,17 @@ +-- RedefineTables +PRAGMA foreign_keys=OFF; +CREATE TABLE "new_node" ( + "id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, + "pub_id" BLOB NOT NULL, + "name" TEXT NOT NULL, + "platform" INTEGER NOT NULL, + "date_created" DATETIME NOT NULL, + "identity" BLOB, + "node_peer_id" TEXT +); +INSERT INTO "new_node" ("date_created", "id", "name", "platform", "pub_id") SELECT "date_created", "id", "name", "platform", "pub_id" FROM "node"; +DROP TABLE "node"; +ALTER TABLE "new_node" RENAME TO "node"; +CREATE UNIQUE INDEX "node_pub_id_key" ON "node"("pub_id"); +PRAGMA foreign_key_check; +PRAGMA foreign_keys=ON; diff --git a/core/prisma/schema.prisma b/core/prisma/schema.prisma index bbc4d014f..5141c5a66 100644 --- a/core/prisma/schema.prisma +++ b/core/prisma/schema.prisma @@ -52,11 +52,10 @@ model Node { pub_id Bytes @unique name String // Enum: sd_core::node::Platform - platform Int @default(0) - // version String? - // last_seen DateTime @default(now()) - // timezone String? - date_created DateTime @default(now()) + platform Int + date_created DateTime + identity Bytes? // TODO: Change to required field in future + node_peer_id String? // TODO: Remove as part of - https://linear.app/spacedriveapp/issue/ENG-757/p2p-library-portability jobs Job[] Location Location[] diff --git a/core/src/api/jobs.rs b/core/src/api/jobs.rs index 9b998e277..1215ea87f 100644 --- a/core/src/api/jobs.rs +++ b/core/src/api/jobs.rs @@ -115,11 +115,11 @@ pub(crate) fn mount() -> AlphaRouter { match groups.entry(group_key) { // Create new job group with metadata Entry::Vacant(e) => { - let id = job.parent_id.clone().unwrap_or(job.id.clone()); + let id = job.parent_id.unwrap_or(job.id); let group = JobGroup { id: id.to_string(), action: action_name.clone(), - status: job.status.clone(), + status: job.status, jobs: vec![report.clone()], created_at: job.created_at.unwrap_or(Utc::now()), }; @@ -134,8 +134,7 @@ pub(crate) fn mount() -> AlphaRouter { } } - let mut groups_vec: Vec = - groups.into_iter().map(|(_, v)| v).collect(); + let mut groups_vec: Vec = groups.into_values().collect(); groups_vec.sort_by(|a, b| b.created_at.cmp(&a.created_at)); // Update the index after sorting the groups @@ -154,7 +153,7 @@ pub(crate) fn mount() -> AlphaRouter { }) .procedure("isActive", { R.with2(library()).query(|(ctx, _), _: ()| async move { - Ok(ctx.jobs.get_running_reports().await.len() > 0) + Ok(!ctx.jobs.get_running_reports().await.is_empty()) }) }) .procedure("clear", { diff --git a/core/src/api/libraries.rs b/core/src/api/libraries.rs index b213ac8a2..5abd87dec 100644 --- a/core/src/api/libraries.rs +++ b/core/src/api/libraries.rs @@ -1,6 +1,7 @@ use crate::{ library::LibraryConfig, prisma::statistics, + util::MaybeUndefined, volume::{get_volumes, save_volume}, }; @@ -97,10 +98,10 @@ pub(crate) fn mount() -> AlphaRouter { let new_library = ctx .library_manager - .create(LibraryConfig { - name: args.name.to_string(), - ..Default::default() - }) + .create( + LibraryConfig::new(args.name.to_string(), ctx.config.get().await.id), + ctx.config.get().await, + ) .await?; Ok(new_library) @@ -111,7 +112,7 @@ pub(crate) fn mount() -> AlphaRouter { pub struct EditLibraryArgs { pub id: Uuid, pub name: Option, - pub description: Option, + pub description: MaybeUndefined, } R.mutation(|ctx, args: EditLibraryArgs| async move { diff --git a/core/src/api/nodes.rs b/core/src/api/nodes.rs index a94d1b551..347ed92ec 100644 --- a/core/src/api/nodes.rs +++ b/core/src/api/nodes.rs @@ -28,6 +28,7 @@ pub(crate) fn mount() -> AlphaRouter { "error updating config".into(), ) }) + .map(|_| ()) }) }) } diff --git a/core/src/api/p2p.rs b/core/src/api/p2p.rs index 5e3e3a985..c3aa632c1 100644 --- a/core/src/api/p2p.rs +++ b/core/src/api/p2p.rs @@ -7,7 +7,7 @@ use uuid::Uuid; use crate::p2p::P2PEvent; -use super::{Ctx, R}; +use super::{utils::library, Ctx, R}; pub(crate) fn mount() -> AlphaRouter { R.router() @@ -74,4 +74,8 @@ pub(crate) fn mount() -> AlphaRouter { }) }) }) + .procedure("pair", { + R.with2(library()) + .mutation(|(ctx, lib), id: PeerId| async move { ctx.p2p.pair(id, lib) }) + }) } diff --git a/core/src/api/search.rs b/core/src/api/search.rs index 68b563a3e..b3432390a 100644 --- a/core/src/api/search.rs +++ b/core/src/api/search.rs @@ -167,7 +167,7 @@ enum ObjectHiddenFilter { } impl ObjectHiddenFilter { - fn to_param(self) -> Option { + fn to_param(&self) -> Option { match self { ObjectHiddenFilter::Exclude => Some(object::hidden::not(Some(true))), ObjectHiddenFilter::Include => None, diff --git a/core/src/lib.rs b/core/src/lib.rs index afa5dc0f6..d5aa67171 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -17,7 +17,7 @@ use std::{ }; use thiserror::Error; use tokio::{fs, sync::broadcast}; -use tracing::{debug, error, info, warn}; +use tracing::{error, info, warn}; use tracing_appender::{ non_blocking::{NonBlocking, WorkerGuard}, rolling::{RollingFileAppender, Rotation}, @@ -42,7 +42,6 @@ pub struct NodeContext { pub jobs: Arc, pub location_manager: Arc, pub event_bus_tx: broadcast::Sender, - pub p2p: Arc, } pub struct Node { @@ -73,49 +72,26 @@ impl Node { let jobs = JobManager::new(); let location_manager = LocationManager::new(); - let (p2p, mut p2p_rx) = P2PManager::new(config.clone()).await?; - let library_manager = LibraryManager::new( data_dir.join("libraries"), NodeContext { config: config.clone(), jobs: jobs.clone(), location_manager: location_manager.clone(), - p2p: p2p.clone(), + // p2p: p2p.clone(), event_bus_tx: event_bus.0.clone(), }, ) .await?; + let p2p = P2PManager::new(config.clone(), library_manager.clone()).await?; #[cfg(debug_assertions)] if let Some(init_data) = init_data { - init_data.apply(&library_manager).await?; + init_data + .apply(&library_manager, config.get().await) + .await?; } - tokio::spawn({ - let library_manager = library_manager.clone(); - - async move { - while let Ok((library_id, operations)) = p2p_rx.recv().await { - debug!("going to ingest {} operations", operations.len()); - - let Some(library) = library_manager.get_library(library_id).await else { - warn!("no library found!"); - continue; - }; - - for op in operations { - library.sync.ingest_op(op).await.unwrap_or_else(|err| { - error!( - "error ingesting operation for library '{}': {err:?}", - library.id - ); - }); - } - } - } - }); - let router = api::mount(); let node = Node { data_dir: data_dir.to_path_buf(), @@ -239,7 +215,7 @@ pub enum NodeError { #[error("failed to initialize p2p manager: {0}")] P2PManager(#[from] sd_p2p::ManagerError), #[error("invalid platform integer: {0}")] - InvalidPlatformInt(i32), + InvalidPlatformInt(u8), #[cfg(debug_assertions)] #[error("Init config error: {0}")] InitConfig(#[from] util::debug_initializer::InitConfigError), diff --git a/core/src/library/config.rs b/core/src/library/config.rs index 9da596652..fe9775434 100644 --- a/core/src/library/config.rs +++ b/core/src/library/config.rs @@ -1,4 +1,9 @@ +use std::{path::PathBuf, sync::Arc}; + +use sd_p2p::{spacetunnel::Identity, PeerId}; +use sd_prisma::prisma::node; use serde::{Deserialize, Serialize}; +use serde_json::Value; use specta::Type; use uuid::Uuid; @@ -11,27 +16,63 @@ use crate::{ }; /// LibraryConfig holds the configuration for a specific library. This is stored as a '{uuid}.sdlibrary' file. -#[derive(Debug, Serialize, Deserialize, Clone, Type, Default)] +#[derive(Debug, Serialize, Deserialize, Clone)] // If you are adding `specta::Type` on this your probably about to leak the P2P private key pub struct LibraryConfig { /// name is the display name of the library. This is used in the UI and is set by the user. pub name: String, /// description is a user set description of the library. This is used in the UI and is set by the user. - pub description: String, + pub description: Option, + /// P2P identity of this library. + pub identity: Vec, + /// Id of the current node + pub node_id: Uuid, // /// is_encrypted is a flag that is set to true if the library is encrypted. // #[serde(default)] // pub is_encrypted: bool, } +#[derive(Debug, Serialize, Deserialize, Clone, Type)] +pub struct SanitisedLibraryConfig { + pub name: String, + pub description: Option, + pub node_id: Uuid, +} + +impl From for SanitisedLibraryConfig { + fn from(config: LibraryConfig) -> Self { + Self { + name: config.name, + description: config.description, + node_id: config.node_id, + } + } +} + +impl LibraryConfig { + pub fn new(name: String, node_id: Uuid) -> Self { + Self { + name, + description: None, + identity: Identity::new().to_bytes().to_vec(), + node_id, + } + } +} + #[async_trait::async_trait] impl Migrate for LibraryConfig { - const CURRENT_VERSION: u32 = 1; + const CURRENT_VERSION: u32 = 4; - type Ctx = PrismaClient; + type Ctx = (Uuid, PeerId, Arc); + + fn default(path: PathBuf) -> Result { + Err(MigratorError::ConfigFileMissing(path)) + } async fn migrate( to_version: u32, - _config: &mut serde_json::Map, - db: &Self::Ctx, + config: &mut serde_json::Map, + (node_id, peer_id, db): &Self::Ctx, ) -> Result<(), MigratorError> { match to_version { 0 => {} @@ -59,6 +100,40 @@ impl Migrate for LibraryConfig { ) .await?; } + 2 => { + config.insert( + "identity".into(), + Value::Array( + Identity::new() + .to_bytes() + .into_iter() + .map(|v| v.into()) + .collect(), + ), + ); + } + // The fact I have to migrate this hurts my soul + 3 => { + if db.node().count(vec![]).exec().await? != 1 { + return Err(MigratorError::Custom( + "Ummm, there are too many nodes in the database, this should not happen!" + .into(), + )); + } + + db.node() + .update_many( + vec![], + vec![ + node::pub_id::set(node_id.as_bytes().to_vec()), + node::node_peer_id::set(Some(peer_id.to_string())), + ], + ) + .exec() + .await?; + + config.insert("node_id".into(), Value::String(node_id.to_string())); + } v => unreachable!("Missing migration for library version {}", v), } @@ -70,5 +145,5 @@ impl Migrate for LibraryConfig { #[derive(Serialize, Deserialize, Debug, Type)] pub struct LibraryConfigWrapped { pub uuid: Uuid, - pub config: LibraryConfig, + pub config: SanitisedLibraryConfig, } diff --git a/core/src/library/library.rs b/core/src/library/library.rs index 1f99e9158..01c190c1c 100644 --- a/core/src/library/library.rs +++ b/core/src/library/library.rs @@ -20,6 +20,7 @@ use std::{ sync::Arc, }; +use sd_p2p::spacetunnel::Identity; use tokio::{fs, io}; use tracing::warn; use uuid::Uuid; @@ -44,6 +45,8 @@ pub struct Library { pub node_local_id: i32, /// node_context holds the node context for the node which this library is running on. pub(super) node_context: NodeContext, + /// p2p identity + pub identity: Arc, pub orphan_remover: OrphanRemoverActor, } diff --git a/core/src/library/manager.rs b/core/src/library/manager.rs index ea9a828b6..1afb7e497 100644 --- a/core/src/library/manager.rs +++ b/core/src/library/manager.rs @@ -1,7 +1,7 @@ use crate::{ invalidate_query, location::{indexer::rules, LocationManagerError}, - node::Platform, + node::{NodeConfig, Platform}, object::orphan_remover::OrphanRemoverActor, prisma::{location, node}, sync::{SyncManager, SyncMessage}, @@ -9,24 +9,47 @@ use crate::{ db::{self, MissingFieldError}, error::{FileIOError, NonUtf8PathError}, migrator::{Migrate, MigratorError}, + MaybeUndefined, }, NodeContext, }; use std::{ - env, path::{Path, PathBuf}, str::FromStr, sync::Arc, }; +use chrono::Local; +use sd_p2p::spacetunnel::{Identity, IdentityErr}; use thiserror::Error; -use tokio::{fs, io, sync::RwLock, try_join}; -use tracing::{debug, error, warn}; +use tokio::{ + fs, io, + sync::{broadcast, RwLock}, + try_join, +}; +use tracing::{debug, error, info, warn}; use uuid::Uuid; use super::{Library, LibraryConfig, LibraryConfigWrapped}; +pub enum SubscriberEvent { + Load(Uuid, Arc, broadcast::Receiver), +} + +impl Clone for SubscriberEvent { + fn clone(&self) -> Self { + match self { + Self::Load(id, identity, receiver) => { + Self::Load(*id, identity.clone(), receiver.resubscribe()) + } + } + } +} + +pub trait SubscriberFn: Fn(SubscriberEvent) + Send + Sync + 'static {} +impl SubscriberFn for F {} + /// LibraryManager is a singleton that manages all libraries for a node. pub struct LibraryManager { /// libraries_dir holds the path to the directory where libraries are stored. @@ -35,6 +58,8 @@ pub struct LibraryManager { libraries: RwLock>, /// node_context holds the context for the node which this library manager is running on. node_context: NodeContext, + /// on load subscribers + subscribers: RwLock>>, } #[derive(Error, Debug)] @@ -65,6 +90,10 @@ pub enum LibraryManagerError { NonUtf8Path(#[from] NonUtf8PathError), #[error("failed to watch locations: {0}")] LocationWatcher(#[from] LocationManagerError), + #[error("failed to parse library p2p identity: {0}")] + Identity(#[from] IdentityErr), + #[error("current node with id '{0}' was not found in the database")] + CurrentNodeNotFound(String), #[error("missing-field: {0}")] MissingField(#[from] MissingFieldError), } @@ -89,6 +118,7 @@ impl LibraryManager { .map_err(|e| FileIOError::from((&libraries_dir, e)))?; let mut libraries = Vec::new(); + let subscribers = RwLock::new(Vec::new()); let mut read_dir = fs::read_dir(&libraries_dir) .await .map_err(|e| FileIOError::from((&libraries_dir, e)))?; @@ -131,7 +161,14 @@ impl LibraryManager { } libraries.push( - Self::load(library_id, &db_path, config_path, node_context.clone()).await?, + Self::load( + library_id, + &db_path, + config_path, + node_context.clone(), + &subscribers, + ) + .await?, ); } } @@ -140,6 +177,7 @@ impl LibraryManager { libraries: RwLock::new(libraries), libraries_dir, node_context, + subscribers, }); debug!("LibraryManager initialized"); @@ -147,18 +185,33 @@ impl LibraryManager { Ok(this) } + /// subscribe to library events + pub(crate) async fn subscribe(&self, f: F) { + self.subscribers.write().await.push(Box::new(f)); + } + + async fn emit(subscribers: &RwLock>>, event: SubscriberEvent) { + let subscribers = subscribers.read().await; + for subscriber in subscribers.iter() { + subscriber(event.clone()); + } + } + /// create creates a new library with the given config and mounts it into the running [LibraryManager]. pub(crate) async fn create( &self, config: LibraryConfig, + node_cfg: NodeConfig, ) -> Result { - self.create_with_uuid(Uuid::new_v4(), config).await + self.create_with_uuid(Uuid::new_v4(), config, node_cfg) + .await } pub(crate) async fn create_with_uuid( &self, id: Uuid, config: LibraryConfig, + node_cfg: NodeConfig, ) -> Result { if config.name.is_empty() || config.name.chars().all(|x| x.is_whitespace()) { return Err(LibraryManagerError::InvalidConfig( @@ -180,9 +233,25 @@ impl LibraryManager { self.libraries_dir.join(format!("{id}.db")), config_path, self.node_context.clone(), + &self.subscribers, ) .await?; + // Create node + node::Create { + pub_id: config.node_id.as_bytes().to_vec(), + name: node_cfg.name.clone(), + platform: Platform::current() as i32, + date_created: Local::now().into(), + _params: vec![ + node::identity::set(Some(config.identity.clone())), + node::node_peer_id::set(Some(node_cfg.keypair.peer_id().to_string())), + ], + } + .to_query(&library.db) + .exec() + .await?; + debug!("Loaded library '{id:?}'"); // Run seeders @@ -196,7 +265,10 @@ impl LibraryManager { debug!("Pushed library into manager '{id:?}'"); - Ok(LibraryConfigWrapped { uuid: id, config }) + Ok(LibraryConfigWrapped { + uuid: id, + config: config.into(), + }) } pub(crate) async fn get_all_libraries_config(&self) -> Vec { @@ -205,21 +277,17 @@ impl LibraryManager { .await .iter() .map(|lib| LibraryConfigWrapped { - config: lib.config.clone(), + config: lib.config.clone().into(), uuid: lib.id, }) .collect() } - // pub(crate) async fn get_all_libraries(&self) -> Vec { - // self.libraries.read().await.clone() - // } - pub(crate) async fn edit( &self, id: Uuid, name: Option, - description: Option, + description: MaybeUndefined, ) -> Result<(), LibraryManagerError> { // check library is valid let mut libraries = self.libraries.write().await; @@ -232,8 +300,10 @@ impl LibraryManager { if let Some(name) = name { library.config.name = name; } - if let Some(description) = description { - library.config.description = description; + match description { + MaybeUndefined::Undefined => {} + MaybeUndefined::Null => library.config.description = None, + MaybeUndefined::Value(description) => library.config.description = Some(description), } LibraryConfig::save( @@ -313,11 +383,12 @@ impl LibraryManager { } /// load the library from a given path - pub(crate) async fn load( + async fn load( id: Uuid, db_path: impl AsRef, config_path: PathBuf, node_context: NodeContext, + subscribers: &RwLock>>, ) -> Result { let db_path = db_path.as_ref(); let db_url = format!( @@ -328,50 +399,73 @@ impl LibraryManager { ); let db = Arc::new(db::load_and_migrate(&db_url).await?); - let config = LibraryConfig::load_and_migrate(&config_path, &db).await?; - let node_config = node_context.config.get().await; + let config = LibraryConfig::load_and_migrate( + &config_path, + &(node_config.id, node_config.keypair.peer_id(), db.clone()), + ) + .await?; + let identity = Arc::new(Identity::from_bytes(&config.identity)?); - let platform = match env::consts::OS { - "windows" => Platform::Windows, - "macos" => Platform::MacOS, - "linux" => Platform::Linux, - _ => Platform::Unknown, - }; - - let uuid_vec = id.as_bytes().to_vec(); let node_data = db .node() - .upsert( - node::pub_id::equals(uuid_vec.clone()), - node::create( - uuid_vec, - node_config.name.clone(), - vec![node::platform::set(platform as i32)], - ), - vec![node::name::set(node_config.name.clone())], - ) + .find_unique(node::pub_id::equals(node_config.id.as_bytes().to_vec())) .exec() - .await?; + .await? + .ok_or_else(|| LibraryManagerError::CurrentNodeNotFound(id.to_string()))?; + + let curr_platform = Platform::current() as i32; + if node_data.platform != curr_platform { + info!( + "Detected change of platform for library '{}', was previously '{}' and will change to '{}'. Reconciling node data.", + id, + node_data.platform, + curr_platform + ); + + db.node() + .update( + node::pub_id::equals(node_data.pub_id.clone()), + vec![ + node::platform::set(curr_platform), + node::name::set(node_config.name.clone()), + ], + ) + .exec() + .await?; + } + + if node_data.name != node_config.name { + info!( + "Detected change of node name for library '{}', was previously '{}' and will change to '{}'. Reconciling node data.", + id, + node_data.name, + node_config.name, + ); + + db.node() + .update( + node::pub_id::equals(node_data.pub_id), + vec![node::name::set(node_config.name.clone())], + ) + .exec() + .await?; + } + + // TODO: Move this reconciliation into P2P and do reconciliation of both local and remote nodes. // let key_manager = Arc::new(KeyManager::new(vec![]).await?); // seed_keymanager(&db, &key_manager).await?; rules::seeder(&db).await?; - let (sync_manager, mut sync_rx) = SyncManager::new(&db, id); + let (sync_manager, sync_rx) = SyncManager::new(&db, id); - tokio::spawn({ - let node_context = node_context.clone(); - - async move { - while let Ok(op) = sync_rx.recv().await { - let SyncMessage::Created(op) = op else { continue; }; - - node_context.p2p.broadcast_sync_events(id, vec![op]).await; - } - } - }); + Self::emit( + subscribers, + SubscriberEvent::Load(id, identity.clone(), sync_rx), + ) + .await; let library = Library { id, @@ -383,6 +477,7 @@ impl LibraryManager { db, node_local_id: node_data.id, node_context, + identity, }; for location in library diff --git a/core/src/location/indexer/indexer_job.rs b/core/src/location/indexer/indexer_job.rs index 7dd6c0341..8b9256d2e 100644 --- a/core/src/location/indexer/indexer_job.rs +++ b/core/src/location/indexer/indexer_job.rs @@ -59,7 +59,7 @@ impl StatefulJob for IndexerJob { /// Creates a vector of valid path buffers from a directory, chunked into batches of `BATCH_SIZE`. async fn init( &self, - mut ctx: &mut WorkerContext, + ctx: &mut WorkerContext, state: &mut JobState, ) -> Result<(), JobError> { let location_id = state.init.location.id; @@ -109,7 +109,7 @@ impl StatefulJob for IndexerJob { walk( &to_walk_path, &indexer_rules, - update_notifier_fn(BATCH_SIZE, &mut ctx), + update_notifier_fn(BATCH_SIZE, ctx), file_paths_db_fetcher_fn!(&db), to_remove_db_fetcher_fn!(location_id, location_path, &db), iso_file_path_factory(location_id, location_path), @@ -146,7 +146,7 @@ impl StatefulJob for IndexerJob { ); IndexerJobData::on_scan_progress( - &mut ctx, + ctx, vec![ScanProgress::Message(format!( "Starting saving {total_paths} files or directories, \ there still {to_walk_count} directories to index", @@ -176,7 +176,7 @@ impl StatefulJob for IndexerJob { /// Process each chunk of entries in the indexer job, writing to the `file_path` table async fn execute_step( &self, - mut ctx: &mut WorkerContext, + ctx: &mut WorkerContext, state: &mut JobState, ) -> Result<(), JobError> { let data = extract_job_data_mut!(state); @@ -186,7 +186,7 @@ impl StatefulJob for IndexerJob { let start_time = Instant::now(); IndexerJobData::on_scan_progress( - &mut ctx, + ctx, vec![ ScanProgress::SavedChunks(step.chunk_idx), ScanProgress::Message(format!( @@ -221,7 +221,7 @@ impl StatefulJob for IndexerJob { keep_walking( to_walk_entry, &data.indexer_rules, - update_notifier_fn(BATCH_SIZE, &mut ctx), + update_notifier_fn(BATCH_SIZE, ctx), file_paths_db_fetcher_fn!(&db), to_remove_db_fetcher_fn!(location_id, location_path, &db), iso_file_path_factory(location_id, location_path), diff --git a/core/src/node/config.rs b/core/src/node/config.rs index 14df7cbda..7fe27831b 100644 --- a/core/src/node/config.rs +++ b/core/src/node/config.rs @@ -15,7 +15,7 @@ use crate::util::migrator::{Migrate, MigratorError}; pub const NODE_STATE_CONFIG_NAME: &str = "node_state.sdconfig"; /// NodeConfig is the configuration for a node. This is shared between all libraries and is stored in a JSON file on disk. -#[derive(Debug, Serialize, Deserialize, Clone, Type)] +#[derive(Debug, Serialize, Deserialize, Clone)] // If you are adding `specta::Type` on this your probably about to leak the P2P private key pub struct NodeConfig { /// id is a unique identifier for the current node. Each node has a public identifier (this one) and is given a local id for each library (done within the library code). pub id: Uuid, @@ -24,7 +24,7 @@ pub struct NodeConfig { // the port this node uses for peer to peer communication. By default a random free port will be chosen each time the application is started. pub p2p_port: Option, /// The p2p identity keypair for this node. This is used to identify the node on the network. - #[specta(skip)] + /// This keypair does effectively nothing except for provide libp2p with a stable peer_id. pub keypair: Keypair, // TODO: These will probs be replaced by your Spacedrive account in the near future. pub p2p_email: Option, @@ -63,6 +63,24 @@ impl Migrate for NodeConfig { type Ctx = (); + fn default(_path: PathBuf) -> Result { + Ok(Self { + id: Uuid::new_v4(), + name: match hostname::get() { + // SAFETY: This is just for display purposes so it doesn't matter if it's lossy + Ok(hostname) => hostname.to_string_lossy().into_owned(), + Err(err) => { + eprintln!("Falling back to default node name as an error occurred getting your systems hostname: '{err}'"); + "my-spacedrive".into() + } + }, + p2p_port: None, + keypair: Keypair::generate(), + p2p_email: None, + p2p_img_url: None, + }) + } + async fn migrate( from_version: u32, _config: &mut Map, diff --git a/core/src/node/mod.rs b/core/src/node/mod.rs index 13660ef1e..02bafa52f 100644 --- a/core/src/node/mod.rs +++ b/core/src/node/mod.rs @@ -1,35 +1,13 @@ -use crate::{prisma::node, NodeError}; - +use crate::NodeError; use serde::{Deserialize, Serialize}; use specta::Type; -use uuid::Uuid; mod config; -pub mod peer_request; pub use config::*; -#[derive(Debug, Clone, Serialize, Deserialize, Type)] -pub struct LibraryNode { - pub uuid: Uuid, - pub name: String, - pub platform: Platform, -} - -impl TryFrom for LibraryNode { - type Error = String; - - fn try_from(data: node::Data) -> Result { - Ok(Self { - uuid: Uuid::from_slice(&data.pub_id).map_err(|_| "Invalid node pub_id")?, - name: data.name, - platform: Platform::try_from(data.platform).map_err(|_| "Invalid platform_id")?, - }) - } -} - #[allow(clippy::upper_case_acronyms)] -#[repr(i32)] +#[repr(u8)] #[derive(Debug, Clone, Copy, Serialize, Deserialize, Type, Eq, PartialEq)] pub enum Platform { Unknown = 0, @@ -40,10 +18,32 @@ pub enum Platform { Android = 5, } -impl TryFrom for Platform { +impl Platform { + #[allow(unreachable_code)] + pub fn current() -> Self { + #[cfg(target_os = "windows")] + return Self::Windows; + + #[cfg(target_os = "macos")] + return Self::MacOS; + + #[cfg(target_os = "linux")] + return Self::Linux; + + #[cfg(target_os = "ios")] + return Self::IOS; + + #[cfg(target_os = "android")] + return Self::Android; + + Self::Unknown + } +} + +impl TryFrom for Platform { type Error = NodeError; - fn try_from(value: i32) -> Result { + fn try_from(value: u8) -> Result { let s = match value { 0 => Self::Unknown, 1 => Self::Windows, diff --git a/core/src/node/peer_request.rs b/core/src/node/peer_request.rs deleted file mode 100644 index 83af4243c..000000000 --- a/core/src/node/peer_request.rs +++ /dev/null @@ -1,154 +0,0 @@ -#![allow(dead_code, unused_variables, clippy::panic, clippy::unwrap_used)] // TODO: Reenable once this is working - -use serde::{Deserialize, Serialize}; -use specta::Type; -use tokio::sync::{mpsc, oneshot}; - -pub enum PeerRequest { - Guest(guest::PeerRequest), - Host(host::PeerRequest), -} - -enum PlaceholderP2PAction { - SubmitPeeringPassword { - peer_id: String, - password: String, - tx: oneshot::Sender, - }, -} - -pub mod guest { - use super::*; - - #[derive(Type, Deserialize)] - pub enum Action { - PromptPassword, - ProcessPassword { password: String }, - } - - #[derive(Type, Serialize, Clone)] - pub enum State { - Start, - AwaitingPassword { prev_invalid: bool }, - AwaitingConfirmation, - ChallengeSuccess, - } - - pub struct PeerRequest { - pub tx: mpsc::Sender, - pub peer_id: String, - } - - struct ActorArgs { - peer_id: String, - p2p: mpsc::Sender, - } - - async fn loop_until(rx: &mut mpsc::Receiver, func: impl Fn(T) -> Option) -> R { - loop { - let Some(msg) = rx.recv().await else { - panic!() - }; - - if let Some(d) = func(msg) { - break d; - } - } - } - - impl PeerRequest { - pub fn new_actor(peer_id: String) -> (Self, mpsc::Receiver) { - let (itx, irx) = mpsc::channel(8); - let (otx, orx) = mpsc::channel(8); - let (p2ptx, _) = mpsc::channel(8); - - tokio::spawn(Self::actor( - otx, - irx, - ActorArgs { - peer_id: peer_id.clone(), - p2p: p2ptx, - }, - )); - - (Self { tx: itx, peer_id }, orx) - } - - async fn actor( - state_tx: mpsc::Sender, - mut action_rx: mpsc::Receiver, - ActorArgs { peer_id, p2p }: ActorArgs, - ) { - let send_state = |state| async { state_tx.send(state).await.ok() }; - - send_state(State::Start).await; - - loop_until(&mut action_rx, |msg| { - matches!(Action::PromptPassword, msg).then_some(()) - }) - .await; - - send_state(State::AwaitingPassword { - prev_invalid: false, - }) - .await; - - loop { - let password = loop_until(&mut action_rx, |msg| match msg { - Action::ProcessPassword { password } => Some(password), - _ => None, - }) - .await; - - let (tx, rx) = oneshot::channel(); - p2p.send(PlaceholderP2PAction::SubmitPeeringPassword { - peer_id: peer_id.clone(), - password, - tx, - }) - .await - .ok(); - - if rx.await.unwrap() { - break; - } - - send_state(State::AwaitingPassword { prev_invalid: true }).await; - } - - send_state(State::ChallengeSuccess).await; - } - - pub async fn submit_password(&self, password: String) { - self.tx - .send(Action::ProcessPassword { password }) - .await - .ok(); - } - } -} - -pub mod host { - use super::*; - - #[derive(Type, Deserialize)] - pub enum Action { - PromptPassword, - ProcessPassword { password: String }, - } - - #[derive(Type, Serialize, Clone)] - pub enum State { - AwaitingResponse, - ChallengeReceived, - } - - pub struct PeerRequest { - pub tx: mpsc::Sender, - pub peer_id: String, - } - - // impl PeerRequest { - // pub fn new_actor() -> (Self, mpsc::Receiver) {} - // } -} diff --git a/core/src/object/preview/thumbnail/mod.rs b/core/src/object/preview/thumbnail/mod.rs index 6d5cc3b34..9d54f7550 100644 --- a/core/src/object/preview/thumbnail/mod.rs +++ b/core/src/object/preview/thumbnail/mod.rs @@ -264,7 +264,7 @@ async fn process_step( } Ok(()) } - Err(e) => Err(e.into()), + Err(e) => Err(e), } } diff --git a/core/src/p2p/mod.rs b/core/src/p2p/mod.rs index 397684634..22aa62d08 100644 --- a/core/src/p2p/mod.rs +++ b/core/src/p2p/mod.rs @@ -1,3 +1,5 @@ +#![allow(clippy::unwrap_used, clippy::panic)] // TODO: Remove once this is fully stablised + mod p2p_manager; mod peer_metadata; mod protocol; diff --git a/core/src/p2p/p2p_manager.rs b/core/src/p2p/p2p_manager.rs index 23263a33a..8c58c0cad 100644 --- a/core/src/p2p/p2p_manager.rs +++ b/core/src/p2p/p2p_manager.rs @@ -1,19 +1,24 @@ -#![allow(clippy::unwrap_used)] // TODO: Remove once this is fully stablised - use std::{ borrow::Cow, collections::HashMap, path::PathBuf, - sync::Arc, + str::FromStr, + sync::{ + atomic::{AtomicU16, Ordering}, + Arc, + }, time::{Duration, Instant}, }; +use chrono::Utc; use futures::Stream; use sd_p2p::{ - spaceblock::{BlockSize, SpacedropRequest, Transfer}, + spaceblock::{BlockSize, SpaceblockRequest, Transfer}, spacetime::SpaceTimeStream, + spacetunnel::{Identity, Tunnel}, Event, Manager, ManagerError, MetadataManager, PeerId, }; +use sd_prisma::prisma::node; use sd_sync::CRDTOperation; use serde::Serialize; use specta::Type; @@ -23,12 +28,14 @@ use tokio::{ sync::{broadcast, oneshot, Mutex}, time::sleep, }; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, warn}; use uuid::Uuid; use crate::{ - node::{NodeConfig, NodeConfigManager}, - p2p::{OperatingSystem, SPACEDRIVE_APP_ID}, + library::{Library, LibraryManager, SubscriberEvent}, + node::{NodeConfig, NodeConfigManager, Platform}, + p2p::{NodeInformation, OperatingSystem, SyncRequestError, SPACEDRIVE_APP_ID}, + sync::SyncMessage, }; use super::{Header, PeerMetadata}; @@ -58,12 +65,15 @@ pub struct P2PManager { spacedrop_pairing_reqs: Arc>>>>, pub metadata_manager: Arc>, pub spacedrop_progress: Arc>>>, + pairing_id: AtomicU16, + library_manager: Arc, } impl P2PManager { pub async fn new( node_config: Arc, - ) -> Result<(Arc, broadcast::Receiver<(Uuid, Vec)>), ManagerError> { + library_manager: Arc, + ) -> Result, ManagerError> { let (config, keypair) = { let config = node_config.get().await; (Self::config_to_metadata(&config), config.keypair) @@ -82,16 +92,15 @@ impl P2PManager { // need to keep 'rx' around so that the channel isn't dropped let (tx, rx) = broadcast::channel(100); - let (tx2, rx2) = broadcast::channel(100); let spacedrop_pairing_reqs = Arc::new(Mutex::new(HashMap::new())); let spacedrop_progress = Arc::new(Mutex::new(HashMap::new())); tokio::spawn({ let events = tx.clone(); - // let sync_events = tx2.clone(); let spacedrop_pairing_reqs = spacedrop_pairing_reqs.clone(); let spacedrop_progress = spacedrop_progress.clone(); + let library_manager = library_manager.clone(); async move { let mut shutdown = false; @@ -117,9 +126,9 @@ impl P2PManager { } Event::PeerMessage(mut event) => { let events = events.clone(); - let sync_events = tx2.clone(); let spacedrop_pairing_reqs = spacedrop_pairing_reqs.clone(); let spacedrop_progress = spacedrop_progress.clone(); + let library_manager = library_manager.clone(); tokio::spawn(async move { let header = Header::from_stream(&mut event.stream).await.unwrap(); @@ -150,11 +159,14 @@ impl P2PManager { .await .insert(id, process_tx.clone()); - if let Err(_) = events.send(P2PEvent::SpacedropRequest { - id, - peer_id: event.peer_id, - name: req.name.clone(), - }) { + if events + .send(P2PEvent::SpacedropRequest { + id, + peer_id: event.peer_id, + name: req.name.clone(), + }) + .is_err() + { // No frontend's are active todo!("Outright reject Spacedrop"); @@ -163,8 +175,6 @@ impl P2PManager { tokio::select! { _ = sleep(SPACEDROP_TIMEOUT) => { info!("spacedrop({id}): timeout, rejecting!"); - - return; } file_path = rx => { match file_path { @@ -183,26 +193,123 @@ impl P2PManager { } Ok(None) => { info!("spacedrop({id}): rejected"); - return; } Err(_) => { info!("spacedrop({id}): error with Spacedrop pairing request receiver!"); - return; } } } }; } - Header::Sync(library_id, len) => { + Header::Pair(library_id) => { + let mut stream = match event.stream { + SpaceTimeStream::Unicast(stream) => stream, + _ => { + // TODO: Return an error to the remote client + error!("Received Spacedrop request from peer '{}' but it's not a unicast stream!", event.peer_id); + return; + } + }; + + info!( + "Starting pairing with '{}' for library '{library_id}'", + event.peer_id + ); + + // TODO: Authentication and security stuff + + let library = + library_manager.get_library(library_id).await.unwrap(); + + debug!("Waiting for nodeinfo from the remote node"); + let remote_info = NodeInformation::from_stream(&mut stream) + .await + .unwrap(); + debug!( + "Received nodeinfo from the remote node: {:?}", + remote_info + ); + + debug!("Creating node in database"); + node::Create { + pub_id: remote_info.pub_id.as_bytes().to_vec(), + name: remote_info.name, + platform: remote_info.platform as i32, + date_created: Utc::now().into(), + _params: vec![ + node::identity::set(Some( + remote_info.public_key.to_bytes().to_vec(), + )), + node::node_peer_id::set(Some( + event.peer_id.to_string(), + )), + ], + } + // TODO: Should this be in a transaction in case it fails? + .to_query(&library.db) + .exec() + .await + .unwrap(); + + let info = NodeInformation { + pub_id: library.config.node_id, + name: library.config.name, + public_key: library.identity.to_remote_identity(), + platform: Platform::current(), + }; + + debug!("Sending nodeinfo to the remote node"); + stream.write_all(&info.to_bytes()).await.unwrap(); + + info!( + "Paired with '{}' for library '{library_id}'", + remote_info.pub_id + ); // TODO: Use hash of identity cert here cause pub_id can be forged + } + Header::Sync(library_id) => { + let stream = match event.stream { + SpaceTimeStream::Unicast(stream) => stream, + _ => { + // TODO: Return an error to the remote client + error!("Received Spacedrop request from peer '{}' but it's not a unicast stream!", event.peer_id); + return; + } + }; + + let mut stream = Tunnel::from_stream(stream).await.unwrap(); + + let mut len = [0; 4]; + stream + .read_exact(&mut len) + .await + .map_err(SyncRequestError::PayloadLenIoError) + .unwrap(); + let len = u32::from_le_bytes(len); + let mut buf = vec![0; len as usize]; // TODO: Designed for easily being able to be DOS the current Node - event.stream.read_exact(&mut buf).await.unwrap(); + stream.read_exact(&mut buf).await.unwrap(); let mut buf: &[u8] = &buf; - let operations = rmp_serde::from_read(&mut buf).unwrap(); + let operations: Vec = + rmp_serde::from_read(&mut buf).unwrap(); - println!("Received sync events for library '{library_id}': {operations:?}"); + debug!("ingesting sync events for library '{library_id}': {operations:?}"); - sync_events.send((library_id, operations)).unwrap(); + let Some(library) = library_manager.get_library(library_id).await else { + warn!("error ingesting sync messages. no library by id '{library_id}' found!"); + return; + }; + + for op in operations { + library.sync.ingest_op(op).await.unwrap_or_else( + |err| { + error!( + "error ingesting operation for library '{}': {err:?}", + library.id + ); + }, + ); + } } } }); @@ -233,8 +340,29 @@ impl P2PManager { spacedrop_pairing_reqs, metadata_manager, spacedrop_progress, + pairing_id: AtomicU16::new(0), + library_manager: library_manager.clone(), }); + library_manager + .subscribe({ + let this = this.clone(); + move |event| match event { + SubscriberEvent::Load(library_id, library_identity, mut sync_rx) => { + let this = this.clone(); + tokio::spawn(async move { + while let Ok(op) = sync_rx.recv().await { + let SyncMessage::Created(op) = op else { continue; }; + + this.broadcast_sync_events(library_id, &library_identity, vec![op]) + .await; + } + }); + } + } + }) + .await; + // TODO: Probs remove this once connection timeout/keepalive are working correctly tokio::spawn({ let this = this.clone(); @@ -246,7 +374,7 @@ impl P2PManager { } }); - Ok((this, rx2)) + Ok(this) } fn config_to_metadata(config: &NodeConfig) -> PeerMetadata { @@ -259,6 +387,7 @@ impl P2PManager { } } + #[allow(unused)] // TODO: Should probs be using this pub async fn update_metadata(&self, node_config_manager: &NodeConfigManager) { self.metadata_manager .update(Self::config_to_metadata(&node_config_manager.get().await)); @@ -280,8 +409,69 @@ impl P2PManager { self.events.0.subscribe() } - #[allow(unused)] // TODO: Remove `allow(unused)` once integrated - pub async fn broadcast_sync_events(&self, library_id: Uuid, event: Vec) { + pub fn pair(&self, peer_id: PeerId, lib: Library) -> u16 { + let pairing_id = self.pairing_id.fetch_add(1, Ordering::SeqCst); + + let manager = self.manager.clone(); + tokio::spawn(async move { + info!( + "Started pairing session '{pairing_id}' with peer '{peer_id}' for library '{}'", + lib.id + ); + + let mut stream = manager.stream(peer_id).await.unwrap(); + + let header = Header::Pair(lib.id); + stream.write_all(&header.to_bytes()).await.unwrap(); + + // TODO: Apply some security here cause this is so open to MITM + // TODO: Signing and a SPAKE style pin prompt + + let info = NodeInformation { + pub_id: lib.config.node_id, + name: lib.config.name, + public_key: lib.identity.to_remote_identity(), + platform: Platform::current(), + }; + + debug!("Sending nodeinfo to remote node"); + stream.write_all(&info.to_bytes()).await.unwrap(); + + debug!("Waiting for nodeinfo from the remote node"); + let remote_info = NodeInformation::from_stream(&mut stream).await.unwrap(); + debug!("Received nodeinfo from the remote node: {:?}", remote_info); + + node::Create { + pub_id: remote_info.pub_id.as_bytes().to_vec(), + name: remote_info.name, + platform: remote_info.platform as i32, + date_created: Utc::now().into(), + _params: vec![ + node::identity::set(Some(remote_info.public_key.to_bytes().to_vec())), + node::node_peer_id::set(Some(peer_id.to_string())), + ], + } + // TODO: Should this be in a transaction in case it fails? + .to_query(&lib.db) + .exec() + .await + .unwrap(); + + info!( + "Paired with '{}' for library '{}'", + remote_info.pub_id, lib.id + ); // TODO: Use hash of identity cert here cause pub_id can be forged + }); + + pairing_id + } + + pub async fn broadcast_sync_events( + &self, + library_id: Uuid, + _identity: &Identity, + event: Vec, + ) { let mut buf = match rmp_serde::to_vec_named(&event) { Ok(buf) => buf, Err(e) => { @@ -289,12 +479,43 @@ impl P2PManager { return; } }; - let mut head_buf = Header::Sync(library_id, buf.len() as u32).to_bytes(); // Max Sync payload is like 4GB + let mut head_buf = Header::Sync(library_id).to_bytes(); // Max Sync payload is like 4GB + head_buf.extend_from_slice(&(buf.len() as u32).to_le_bytes()); head_buf.append(&mut buf); - debug!("broadcasting sync events. payload_len={}", buf.len()); + // TODO: Determine which clients we share that library with - self.manager.broadcast(head_buf).await; + // TODO: Establish a connection to them + + let library = self.library_manager.get_library(library_id).await.unwrap(); + // TODO: probs cache this query in memory cause this is gonna be stupid frequent + let target_nodes = library + .db + .node() + .find_many(vec![]) + .exec() + .await + .unwrap() + .into_iter() + .map(|n| { + PeerId::from_str(&n.node_peer_id.expect("Node was missing 'node_peer_id'!")) + .unwrap() + }) + .collect::>(); + + info!( + "Sending sync messages for library '{}' to nodes with peer id's '{:?}'", + library_id, target_nodes + ); + + // TODO: Do in parallel + for peer_id in target_nodes { + let stream = self.manager.stream(peer_id).await.map_err(|_| ()).unwrap(); // TODO: handle providing incorrect peer id + + let mut tunnel = Tunnel::from_stream(stream).await.unwrap(); + + tunnel.write_all(&head_buf).await.unwrap(); + } } pub async fn ping(&self) { @@ -314,7 +535,7 @@ impl P2PManager { let file = File::open(&path).await.map_err(|_| ())?; let metadata = file.metadata().await.map_err(|_| ())?; - let header = Header::Spacedrop(SpacedropRequest { + let header = Header::Spacedrop(SpaceblockRequest { name: path .file_name() .map(|v| v.to_string_lossy()) diff --git a/core/src/p2p/protocol.rs b/core/src/p2p/protocol.rs index d5eb26a0a..ef366d572 100644 --- a/core/src/p2p/protocol.rs +++ b/core/src/p2p/protocol.rs @@ -1,18 +1,24 @@ +use std::string::FromUtf8Error; + use thiserror::Error; -use tokio::io::AsyncReadExt; +use tokio::io::{AsyncRead, AsyncReadExt}; use uuid::Uuid; use sd_p2p::{ - spaceblock::{SpacedropRequest, SpacedropRequestError}, + spaceblock::{SpaceblockRequest, SpacedropRequestError}, spacetime::SpaceTimeStream, + spacetunnel::{IdentityErr, RemoteIdentity}, }; +use crate::node::Platform; + /// TODO #[derive(Debug, PartialEq, Eq)] pub enum Header { Ping, - Spacedrop(SpacedropRequest), - Sync(Uuid, u32), + Spacedrop(SpaceblockRequest), + Pair(Uuid), + Sync(Uuid), } #[derive(Debug, Error)] @@ -49,7 +55,7 @@ impl Header { match discriminator { 0 => match stream { SpaceTimeStream::Unicast(stream) => Ok(Self::Spacedrop( - SpacedropRequest::from_stream(stream).await?, + SpaceblockRequest::from_stream(stream).await?, )), _ => Err(HeaderError::SpacedropOverMulticastIsForbidden), }, @@ -61,16 +67,19 @@ impl Header { .await .map_err(SyncRequestError::LibraryIdIoError)?; - let mut len = [0; 4]; + Ok(Self::Pair( + Uuid::from_slice(&uuid).map_err(SyncRequestError::ErrorDecodingLibraryId)?, + )) + } + 3 => { + let mut uuid = [0u8; 16]; stream - .read_exact(&mut len) + .read_exact(&mut uuid) .await - .map_err(SyncRequestError::PayloadLenIoError)?; - let len = u32::from_le_bytes(len); + .map_err(SyncRequestError::LibraryIdIoError)?; Ok(Self::Sync( Uuid::from_slice(&uuid).map_err(SyncRequestError::ErrorDecodingLibraryId)?, - len, )) } d => Err(HeaderError::InvalidDiscriminator(d)), @@ -85,41 +94,176 @@ impl Header { bytes } Self::Ping => vec![1], - Self::Sync(uuid, len) => { + Self::Pair(library_id) => { let mut bytes = vec![2]; + bytes.extend_from_slice(library_id.as_bytes()); + bytes + } + Self::Sync(uuid) => { + let mut bytes = vec![3]; bytes.extend_from_slice(uuid.as_bytes()); - - let len_buf = len.to_le_bytes(); - debug_assert_eq!(len_buf.len(), 4); // TODO: Is this bad because `len` is usize?? - bytes.extend_from_slice(&len_buf); - bytes } } } } -// TODO: Unit test it because binary protocols are error prone -// #[cfg(test)] -// mod tests { -// use super::*; +#[derive(Debug, Error)] +pub enum NodeInformationError { + #[error("io error decoding node information library pub_id: {0}")] + ErrorDecodingUuid(std::io::Error), + #[error("error formatting node information library pub_id: {0}")] + UuidFormatError(uuid::Error), + #[error("io error reading node information library name length: {0}")] + NameLenIoError(std::io::Error), + #[error("io error decoding node information library name: {0}")] + ErrorDecodingName(std::io::Error), + #[error("error formatting node information library name: {0}")] + NameFormatError(FromUtf8Error), + #[error("io error reading node information public key length: {0}")] + PublicKeyLenIoError(std::io::Error), + #[error("io error decoding node information public key: {0}")] + ErrorDecodingPublicKey(std::io::Error), + #[error("error decoding public key: {0}")] + ErrorParsingPublicKey(#[from] IdentityErr), + #[error("io error reading node information platform id: {0}")] + PlatformIdError(std::io::Error), +} -// #[test] -// fn test_proto() { -// assert_eq!( -// Header::from_bytes(&Header::Ping.to_bytes()), -// Ok(Header::Ping) -// ); +/// is shared between nodes during pairing and contains the information to identify the node. +#[derive(Debug, PartialEq, Eq)] +pub struct NodeInformation { + pub pub_id: Uuid, + pub name: String, + pub public_key: RemoteIdentity, + pub platform: Platform, +} -// assert_eq!( -// Header::from_bytes(&Header::Spacedrop.to_bytes()), -// Ok(Header::Spacedrop) -// ); +impl NodeInformation { + pub async fn from_stream( + stream: &mut (impl AsyncRead + Unpin), + ) -> Result { + let pub_id = { + let mut buf = vec![0u8; 16]; + stream + .read_exact(&mut buf) + .await + .map_err(NodeInformationError::ErrorDecodingUuid)?; -// let uuid = Uuid::new_v4(); -// assert_eq!( -// Header::from_bytes(&Header::Sync(uuid).to_bytes()), -// Ok(Header::Sync(uuid)) -// ); -// } -// } + Uuid::from_slice(&buf).map_err(NodeInformationError::UuidFormatError)? + }; + + let name = { + let len = stream + .read_u16_le() + .await + .map_err(NodeInformationError::NameLenIoError)?; + + let mut buf = vec![0u8; len as usize]; + stream + .read_exact(&mut buf) + .await + .map_err(NodeInformationError::ErrorDecodingName)?; + + String::from_utf8(buf).map_err(NodeInformationError::NameFormatError)? + }; + + let public_key = { + let len = stream + .read_u16_le() + .await + .map_err(NodeInformationError::PublicKeyLenIoError)?; + + let mut buf = vec![0u8; len as usize]; + stream + .read_exact(&mut buf) + .await + .map_err(NodeInformationError::ErrorDecodingPublicKey)?; + + RemoteIdentity::from_bytes(&buf)? + }; + + let platform = stream + .read_u8() + .await + .map_err(NodeInformationError::PlatformIdError)?; + + Ok(Self { + pub_id, + name, + public_key, + platform: Platform::try_from(platform).unwrap_or(Platform::Unknown), + }) + } + + pub fn to_bytes(&self) -> Vec { + let mut buf = Vec::new(); + + // Pub id + buf.extend(self.pub_id.as_bytes()); + + // Name + let len_buf = (self.name.len() as u16).to_le_bytes(); + if self.name.len() > u16::MAX as usize { + panic!("Name is too long!"); // TODO: Error handling + } + buf.extend_from_slice(&len_buf); + buf.extend(self.name.as_bytes()); + + // Public key // TODO: Can I use a fixed size array? + let pk = self.public_key.to_bytes(); + let len_buf = (pk.len() as u16).to_le_bytes(); + if pk.len() > u16::MAX as usize { + panic!("Public key is too long!"); // TODO: Error handling + } + buf.extend_from_slice(&len_buf); + buf.extend(pk); + + // Platform + buf.push(self.platform as u8); + + buf + } +} + +#[cfg(test)] +mod tests { + use super::*; + use sd_p2p::spacetunnel::Identity; + + #[tokio::test] + async fn test_node_information() { + let original = NodeInformation { + pub_id: Uuid::new_v4(), + name: "Name".into(), + public_key: Identity::new().to_remote_identity(), + platform: Platform::current(), + }; + + let buf = original.to_bytes(); + let mut cursor = std::io::Cursor::new(buf); + let info = NodeInformation::from_stream(&mut cursor).await.unwrap(); + + assert_eq!(original, info); + } + + // TODO: Unit test it because binary protocols are error prone + // #[test] + // fn test_proto() { + // assert_eq!( + // Header::from_bytes(&Header::Ping.to_bytes()), + // Ok(Header::Ping) + // ); + + // assert_eq!( + // Header::from_bytes(&Header::Spacedrop.to_bytes()), + // Ok(Header::Spacedrop) + // ); + + // let uuid = Uuid::new_v4(); + // assert_eq!( + // Header::from_bytes(&Header::Sync(uuid).to_bytes()), + // Ok(Header::Sync(uuid)) + // ); + // } +} diff --git a/core/src/sync/manager.rs b/core/src/sync/manager.rs index 600008e3f..49ceaff3f 100644 --- a/core/src/sync/manager.rs +++ b/core/src/sync/manager.rs @@ -157,14 +157,15 @@ impl SyncManager { pub async fn ingest_op(&self, op: CRDTOperation) -> prisma_client_rust::Result<()> { let db = &self.db; - db.node() - .upsert( - node::pub_id::equals(op.node.as_bytes().to_vec()), - node::create(op.node.as_bytes().to_vec(), "TEMP".to_string(), vec![]), - vec![], - ) + if db + .node() + .find_unique(node::pub_id::equals(op.node.as_bytes().to_vec())) .exec() - .await?; + .await? + .is_none() + { + panic!("Node is not paired!") + } let msg = SyncMessage::Ingested(op.clone()); @@ -294,7 +295,6 @@ impl SyncManager { .await?; } }, - _ => todo!(), } if let CRDTOperationType::Shared(shared_op) = op.typ { diff --git a/core/src/util/db.rs b/core/src/util/db.rs index b03d177df..ab5889a7d 100644 --- a/core/src/util/db.rs +++ b/core/src/util/db.rs @@ -101,7 +101,7 @@ impl<'a, T> OptionalField for &'a Option { } } -pub fn maybe_missing<'a, T: OptionalField>( +pub fn maybe_missing( data: T, field: &'static str, ) -> Result { diff --git a/core/src/util/debug_initializer.rs b/core/src/util/debug_initializer.rs index 647e019c6..c36e50fbe 100644 --- a/core/src/util/debug_initializer.rs +++ b/core/src/util/debug_initializer.rs @@ -12,10 +12,12 @@ use crate::{ location::{ delete_location, scan_location, LocationCreateArgs, LocationError, LocationManagerError, }, + node::NodeConfig, prisma::location, util::AbortOnDrop, }; use prisma_client_rust::QueryError; +use sd_p2p::spacetunnel::Identity; use serde::Deserialize; use thiserror::Error; use tokio::{ @@ -93,7 +95,11 @@ impl InitConfig { Ok(None) } - pub async fn apply(self, library_manager: &LibraryManager) -> Result<(), InitConfigError> { + pub async fn apply( + self, + library_manager: &LibraryManager, + node_cfg: NodeConfig, + ) -> Result<(), InitConfigError> { info!("Initializing app from file: {:?}", self.path); for lib in self.libraries { @@ -108,13 +114,17 @@ impl InitConfig { let library = match library_manager.get_library(lib.id).await { Some(lib) => lib, None => { + let node_pub_id = Uuid::new_v4(); let library = library_manager .create_with_uuid( lib.id, LibraryConfig { name: lib.name, - description: lib.description.unwrap_or("".to_string()), + description: lib.description, + identity: Identity::new().to_bytes(), + node_id: node_pub_id, }, + node_cfg.clone(), ) .await?; diff --git a/core/src/util/maybe_undefined.rs b/core/src/util/maybe_undefined.rs new file mode 100644 index 000000000..f32a88565 --- /dev/null +++ b/core/src/util/maybe_undefined.rs @@ -0,0 +1,58 @@ +//! Copied from: https://docs.rs/async-graphql/latest/async_graphql/types/enum.MaybeUndefined.html +#![allow(unused)] + +use serde::{Deserialize, Deserializer, Serialize, Serializer}; +use specta::Type; + +#[derive(Debug, Clone, Type)] +#[specta(untagged)] +pub enum MaybeUndefined { + Undefined, + Null, + Value(T), +} + +impl MaybeUndefined> { + /// Transposes a `MaybeUndefined` of a [`Result`] into a [`Result`] of a + /// `MaybeUndefined`. + /// + /// [`MaybeUndefined::Undefined`] will be mapped to + /// [`Ok`]`(`[`MaybeUndefined::Undefined`]`)`. [`MaybeUndefined::Null`] + /// will be mapped to [`Ok`]`(`[`MaybeUndefined::Null`]`)`. + /// [`MaybeUndefined::Value`]`(`[`Ok`]`(_))` and + /// [`MaybeUndefined::Value`]`(`[`Err`]`(_))` will be mapped to + /// [`Ok`]`(`[`MaybeUndefined::Value`]`(_))` and [`Err`]`(_)`. + #[inline] + pub fn transpose(self) -> Result, E> { + match self { + MaybeUndefined::Undefined => Ok(MaybeUndefined::Undefined), + MaybeUndefined::Null => Ok(MaybeUndefined::Null), + MaybeUndefined::Value(Ok(v)) => Ok(MaybeUndefined::Value(v)), + MaybeUndefined::Value(Err(e)) => Err(e), + } + } +} + +impl Serialize for MaybeUndefined { + fn serialize(&self, serializer: S) -> Result { + match self { + MaybeUndefined::Value(value) => value.serialize(serializer), + _ => serializer.serialize_none(), + } + } +} + +impl<'de, T> Deserialize<'de> for MaybeUndefined +where + T: Deserialize<'de>, +{ + fn deserialize(deserializer: D) -> Result, D::Error> + where + D: Deserializer<'de>, + { + Option::::deserialize(deserializer).map(|value| match value { + Some(value) => MaybeUndefined::Value(value), + None => MaybeUndefined::Null, + }) + } +} diff --git a/core/src/util/migrator.rs b/core/src/util/migrator.rs index 83a61f8c2..e767f4cbe 100644 --- a/core/src/util/migrator.rs +++ b/core/src/util/migrator.rs @@ -2,7 +2,7 @@ use std::{ any::type_name, fs::File, io::{self, BufReader, Seek, Write}, - path::Path, + path::{Path, PathBuf}, }; use serde::{de::DeserializeOwned, Deserialize, Serialize}; @@ -23,11 +23,13 @@ pub struct BaseConfig { /// System for managing app level migrations on a config file so we can introduce breaking changes to the app without the user needing to reset their whole system. #[async_trait::async_trait] -pub trait Migrate: Sized + DeserializeOwned + Serialize + Default { +pub trait Migrate: Sized + DeserializeOwned + Serialize { const CURRENT_VERSION: u32; type Ctx: Sync; + fn default(path: PathBuf) -> Result; + async fn migrate( from_version: u32, config: &mut Map, @@ -90,7 +92,7 @@ pub trait Migrate: Sized + DeserializeOwned + Serialize + Default { Ok(serde_json::from_value(Value::Object(cfg.other))?) } false => Ok(serde_json::from_value(Value::Object( - Self::default().save(path)?.other, + Self::default(path.into())?.save(path)?.other, ))?), } } @@ -128,6 +130,8 @@ pub enum MigratorError { Database(#[from] prisma_client_rust::QueryError), #[error("We detected a Spacedrive config from a super early version of the app!")] HasSuperLegacyConfig, + #[error("file '{}' was not found by the migrator!", .0.display())] + ConfigFileMissing(PathBuf), #[error("custom migration error: {0}")] Custom(String), } @@ -154,6 +158,10 @@ mod test { type Ctx = (); + fn default(_path: PathBuf) -> Result { + Ok(::default()) + } + async fn migrate( to_version: u32, config: &mut Map, diff --git a/core/src/util/mod.rs b/core/src/util/mod.rs index c61f99f09..97269a620 100644 --- a/core/src/util/mod.rs +++ b/core/src/util/mod.rs @@ -3,7 +3,9 @@ pub mod db; #[cfg(debug_assertions)] pub mod debug_initializer; pub mod error; +mod maybe_undefined; pub mod migrator; pub mod version_manager; pub use abort_on_drop::*; +pub use maybe_undefined::*; diff --git a/crates/p2p/Cargo.toml b/crates/p2p/Cargo.toml index 245515acc..2d8668099 100644 --- a/crates/p2p/Cargo.toml +++ b/crates/p2p/Cargo.toml @@ -32,6 +32,9 @@ specta = { workspace = true } flume = "0.10.14" tokio-util = { version = "0.7.8", features = ["compat"] } arc-swap = "1.6.0" +p384 = { version = "0.13.0", feature = ["ecdh"] } +ed25519-dalek = { version = "1.0.1", features = ["rand"] } +rand_core = { version = "0.5.1", feature = ["getrandom"] } [dev-dependencies] tokio = { workspace = true, features = ["rt-multi-thread"] } diff --git a/crates/p2p/src/lib.rs b/crates/p2p/src/lib.rs index cc32a3fef..a53e0c32f 100644 --- a/crates/p2p/src/lib.rs +++ b/crates/p2p/src/lib.rs @@ -8,6 +8,7 @@ mod metadata_manager; mod peer; pub mod spaceblock; pub mod spacetime; +pub mod spacetunnel; mod utils; pub use event::*; diff --git a/crates/p2p/src/manager.rs b/crates/p2p/src/manager.rs index e365caef9..f9efcc4c2 100644 --- a/crates/p2p/src/manager.rs +++ b/crates/p2p/src/manager.rs @@ -1,5 +1,5 @@ use std::{ - collections::HashSet, + collections::{HashMap, HashSet}, net::SocketAddr, sync::{ atomic::{AtomicBool, AtomicU64}, @@ -41,7 +41,7 @@ impl Manager { .then_some(()) .ok_or(ManagerError::InvalidAppName)?; - let peer_id = PeerId(keypair.peer_id()); + let peer_id = PeerId(keypair.raw_peer_id()); let (event_stream_tx, event_stream_rx) = mpsc::channel(1024); let (mdns, mdns_state) = Mdns::new(application_name, peer_id, metadata_manager) @@ -67,7 +67,7 @@ impl Manager { .map(|(p, c), _| (p, StreamMuxerBox::new(c))) .boxed(), SpaceTime::new(this.clone()), - keypair.peer_id(), + keypair.raw_peer_id(), ) .build(); { @@ -92,6 +92,7 @@ impl Manager { mdns, queued_events: Default::default(), shutdown: AtomicBool::new(false), + on_establish_streams: HashMap::new(), }, )) } @@ -129,6 +130,7 @@ impl Manager { }) } + #[allow(clippy::unused_unit)] // TODO: Remove this clippy override once error handling is added pub async fn stream(&self, peer_id: PeerId) -> Result { // TODO: With this system you can send to any random peer id. Can I reduce that by requiring `.connect(peer_id).unwrap().send(data)` or something like that. let (tx, rx) = oneshot::channel(); @@ -136,6 +138,8 @@ impl Manager { .await; let mut stream = rx.await.map_err(|_| { warn!("failed to queue establishing stream to peer '{peer_id}'!"); + + () })?; stream.write_discriminator().await.unwrap(); // TODO: Error handling Ok(stream) diff --git a/crates/p2p/src/manager_stream.rs b/crates/p2p/src/manager_stream.rs index 58da7e7ad..a6a8e31ce 100644 --- a/crates/p2p/src/manager_stream.rs +++ b/crates/p2p/src/manager_stream.rs @@ -1,5 +1,5 @@ use std::{ - collections::VecDeque, + collections::{HashMap, VecDeque}, fmt, net::SocketAddr, sync::{ @@ -57,16 +57,14 @@ impl From> for ManagerStreamAction -where - TMetadata: Metadata, -{ +pub struct ManagerStream { pub(crate) manager: Arc>, pub(crate) event_stream_rx: mpsc::Receiver>, pub(crate) swarm: Swarm>, pub(crate) mdns: Mdns, pub(crate) queued_events: VecDeque>, pub(crate) shutdown: AtomicBool, + pub(crate) on_establish_streams: HashMap>, } impl ManagerStream @@ -109,7 +107,20 @@ where return Some(event); } }, - SwarmEvent::ConnectionEstablished { .. } => {}, + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + if let Some(streams) = self.on_establish_streams.remove(&peer_id) { + for event in streams { + self.swarm + .behaviour_mut() + .pending_events + .push_back(ToSwarm::NotifyHandler { + peer_id, + handler: NotifyHandler::Any, + event + }); + } + } + }, SwarmEvent::ConnectionClosed { .. } => {}, SwarmEvent::IncomingConnection { local_addr, .. } => debug!("incoming connection from '{}'", local_addr), SwarmEvent::IncomingConnectionError { local_addr, error, .. } => warn!("handshake error with incoming connection from '{}': {}", local_addr, error), @@ -195,7 +206,7 @@ where .addresses(addresses.iter().map(socketaddr_to_quic_multiaddr).collect()) .build(), ) { - Ok(_) => {} + Ok(()) => {} Err(err) => warn!( "error dialing peer '{}' with addresses '{:?}': {}", peer_id, addresses, err @@ -203,14 +214,45 @@ where } } ManagerStreamAction::StartStream(peer_id, rx) => { - self.swarm - .behaviour_mut() - .pending_events - .push_back(ToSwarm::NotifyHandler { - peer_id: peer_id.0, - handler: NotifyHandler::Any, - event: OutboundRequest::Unicast(rx), - }); + if !self.swarm.connected_peers().any(|v| *v == peer_id.0) { + let addresses = self + .mdns + .state + .discovered + .read() + .await + .get(&peer_id) + .unwrap() + .addresses + .clone(); + + match self.swarm.dial( + DialOpts::peer_id(peer_id.0) + .condition(PeerCondition::Disconnected) + .addresses(addresses.iter().map(socketaddr_to_quic_multiaddr).collect()) + .build(), + ) { + Ok(()) => {} + Err(err) => warn!( + "error dialing peer '{}' with addresses '{:?}': {}", + peer_id, addresses, err + ), + } + + self.on_establish_streams + .entry(peer_id.0) + .or_default() + .push(OutboundRequest::Unicast(rx)); + } else { + self.swarm + .behaviour_mut() + .pending_events + .push_back(ToSwarm::NotifyHandler { + peer_id: peer_id.0, + handler: NotifyHandler::Any, + event: OutboundRequest::Unicast(rx), + }); + } } ManagerStreamAction::BroadcastData(data) => { let connected_peers = self.swarm.connected_peers().copied().collect::>(); diff --git a/crates/p2p/src/mdns.rs b/crates/p2p/src/mdns.rs index 53661bcf8..32149ba77 100644 --- a/crates/p2p/src/mdns.rs +++ b/crates/p2p/src/mdns.rs @@ -39,7 +39,7 @@ where service_name: String, next_mdns_advertisement: Pin>, trigger_advertisement: mpsc::UnboundedReceiver<()>, - state: Arc>, + pub(crate) state: Arc>, } impl Mdns diff --git a/crates/p2p/src/spaceblock/mod.rs b/crates/p2p/src/spaceblock/mod.rs index 0a10e775a..1b0392890 100644 --- a/crates/p2p/src/spaceblock/mod.rs +++ b/crates/p2p/src/spaceblock/mod.rs @@ -43,7 +43,7 @@ impl BlockSize { /// TODO #[derive(Debug, Clone, PartialEq, Eq)] -pub struct SpacedropRequest { +pub struct SpaceblockRequest { pub name: String, pub size: u64, // TODO: Include file permissions @@ -62,20 +62,24 @@ pub enum SpacedropRequestError { SizeIoError(std::io::Error), } -impl SpacedropRequest { +impl SpaceblockRequest { pub async fn from_stream( stream: &mut (impl AsyncRead + Unpin), ) -> Result { - let name_len = stream - .read_u16_le() - .await - .map_err(SpacedropRequestError::NameLenIoError)?; - let mut name = vec![0u8; name_len as usize]; - stream - .read_exact(&mut name) - .await - .map_err(SpacedropRequestError::NameIoError)?; - let name = String::from_utf8(name).map_err(SpacedropRequestError::NameFormatError)?; + let name = { + let len = stream + .read_u16_le() + .await + .map_err(SpacedropRequestError::NameLenIoError)?; + + let mut buf = vec![0u8; len as usize]; + stream + .read_exact(&mut buf) + .await + .map_err(SpacedropRequestError::NameIoError)?; + + String::from_utf8(buf).map_err(SpacedropRequestError::NameFormatError)? + }; let size = stream .read_u64_le() @@ -153,7 +157,7 @@ impl<'a> Block<'a> { /// TODO pub struct Transfer<'a, F> { - req: &'a SpacedropRequest, + req: &'a SpaceblockRequest, on_progress: F, } @@ -161,7 +165,7 @@ impl<'a, F> Transfer<'a, F> where F: Fn(u8) + 'a, { - pub fn new(req: &'a SpacedropRequest, on_progress: F) -> Self { + pub fn new(req: &'a SpaceblockRequest, on_progress: F) -> Self { Self { req, on_progress } } @@ -242,14 +246,14 @@ mod tests { #[tokio::test] async fn test_spaceblock_request() { - let req = SpacedropRequest { + let req = SpaceblockRequest { name: "Demo".to_string(), size: 42069, block_size: BlockSize::from_size(42069), }; let bytes = req.to_bytes(); - let req2 = SpacedropRequest::from_stream(&mut Cursor::new(bytes)) + let req2 = SpaceblockRequest::from_stream(&mut Cursor::new(bytes)) .await .unwrap(); assert_eq!(req, req2); @@ -261,7 +265,7 @@ mod tests { // This is sent out of band of Spaceblock let data = b"Spacedrive".to_vec(); - let req = SpacedropRequest { + let req = SpaceblockRequest { name: "Demo".to_string(), size: data.len() as u64, block_size: BlockSize::from_size(data.len() as u64), @@ -297,7 +301,7 @@ mod tests { let data = vec![0u8; block_size as usize * 4]; // Let's pacman some RAM let block_size = BlockSize::dangerously_new(block_size); - let req = SpacedropRequest { + let req = SpaceblockRequest { name: "Demo".to_string(), size: data.len() as u64, block_size, diff --git a/crates/p2p/src/spacetunnel/identity.rs b/crates/p2p/src/spacetunnel/identity.rs new file mode 100644 index 000000000..0c7f95288 --- /dev/null +++ b/crates/p2p/src/spacetunnel/identity.rs @@ -0,0 +1,54 @@ +use ed25519_dalek::PublicKey; +use rand_core::OsRng; +use thiserror::Error; + +#[derive(Debug, Error)] +#[error(transparent)] +pub struct IdentityErr(#[from] ed25519_dalek::ed25519::Error); + +/// TODO +pub struct Identity(ed25519_dalek::Keypair); + +impl Default for Identity { + fn default() -> Self { + Self(ed25519_dalek::Keypair::generate(&mut OsRng)) + } +} + +impl Identity { + pub fn new() -> Self { + Self::default() + } + + pub fn from_bytes(bytes: &[u8]) -> Result { + Ok(Self(ed25519_dalek::Keypair::from_bytes(bytes)?)) + } + + pub fn to_bytes(&self) -> Vec { + self.0.to_bytes().to_vec() + } + + pub fn public_key(&self) -> PublicKey { + self.0.public + } + + pub fn to_remote_identity(&self) -> RemoteIdentity { + RemoteIdentity(self.0.public) + } +} +#[derive(Debug, PartialEq, Eq)] +pub struct RemoteIdentity(ed25519_dalek::PublicKey); + +impl RemoteIdentity { + pub fn from_bytes(bytes: &[u8]) -> Result { + Ok(Self(ed25519_dalek::PublicKey::from_bytes(bytes)?)) + } + + pub fn to_bytes(&self) -> [u8; 32] { + self.0.to_bytes() + } + + pub fn public_key(&self) -> PublicKey { + self.0 + } +} diff --git a/crates/p2p/src/spacetunnel/mod.rs b/crates/p2p/src/spacetunnel/mod.rs new file mode 100644 index 000000000..b007bb2e7 --- /dev/null +++ b/crates/p2p/src/spacetunnel/mod.rs @@ -0,0 +1,7 @@ +//! A system for creating encrypted tunnels between peers on untrusted connections. + +mod identity; +mod tunnel; + +pub use identity::*; +pub use tunnel::*; diff --git a/crates/p2p/src/spacetunnel/tunnel.rs b/crates/p2p/src/spacetunnel/tunnel.rs new file mode 100644 index 000000000..4dfbccd9e --- /dev/null +++ b/crates/p2p/src/spacetunnel/tunnel.rs @@ -0,0 +1,64 @@ +use std::{ + io, + pin::Pin, + task::{Context, Poll}, +}; + +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, ReadBuf}; + +use crate::spacetime::UnicastStream; + +pub struct Tunnel { + stream: UnicastStream, +} + +impl Tunnel { + // TODO: Proper errors + pub async fn from_stream(mut stream: UnicastStream) -> Result { + let discriminator = stream + .read_u8() + .await + .map_err(|_| "Error reading discriminator. Is this stream actually a tunnel?")?; + if discriminator != b'T' { + return Err("Invalid discriminator. Is this stream actually a tunnel?"); + } + + // TODO: Do pairing + + Ok(Self { stream }) + } +} + +impl AsyncRead for Tunnel { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + // TODO: Do decryption + + Pin::new(&mut self.get_mut().stream).poll_read(cx, buf) + } +} + +impl AsyncWrite for Tunnel { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + // TODO: Do encryption + + Pin::new(&mut self.get_mut().stream).poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.get_mut().stream).poll_flush(cx) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut self.get_mut().stream).poll_shutdown(cx) + } +} + +// TODO: Unit tests diff --git a/crates/p2p/src/utils/keypair.rs b/crates/p2p/src/utils/keypair.rs index 62e4f6426..9ae1a71fb 100644 --- a/crates/p2p/src/utils/keypair.rs +++ b/crates/p2p/src/utils/keypair.rs @@ -9,7 +9,14 @@ impl Keypair { Self(ed25519::Keypair::generate()) } - pub fn peer_id(&self) -> libp2p::PeerId { + pub fn peer_id(&self) -> crate::PeerId { + let pk: libp2p::identity::PublicKey = self.0.public().into(); + + crate::PeerId(libp2p::PeerId::from_public_key(&pk)) + } + + // TODO: Maybe try and remove + pub fn raw_peer_id(&self) -> libp2p::PeerId { let pk: libp2p::identity::PublicKey = self.0.public().into(); libp2p::PeerId::from_public_key(&pk) diff --git a/crates/p2p/src/utils/peer_id.rs b/crates/p2p/src/utils/peer_id.rs index e33b07b6d..a533db003 100644 --- a/crates/p2p/src/utils/peer_id.rs +++ b/crates/p2p/src/utils/peer_id.rs @@ -9,6 +9,12 @@ pub struct PeerId( pub(crate) libp2p::PeerId, ); +// impl PeerId { +// pub fn to_string(&self) -> String { +// self.0.to_string() +// } +// } + impl FromStr for PeerId { #[allow(deprecated)] type Err = libp2p::core::ParseError; diff --git a/interface/app/$libraryId/Layout/Sidebar/LibrarySection.tsx b/interface/app/$libraryId/Layout/Sidebar/LibrarySection.tsx index dd704fc9d..8773be041 100644 --- a/interface/app/$libraryId/Layout/Sidebar/LibrarySection.tsx +++ b/interface/app/$libraryId/Layout/Sidebar/LibrarySection.tsx @@ -2,7 +2,13 @@ import { Laptop } from '@sd/assets/icons'; import clsx from 'clsx'; import { useEffect, useState } from 'react'; import { Link, NavLink } from 'react-router-dom'; -import { arraysEqual, useBridgeQuery, useLibraryQuery, useOnlineLocations } from '@sd/client'; +import { + arraysEqual, + useBridgeQuery, + useFeatureFlag, + useLibraryQuery, + useOnlineLocations +} from '@sd/client'; import { AddLocationButton } from '~/app/$libraryId/settings/library/locations/AddLocationButton'; import { Folder } from '~/components/Folder'; import { SubtleButton } from '~/components/SubtleButton'; @@ -26,6 +32,7 @@ export const LibrarySection = () => { const locations = useLibraryQuery(['locations.list'], { keepPreviousData: true }); const tags = useLibraryQuery(['tags.list'], { keepPreviousData: true }); const onlineLocations = useOnlineLocations(); + const isPairingEnabled = useFeatureFlag('p2pPairing'); const [triggeredContextItem, setTriggeredContextItem] = useState( null ); @@ -47,9 +54,13 @@ export const LibrarySection = () => {
+ isPairingEnabled ? ( + + + + ) : ( - + ) } > {/* diff --git a/interface/app/$libraryId/settings/Sidebar.tsx b/interface/app/$libraryId/settings/Sidebar.tsx index 25262a966..02607b592 100644 --- a/interface/app/$libraryId/settings/Sidebar.tsx +++ b/interface/app/$libraryId/settings/Sidebar.tsx @@ -12,6 +12,7 @@ import { ShieldCheck, TagSimple } from 'phosphor-react'; +import { useFeatureFlag } from '@sd/client'; import { tw } from '@sd/ui'; import { useOperatingSystem } from '~/hooks/useOperatingSystem'; import Icon from '../Layout/Sidebar/Icon'; @@ -23,6 +24,7 @@ const Section = tw.div`space-y-0.5`; export default () => { const os = useOperatingSystem(); + const isPairingEnabled = useFeatureFlag('p2pPairing'); return (
@@ -68,7 +70,7 @@ export default () => { General - + Nodes diff --git a/interface/app/$libraryId/settings/library/general.tsx b/interface/app/$libraryId/settings/library/general.tsx index 1c9de9ce9..8bf59fe63 100644 --- a/interface/app/$libraryId/settings/library/general.tsx +++ b/interface/app/$libraryId/settings/library/general.tsx @@ -1,4 +1,4 @@ -import { useBridgeMutation, useLibraryContext } from '@sd/client'; +import { MaybeUndefined, useBridgeMutation, useLibraryContext } from '@sd/client'; import { Button, Input, dialogManager } from '@sd/ui'; import { useZodForm, z } from '@sd/ui/src/forms'; import { useDebouncedFormWatch } from '~/hooks'; @@ -9,23 +9,31 @@ import DeleteLibraryDialog from '../node/libraries/DeleteDialog'; const schema = z.object({ id: z.string(), name: z.string().min(1), - description: z.string() + description: z.string().nullable() }); +// TODO: With some extra upstream Specta work this should be able to be removed +function toMaybeUndefined(v: T | null | undefined): MaybeUndefined { + return v as any; +} + export const Component = () => { const { library } = useLibraryContext(); const editLibrary = useBridgeMutation('library.edit'); const form = useZodForm({ schema, - defaultValues: { id: library!.uuid, ...library?.config } + defaultValues: { + id: library!.uuid, + ...library?.config + } }); useDebouncedFormWatch(form, (value) => editLibrary.mutate({ id: library.uuid, name: value.name ?? null, - description: value.description ?? null + description: toMaybeUndefined(value.description) }) ); diff --git a/interface/app/$libraryId/settings/library/nodes.tsx b/interface/app/$libraryId/settings/library/nodes.tsx index 386874b0d..c3e4e84c9 100644 --- a/interface/app/$libraryId/settings/library/nodes.tsx +++ b/interface/app/$libraryId/settings/library/nodes.tsx @@ -1,12 +1,45 @@ +import { useDiscoveredPeers, useFeatureFlag, useLibraryMutation } from '@sd/client'; +import { Button } from '@sd/ui'; import { Heading } from '../Layout'; export const Component = () => { + const isPairingEnabled = useFeatureFlag('p2pPairing'); + return ( <> + + {/* TODO: Show paired nodes + unpair button */} + + {isPairingEnabled && } ); }; + +// TODO: This entire component shows a UI which is pairing by node but that is just not how it works. +function IncorrectP2PPairingPane() { + const onlineNodes = useDiscoveredPeers(); + const p2pPair = useLibraryMutation('p2p.pair', { + onSuccess(data) { + console.log(data); + } + }); + + console.log(onlineNodes); + + return ( + <> +

Pairing

+ {[...onlineNodes.entries()].map(([id, node]) => ( +
+

{node.name}

+ + +
+ ))} + + ); +} diff --git a/packages/client/src/core.ts b/packages/client/src/core.ts index e209751ac..21b2bae60 100644 --- a/packages/client/src/core.ts +++ b/packages/client/src/core.ts @@ -16,7 +16,7 @@ export type Procedures = { { key: "locations.indexer_rules.get", input: LibraryArgs, result: IndexerRule } | { key: "locations.indexer_rules.list", input: LibraryArgs, result: IndexerRule[] } | { key: "locations.indexer_rules.listForLocation", input: LibraryArgs, result: IndexerRule[] } | - { key: "locations.list", input: LibraryArgs, result: { id: number; pub_id: number[]; node_id: number | null; name: string | null; path: string | null; total_capacity: number | null; available_capacity: number | null; is_archived: boolean | null; generate_preview_media: boolean | null; sync_preview_media: boolean | null; hidden: boolean | null; date_created: string | null; node: Node | null }[] } | + { key: "locations.list", input: LibraryArgs, result: { id: number; pub_id: number[]; name: string | null; path: string | null; total_capacity: number | null; available_capacity: number | null; is_archived: boolean | null; generate_preview_media: boolean | null; sync_preview_media: boolean | null; hidden: boolean | null; date_created: string | null; node_id: number | null; node: Node | null }[] } | { key: "nodeState", input: never, result: NodeState } | { key: "search.objects", input: LibraryArgs, result: SearchData } | { key: "search.paths", input: LibraryArgs, result: SearchData } | @@ -55,8 +55,9 @@ export type Procedures = { { key: "locations.indexer_rules.delete", input: LibraryArgs, result: null } | { key: "locations.relink", input: LibraryArgs, result: null } | { key: "locations.update", input: LibraryArgs, result: null } | - { key: "nodes.changeNodeName", input: ChangeNodeNameArgs, result: NodeConfig } | + { key: "nodes.changeNodeName", input: ChangeNodeNameArgs, result: null } | { key: "p2p.acceptSpacedrop", input: [string, string | null], result: null } | + { key: "p2p.pair", input: LibraryArgs, result: number } | { key: "p2p.spacedrop", input: SpacedropArgs, result: string | null } | { key: "tags.assign", input: LibraryArgs, result: null } | { key: "tags.create", input: LibraryArgs, result: Tag } | @@ -90,7 +91,7 @@ export type CreateLibraryArgs = { name: string } export type DiskType = "SSD" | "HDD" | "Removable" -export type EditLibraryArgs = { id: string; name: string | null; description: string | null } +export type EditLibraryArgs = { id: string; name: string | null; description: MaybeUndefined } export type ExplorerItem = { type: "Path"; has_local_thumbnail: boolean; thumbnail_key: string[] | null; item: FilePathWithObject } | { type: "Object"; has_local_thumbnail: boolean; thumbnail_key: string[] | null; item: ObjectWithFilePaths } @@ -151,16 +152,11 @@ export type JobStatus = "Queued" | "Running" | "Completed" | "Canceled" | "Faile */ export type LibraryArgs = { library_id: string; arg: T } -/** - * LibraryConfig holds the configuration for a specific library. This is stored as a '{uuid}.sdlibrary' file. - */ -export type LibraryConfig = { name: string; description: string } - -export type LibraryConfigWrapped = { uuid: string; config: LibraryConfig } +export type LibraryConfigWrapped = { uuid: string; config: SanitisedLibraryConfig } export type LightScanArgs = { location_id: number; sub_path: string } -export type Location = { id: number; pub_id: number[]; node_id: number | null; name: string | null; path: string | null; total_capacity: number | null; available_capacity: number | null; is_archived: boolean | null; generate_preview_media: boolean | null; sync_preview_media: boolean | null; hidden: boolean | null; date_created: string | null } +export type Location = { id: number; pub_id: number[]; name: string | null; path: string | null; total_capacity: number | null; available_capacity: number | null; is_archived: boolean | null; generate_preview_media: boolean | null; sync_preview_media: boolean | null; hidden: boolean | null; date_created: string | null; node_id: number | null } /** * `LocationCreateArgs` is the argument received from the client using `rspc` to create a new location. @@ -179,18 +175,15 @@ export type LocationCreateArgs = { path: string; dry_run: boolean; indexer_rules */ export type LocationUpdateArgs = { id: number; name: string | null; generate_preview_media: boolean | null; sync_preview_media: boolean | null; hidden: boolean | null; indexer_rules_ids: number[] } -export type LocationWithIndexerRules = { id: number; pub_id: number[]; node_id: number | null; name: string | null; path: string | null; total_capacity: number | null; available_capacity: number | null; is_archived: boolean | null; generate_preview_media: boolean | null; sync_preview_media: boolean | null; hidden: boolean | null; date_created: string | null; indexer_rules: { indexer_rule: IndexerRule }[] } +export type LocationWithIndexerRules = { id: number; pub_id: number[]; name: string | null; path: string | null; total_capacity: number | null; available_capacity: number | null; is_archived: boolean | null; generate_preview_media: boolean | null; sync_preview_media: boolean | null; hidden: boolean | null; date_created: string | null; node_id: number | null; indexer_rules: { indexer_rule: IndexerRule }[] } export type MaybeNot = T | { not: T } +export type MaybeUndefined = null | null | T + export type MediaData = { id: number; pixel_width: number | null; pixel_height: number | null; longitude: number | null; latitude: number | null; fps: number | null; capture_device_make: string | null; capture_device_model: string | null; capture_device_software: string | null; duration_seconds: number | null; codecs: string | null; streams: number | null } -export type Node = { id: number; pub_id: number[]; name: string; platform: number; date_created: string } - -/** - * NodeConfig is the configuration for a node. This is shared between all libraries and is stored in a JSON file on disk. - */ -export type NodeConfig = { id: string; name: string; p2p_port: number | null; p2p_email: string | null; p2p_img_url: string | null } +export type Node = { id: number; pub_id: number[]; name: string; platform: number; date_created: string; identity: number[] | null; node_peer_id: string | null } export type NodeState = ({ id: string; name: string; p2p_port: number | null; p2p_email: string | null; p2p_img_url: string | null }) & { data_path: string } @@ -239,6 +232,8 @@ export type RenameOne = { from_file_path_id: number; to: string } export type RuleKind = "AcceptFilesByGlob" | "RejectFilesByGlob" | "AcceptIfChildrenDirectoriesArePresent" | "RejectIfChildrenDirectoriesArePresent" +export type SanitisedLibraryConfig = { name: string; description: string | null; node_id: string } + export type SanitisedNodeConfig = { id: string; name: string; p2p_port: number | null; p2p_email: string | null; p2p_img_url: string | null } export type SearchData = { cursor: number[] | null; items: T[] }