From 53ab3178c2c93d7ffd0bc5e87fde569968781b02 Mon Sep 17 00:00:00 2001 From: Oscar Beaumont Date: Mon, 17 Jul 2023 19:53:25 +0800 Subject: [PATCH] [ENG-906] Initial library sync (#1095) * ffs * typo * yeet library data over p2p * fix a bunch of edge cases * report complete status on responder * better log * fix types * mobile debug screen * mobile + P2P is a mess * feature flag mobile p2p pairing * wrong one --------- Co-authored-by: Brendan Allan --- apps/mobile/package.json | 1 + apps/mobile/src/App.tsx | 10 +- apps/mobile/src/main.tsx | 12 + .../src/navigation/SettingsNavigator.tsx | 7 + apps/mobile/src/screens/p2p/index.tsx | 26 ++ apps/mobile/src/screens/settings/Settings.tsx | 43 +- .../src/screens/settings/info/Debug.tsx | 35 ++ .../settings/library/NodesSettings.tsx | 36 +- core/src/library/config.rs | 19 +- core/src/library/manager.rs | 21 +- core/src/object/orphan_remover.rs | 70 ++-- core/src/p2p/pairing/initial_sync.rs | 368 ++++++++++++++++++ core/src/p2p/pairing/mod.rs | 90 ++++- core/src/p2p/pairing/proto.rs | 86 +++- core/src/util/debug_initializer.rs | 2 +- .../app/$libraryId/settings/library/nodes.tsx | 2 +- .../$libraryId/settings/resources/about.tsx | 18 +- interface/app/p2p/index.tsx | 5 +- interface/app/p2p/pairing.tsx | 3 + packages/client/src/core.ts | 2 +- packages/client/src/hooks/useDebugState.ts | 17 + packages/client/src/hooks/useP2PEvents.tsx | 2 +- pnpm-lock.yaml | Bin 916476 -> 916727 bytes 23 files changed, 781 insertions(+), 94 deletions(-) create mode 100644 apps/mobile/src/screens/p2p/index.tsx create mode 100644 apps/mobile/src/screens/settings/info/Debug.tsx create mode 100644 core/src/p2p/pairing/initial_sync.rs diff --git a/apps/mobile/package.json b/apps/mobile/package.json index 1177ad9c2..de1762bb4 100644 --- a/apps/mobile/package.json +++ b/apps/mobile/package.json @@ -32,6 +32,7 @@ "@tanstack/react-query": "^4.29.1", "class-variance-authority": "^0.5.3", "dayjs": "^1.11.8", + "event-target-polyfill": "^0.0.3", "expo": "~48.0.19", "expo-linking": "~4.0.1", "expo-media-library": "~15.2.3", diff --git a/apps/mobile/src/App.tsx b/apps/mobile/src/App.tsx index 5e3aaab99..b64cd29d4 100644 --- a/apps/mobile/src/App.tsx +++ b/apps/mobile/src/App.tsx @@ -19,6 +19,8 @@ import { useSnapshot } from 'valtio'; import { ClientContextProvider, LibraryContextProvider, + NotificationContextProvider, + P2PContextProvider, RspcProvider, initPlausible, useClientContext, @@ -30,6 +32,7 @@ import { useTheme } from './hooks/useTheme'; import { changeTwTheme, tw } from './lib/tailwind'; import RootNavigator from './navigation'; import OnboardingNavigator from './navigation/OnboardingNavigator'; +import { P2P } from './screens/p2p'; import { currentLibraryStore } from './utils/nav'; dayjs.extend(advancedFormat); @@ -111,7 +114,12 @@ function AppContainer() { - + + + + + + diff --git a/apps/mobile/src/main.tsx b/apps/mobile/src/main.tsx index 6d0139bcc..fe2960fbc 100644 --- a/apps/mobile/src/main.tsx +++ b/apps/mobile/src/main.tsx @@ -1,4 +1,5 @@ import AsyncStorage from '@react-native-async-storage/async-storage'; +import 'event-target-polyfill'; import * as SplashScreen from 'expo-splash-screen'; import { Suspense, lazy } from 'react'; import { Platform } from 'react-native'; @@ -7,6 +8,17 @@ import { reactNativeLink } from './lib/rspcReactNativeTransport'; // Enable the splash screen SplashScreen.preventAutoHideAsync(); +// The worlds worse pollyfill for "CustomEvent". I tried "custom-event-pollyfill" from npm but it uses `document` :( +if (typeof globalThis.CustomEvent !== 'function') { + // @ts-expect-error + globalThis.CustomEvent = (event, params) => { + const evt = new Event(event, params); + // @ts-expect-error + evt.detail = params.detail; + return evt; + }; +} + const _localStorage = new Map(); // We patch stuff onto `globalThis` so that `@sd/client` can use it. This is super hacky but as far as I can tell, there's no better way to do this. diff --git a/apps/mobile/src/navigation/SettingsNavigator.tsx b/apps/mobile/src/navigation/SettingsNavigator.tsx index 116ec478a..dc93c6644 100644 --- a/apps/mobile/src/navigation/SettingsNavigator.tsx +++ b/apps/mobile/src/navigation/SettingsNavigator.tsx @@ -7,6 +7,7 @@ import GeneralSettingsScreen from '~/screens/settings/client/GeneralSettings'; import LibrarySettingsScreen from '~/screens/settings/client/LibrarySettings'; import PrivacySettingsScreen from '~/screens/settings/client/PrivacySettings'; import AboutScreen from '~/screens/settings/info/About'; +import DebugScreen from '~/screens/settings/info/Debug'; import SupportScreen from '~/screens/settings/info/Support'; import EditLocationSettingsScreen from '~/screens/settings/library/EditLocationSettings'; // import KeysSettingsScreen from '~/screens/settings/library/KeysSettings'; @@ -104,6 +105,11 @@ export default function SettingsNavigator() { component={SupportScreen} options={{ headerTitle: 'Support' }} /> + ); } @@ -130,6 +136,7 @@ export type SettingsStackParamList = { // Info About: undefined; Support: undefined; + Debug: undefined; }; export type SettingsStackScreenProps = diff --git a/apps/mobile/src/screens/p2p/index.tsx b/apps/mobile/src/screens/p2p/index.tsx new file mode 100644 index 000000000..cf3980bbd --- /dev/null +++ b/apps/mobile/src/screens/p2p/index.tsx @@ -0,0 +1,26 @@ +import { useBridgeMutation, useFeatureFlag, useLibraryContext, useP2PEvents } from '@sd/client'; + +export function P2P() { + // const pairingResponse = useBridgeMutation('p2p.pairingResponse'); + // const activeLibrary = useLibraryContext(); + + const pairingEnabled = useFeatureFlag('p2pPairing'); + useP2PEvents((data) => { + if (data.type === 'PairingRequest' && pairingEnabled) { + console.log('Pairing incoming from', data.name); + + // TODO: open pairing screen and guide user through the process. For now we auto-accept + // pairingResponse.mutate([ + // data.id, + // { decision: 'accept', libraryId: activeLibrary.library.uuid } + // ]); + } + + // TODO: For now until UI is implemented + if (data.type === 'PairingProgress') { + console.log('Pairing progress', data); + } + }); + + return null; +} diff --git a/apps/mobile/src/screens/settings/Settings.tsx b/apps/mobile/src/screens/settings/Settings.tsx index 2f67a0f78..d4eb74de8 100644 --- a/apps/mobile/src/screens/settings/Settings.tsx +++ b/apps/mobile/src/screens/settings/Settings.tsx @@ -1,6 +1,7 @@ import { Books, FlyingSaucer, + Gear, GearSix, HardDrive, Heart, @@ -12,7 +13,8 @@ import { TagSimple } from 'phosphor-react-native'; import React from 'react'; -import { SectionList, Text, View } from 'react-native'; +import { SectionList, Text, TouchableWithoutFeedback, View } from 'react-native'; +import { DebugState, useDebugState, useDebugStateEnabler } from '@sd/client'; import { SettingsItem, SettingsItemDivider } from '~/components/settings/SettingsItem'; import { tw, twStyle } from '~/lib/tailwind'; import { SettingsStackParamList, SettingsStackScreenProps } from '~/navigation/SettingsNavigator'; @@ -26,7 +28,7 @@ type SectionType = { }[]; }; -const sections: SectionType[] = [ +const sections: (debugState: DebugState) => SectionType[] = (debugState) => [ { title: 'Client', data: [ @@ -99,7 +101,16 @@ const sections: SectionType[] = [ icon: Heart, navigateTo: 'Support', title: 'Support' - } + }, + ...(debugState.enabled + ? ([ + { + icon: Gear, + navigateTo: 'Debug', + title: 'Debug' + } + ] as const) + : []) ] } ]; @@ -118,10 +129,12 @@ function renderSectionHeader({ section }: { section: { title: string } }) { } export default function SettingsScreen({ navigation }: SettingsStackScreenProps<'Home'>) { + const debugState = useDebugState(); + return ( ( @@ -132,13 +145,7 @@ export default function SettingsScreen({ navigation }: SettingsStackScreenProps< /> )} renderSectionHeader={renderSectionHeader} - ListFooterComponent={ - - Spacedrive - {/* TODO: Get this automatically (expo-device have this?) */} - v0.1.0 - - } + ListFooterComponent={} showsVerticalScrollIndicator={false} stickySectionHeadersEnabled={false} initialNumToRender={50} @@ -146,3 +153,17 @@ export default function SettingsScreen({ navigation }: SettingsStackScreenProps< ); } + +function FooterComponent() { + const onClick = useDebugStateEnabler(); + + return ( + + + Spacedrive + + {/* TODO: Get this automatically (expo-device have this?) */} + v0.1.0 + + ); +} diff --git a/apps/mobile/src/screens/settings/info/Debug.tsx b/apps/mobile/src/screens/settings/info/Debug.tsx new file mode 100644 index 000000000..72df1e263 --- /dev/null +++ b/apps/mobile/src/screens/settings/info/Debug.tsx @@ -0,0 +1,35 @@ +import React from 'react'; +import { Text, View } from 'react-native'; +import { getDebugState, toggleFeatureFlag, useDebugState, useFeatureFlags } from '@sd/client'; +import { Button } from '~/components/primitive/Button'; +import { tw } from '~/lib/tailwind'; +import { SettingsStackScreenProps } from '~/navigation/SettingsNavigator'; + +const DebugScreen = ({ navigation }: SettingsStackScreenProps<'Debug'>) => { + const debugState = useDebugState(); + const featureFlags = useFeatureFlags(); + return ( + + Debug + + + {JSON.stringify(featureFlags)} + {JSON.stringify(debugState)} + + + ); +}; + +export default DebugScreen; diff --git a/apps/mobile/src/screens/settings/library/NodesSettings.tsx b/apps/mobile/src/screens/settings/library/NodesSettings.tsx index e54180a52..a214e91d2 100644 --- a/apps/mobile/src/screens/settings/library/NodesSettings.tsx +++ b/apps/mobile/src/screens/settings/library/NodesSettings.tsx @@ -1,12 +1,46 @@ import React from 'react'; import { Text, View } from 'react-native'; +import { isEnabled, useBridgeMutation, useDiscoveredPeers } from '@sd/client'; +import { Button } from '~/components/primitive/Button'; import { tw } from '~/lib/tailwind'; import { SettingsStackScreenProps } from '~/navigation/SettingsNavigator'; const NodesSettingsScreen = ({ navigation }: SettingsStackScreenProps<'NodesSettings'>) => { + const onlineNodes = useDiscoveredPeers(); + const p2pPair = useBridgeMutation('p2p.pair', { + onSuccess(data) { + console.log(data); + } + }); + return ( - TODO + Pairing + + {[...onlineNodes.entries()].map(([id, node]) => ( + + {node.name} + + + + ))} ); }; diff --git a/core/src/library/config.rs b/core/src/library/config.rs index c367ccee7..04e020929 100644 --- a/core/src/library/config.rs +++ b/core/src/library/config.rs @@ -35,7 +35,7 @@ pub struct LibraryConfig { #[async_trait::async_trait] impl Migrate for LibraryConfig { - const CURRENT_VERSION: u32 = 7; + const CURRENT_VERSION: u32 = 8; type Ctx = (NodeConfig, Arc); @@ -200,7 +200,7 @@ impl Migrate for LibraryConfig { if instances.len() > 1 { return Err(MigratorError::Custom( - "7 - More than one node found in the DB... This can't be automatically reconciled!" + "7 - More than one instance found in the DB... This can't be automatically reconciled!" .into(), )); } @@ -210,6 +210,9 @@ impl Migrate for LibraryConfig { )); }; + config.remove("instance_id"); + config.insert("instance_id".into(), Value::Number(instance.id.into())); + // We are relinking all locations to the current instance. // If you have more than one node in your database and your not @Oscar, something went horribly wrong so this is fine. db.location() @@ -217,6 +220,18 @@ impl Migrate for LibraryConfig { .exec() .await?; } + 8 => { + let instances = db.instance().find_many(vec![]).exec().await?; + let Some(instance) = instances.first() else { + return Err(MigratorError::Custom( + "8 - No nodes found... How did you even get this far?!".into(), + )); + }; + + // This should be in 7 but it's added to ensure to hell it runs. + config.remove("instance_id"); + config.insert("instance_id".into(), Value::Number(instance.id.into())); + } v => unreachable!("Missing migration for library version {}", v), } diff --git a/core/src/library/manager.rs b/core/src/library/manager.rs index 9927f9878..02f5fadb2 100644 --- a/core/src/library/manager.rs +++ b/core/src/library/manager.rs @@ -169,6 +169,7 @@ impl LibraryManager { node_context.clone(), &subscribers, None, + true, ) .await?, ); @@ -202,7 +203,7 @@ impl LibraryManager { description: Option, node_cfg: NodeConfig, ) -> Result { - self.create_with_uuid(Uuid::new_v4(), name, description, node_cfg) + self.create_with_uuid(Uuid::new_v4(), name, description, node_cfg, true) .await } @@ -212,6 +213,7 @@ impl LibraryManager { name: LibraryName, description: Option, node_cfg: NodeConfig, + should_seed: bool, ) -> Result { if name.as_ref().is_empty() || name.as_ref().chars().all(|x| x.is_whitespace()) { return Err(LibraryManagerError::InvalidConfig( @@ -251,16 +253,17 @@ impl LibraryManager { date_created: now, _params: vec![instance::id::set(config.instance_id)], }), + should_seed, ) .await?; debug!("Loaded library '{id:?}'"); - // Run seeders - tag::seed::new_library(&library).await?; - indexer::rules::seed::new_or_existing_library(&library).await?; - - debug!("Seeded library '{id:?}'"); + if should_seed { + tag::seed::new_library(&library).await?; + indexer::rules::seed::new_or_existing_library(&library).await?; + debug!("Seeded library '{id:?}'"); + } invalidate_query!(library, "library.list"); @@ -398,6 +401,7 @@ impl LibraryManager { node_context: NodeContext, subscribers: &RwLock>>, create: Option, + should_seed: bool, ) -> Result { let db_path = db_path.as_ref(); let db_url = format!( @@ -476,7 +480,10 @@ impl LibraryManager { identity, }; - indexer::rules::seed::new_or_existing_library(&library).await?; + if should_seed { + library.orphan_remover.invoke().await; + indexer::rules::seed::new_or_existing_library(&library).await?; + } for location in library .db diff --git a/core/src/object/orphan_remover.rs b/core/src/object/orphan_remover.rs index 789c3264b..71df0eefe 100644 --- a/core/src/object/orphan_remover.rs +++ b/core/src/object/orphan_remover.rs @@ -14,50 +14,44 @@ impl OrphanRemoverActor { pub fn spawn(db: Arc) -> Self { let (tx, mut rx) = channel(4); - tokio::spawn({ - let tx = tx.clone(); - async move { - tx.send(()).await.ok(); + tokio::spawn(async move { + while let Some(()) = rx.recv().await { + // prevents timeouts + tokio::time::sleep(Duration::from_millis(10)).await; - while let Some(()) = rx.recv().await { - // prevents timeouts - tokio::time::sleep(Duration::from_millis(10)).await; - - loop { - let objs = match db - .object() - .find_many(vec![object::file_paths::none(vec![])]) - .take(512) - .select(object::select!({ id pub_id })) - .exec() - .await - { - Ok(objs) => objs, - Err(e) => { - error!("Failed to fetch orphaned objects: {e}"); - break; - } - }; - - if objs.is_empty() { + loop { + let objs = match db + .object() + .find_many(vec![object::file_paths::none(vec![])]) + .take(512) + .select(object::select!({ id pub_id })) + .exec() + .await + { + Ok(objs) => objs, + Err(e) => { + error!("Failed to fetch orphaned objects: {e}"); break; } + }; - debug!("Removing {} orphaned objects", objs.len()); + if objs.is_empty() { + break; + } - let ids: Vec<_> = objs.iter().map(|o| o.id).collect(); + debug!("Removing {} orphaned objects", objs.len()); - if let Err(e) = db - ._batch(( - db.tag_on_object().delete_many(vec![ - tag_on_object::object_id::in_vec(ids.clone()), - ]), - db.object().delete_many(vec![object::id::in_vec(ids)]), - )) - .await - { - error!("Failed to remove orphaned objects: {e}"); - } + let ids: Vec<_> = objs.iter().map(|o| o.id).collect(); + + if let Err(e) = db + ._batch(( + db.tag_on_object() + .delete_many(vec![tag_on_object::object_id::in_vec(ids.clone())]), + db.object().delete_many(vec![object::id::in_vec(ids)]), + )) + .await + { + error!("Failed to remove orphaned objects: {e}"); } } } diff --git a/core/src/p2p/pairing/initial_sync.rs b/core/src/p2p/pairing/initial_sync.rs new file mode 100644 index 000000000..bea6bf448 --- /dev/null +++ b/core/src/p2p/pairing/initial_sync.rs @@ -0,0 +1,368 @@ +use sd_prisma::prisma::*; + +// TODO: Turn this entire file into a Prisma generator cause it could be way more maintainable + +// Pairing will fail if the two clients aren't on versions with identical DB models so it's safe to send them and ignore migrations. + +const ITEMS_PER_BATCH: i64 = 1000; + +macro_rules! impl_for_models { + ($($variant:ident($model:ident)),* $(,)+) => { + /// Represents any DB model to be ingested into the database as part of the initial sync + #[derive(Debug, serde::Serialize, serde::Deserialize)] + pub enum ModelData { + $( + $variant(Vec<$model::Data>), + )* + } + + impl ModelData { + /// Length of data + pub fn len(&self) -> usize { + match self { + $( + Self::$variant(data) => data.len(), + )* + } + } + + /// Get count of all of the rows in the database + pub async fn total_count(db: &PrismaClient) -> Result { + let mut total_count = 0; + + let ($( $model ),*) = tokio::join!( + $( + db.$model().count(vec![]).exec(), + )* + ); + + $(total_count += $model?;)* + Ok(total_count) + } + + /// Insert the data into the database + pub async fn insert(self, db: &PrismaClient) -> Result<(), prisma_client_rust::QueryError> { + match self { + $( + Self::$variant(data) => { + // TODO: Prisma Client Rust is broke + // db.$model().create_many(data.into_iter().map(|v| FromData(v).into()).collect()).exec().await?; + + for i in data { + $model::CreateUnchecked::from(FromData(i)).to_query(db).exec().await?; + } + } + )* + } + + Ok(()) + } + } + + /// This exists to determine the next model to sync. + /// It emulates `.window()` functionality but for a `macro_rules` + // TODO: When replacing with a generator this can be removed and done at compile time + #[derive(Debug)] + enum ModelSyncCursorIterator { + Done = 0, + $( + $variant, + )* + } + + impl<'a> From<&'a ModelSyncCursor> for ModelSyncCursorIterator { + fn from(cursor: &'a ModelSyncCursor) -> Self { + match cursor { + $( + ModelSyncCursor::$variant(_) => Self::$variant, + )* + ModelSyncCursor::Done => Self::Done, + } + } + } + + impl ModelSyncCursorIterator { + pub fn next(self) -> ModelSyncCursor { + let i = self as i32; + match i + 1 { + $( + v if v == Self::$variant as i32 => ModelSyncCursor::$variant(0), + )* + _ => ModelSyncCursor::Done, + } + } + } + + /// Represent where we ar eup to with the sync + #[derive(Debug, serde::Serialize, serde::Deserialize)] + pub enum ModelSyncCursor { + $( + $variant(i64), + )* + Done, + } + + impl ModelSyncCursor { + pub fn new() -> Self { + new_impl!($( $variant ),*) + } + + pub async fn next(&mut self, db: &PrismaClient) -> Option> { + match self { + $( + Self::$variant(cursor) => { + match db.$model() + .find_many(vec![]) + .skip(*cursor) + .take(ITEMS_PER_BATCH + 1) + .exec() + .await { + Ok(data) => { + if data.len() <= ITEMS_PER_BATCH as usize { + *self = ModelSyncCursorIterator::from(&*self).next(); + } else { + *self = Self::$variant(*cursor + ITEMS_PER_BATCH); + } + + Some(Ok(ModelData::$variant(data))) + }, + Err(e) => return Some(Err(e)), + } + }, + )* + Self::Done => None + } + } + } + }; +} + +macro_rules! new_impl { + ($x:ident, $($y:ident),+) => { + Self::$x(0) + }; +} + +impl PartialEq for ModelData { + // Crude EQ impl based only on ID's not struct content. + // It's super annoying PCR does have this impl but it kinda makes sense with relation fetching. + fn eq(&self, other: &Self) -> bool { + match (self, other) { + (ModelData::SharedOperation(a), ModelData::SharedOperation(b)) => a + .iter() + .map(|x| x.id.clone()) + .eq(b.iter().map(|x| x.id.clone())), + (ModelData::Volume(a), ModelData::Volume(b)) => { + a.iter().map(|x| x.id).eq(b.iter().map(|x| x.id)) + } + (ModelData::Location(a), ModelData::Location(b)) => { + a.iter().map(|x| x.id).eq(b.iter().map(|x| x.id)) + } + (ModelData::FilePath(a), ModelData::FilePath(b)) => { + a.iter().map(|x| x.id).eq(b.iter().map(|x| x.id)) + } + (ModelData::Object(a), ModelData::Object(b)) => { + a.iter().map(|x| x.id).eq(b.iter().map(|x| x.id)) + } + (ModelData::Tag(a), ModelData::Tag(b)) => { + a.iter().map(|x| x.id).eq(b.iter().map(|x| x.id)) + } + (ModelData::TagOnObject(a), ModelData::TagOnObject(b)) => a + .iter() + .map(|x| (x.tag_id, x.object_id)) + .eq(b.iter().map(|x| (x.tag_id, x.object_id))), + (ModelData::IndexerRule(a), ModelData::IndexerRule(b)) => { + a.iter().map(|x| x.id).eq(b.iter().map(|x| x.id)) + } + (ModelData::IndexerRulesInLocation(a), ModelData::IndexerRulesInLocation(b)) => a + .iter() + .map(|x| (x.location_id, x.indexer_rule_id)) + .eq(b.iter().map(|x| (x.location_id, x.indexer_rule_id))), + (ModelData::Preference(a), ModelData::Preference(b)) => a + .iter() + .map(|x| (x.key.clone(), x.value.clone())) + .eq(b.iter().map(|x| (x.key.clone(), x.value.clone()))), + _ => false, + } + } +} + +/// Meaningless wrapper to avoid Rust's orphan rule +struct FromData(T); + +impl From> for shared_operation::CreateUnchecked { + fn from(FromData(data): FromData) -> Self { + Self { + id: data.id, + timestamp: data.timestamp, + model: data.model, + record_id: data.record_id, + kind: data.kind, + data: data.data, + instance_id: data.instance_id, + _params: vec![], + } + } +} + +impl From> for volume::CreateUnchecked { + fn from(FromData(data): FromData) -> Self { + Self { + name: data.name, + mount_point: data.mount_point, + _params: vec![ + volume::id::set(data.id), + volume::total_bytes_capacity::set(data.total_bytes_capacity), + volume::total_bytes_available::set(data.total_bytes_available), + volume::disk_type::set(data.disk_type), + volume::filesystem::set(data.filesystem), + volume::is_system::set(data.is_system), + volume::date_modified::set(data.date_modified), + ], + } + } +} + +impl From> for location::CreateUnchecked { + fn from(FromData(data): FromData) -> Self { + Self { + pub_id: data.pub_id, + _params: vec![ + location::id::set(data.id), + location::name::set(data.name), + location::path::set(data.path), + location::total_capacity::set(data.total_capacity), + location::available_capacity::set(data.available_capacity), + location::is_archived::set(data.is_archived), + location::generate_preview_media::set(data.generate_preview_media), + location::sync_preview_media::set(data.sync_preview_media), + location::hidden::set(data.hidden), + location::date_created::set(data.date_created), + location::instance_id::set(data.instance_id), + ], + } + } +} + +impl From> for file_path::CreateUnchecked { + fn from(FromData(data): FromData) -> Self { + Self { + pub_id: data.pub_id, + _params: vec![ + file_path::id::set(data.id), + file_path::is_dir::set(data.is_dir), + file_path::cas_id::set(data.cas_id), + file_path::integrity_checksum::set(data.integrity_checksum), + file_path::location_id::set(data.location_id), + file_path::materialized_path::set(data.materialized_path), + file_path::name::set(data.name), + file_path::extension::set(data.extension), + file_path::size_in_bytes::set(data.size_in_bytes), + file_path::size_in_bytes_bytes::set(data.size_in_bytes_bytes), + file_path::inode::set(data.inode), + file_path::device::set(data.device), + file_path::object_id::set(data.object_id), + file_path::key_id::set(data.key_id), + file_path::date_created::set(data.date_created), + file_path::date_modified::set(data.date_modified), + file_path::date_indexed::set(data.date_indexed), + ], + } + } +} + +impl From> for object::CreateUnchecked { + fn from(FromData(data): FromData) -> Self { + Self { + pub_id: data.pub_id, + _params: vec![ + object::id::set(data.id), + object::kind::set(data.kind), + object::key_id::set(data.key_id), + object::hidden::set(data.hidden), + object::favorite::set(data.favorite), + object::important::set(data.important), + object::note::set(data.note), + object::date_created::set(data.date_created), + object::date_accessed::set(data.date_accessed), + ], + } + } +} + +impl From> for tag::CreateUnchecked { + fn from(FromData(data): FromData) -> Self { + Self { + pub_id: data.pub_id, + _params: vec![ + tag::id::set(data.id), + tag::name::set(data.name), + tag::color::set(data.color), + tag::redundancy_goal::set(data.redundancy_goal), + tag::date_created::set(data.date_created), + tag::date_modified::set(data.date_modified), + ], + } + } +} + +impl From> for tag_on_object::CreateUnchecked { + fn from(FromData(data): FromData) -> Self { + Self { + tag_id: data.tag_id, + object_id: data.object_id, + _params: vec![], + } + } +} + +impl From> for indexer_rule::CreateUnchecked { + fn from(FromData(data): FromData) -> Self { + Self { + pub_id: data.pub_id, + _params: vec![ + indexer_rule::id::set(data.id), + indexer_rule::name::set(data.name), + indexer_rule::default::set(data.default), + indexer_rule::rules_per_kind::set(data.rules_per_kind), + indexer_rule::date_created::set(data.date_created), + indexer_rule::date_modified::set(data.date_modified), + ], + } + } +} + +impl From> + for indexer_rules_in_location::CreateUnchecked +{ + fn from(FromData(data): FromData) -> Self { + Self { + location_id: data.location_id, + indexer_rule_id: data.indexer_rule_id, + _params: vec![], + } + } +} + +impl From> for preference::CreateUnchecked { + fn from(FromData(data): FromData) -> Self { + Self { + key: data.key, + _params: vec![preference::value::set(data.value)], + } + } +} + +// Ensure you order the models to Foreign Keys are created before the models that reference them. +impl_for_models! { + Object(object), + SharedOperation(shared_operation), + Volume(volume), + Location(location), + FilePath(file_path), + Tag(tag), + TagOnObject(tag_on_object), + IndexerRule(indexer_rule), + IndexerRulesInLocation(indexer_rules_in_location), + Preference(preference), +} diff --git a/core/src/p2p/pairing/mod.rs b/core/src/p2p/pairing/mod.rs index 5df0c3736..5bcaca8b8 100644 --- a/core/src/p2p/pairing/mod.rs +++ b/core/src/p2p/pairing/mod.rs @@ -16,11 +16,13 @@ use tokio::{ io::{AsyncRead, AsyncWrite, AsyncWriteExt}, sync::broadcast, }; -use tracing::info; +use tracing::{debug, info}; use uuid::Uuid; +mod initial_sync; mod proto; +pub use initial_sync::*; use proto::*; use crate::{ @@ -117,6 +119,21 @@ impl PairingManager { // TODO: Future - Library in pairing state // TODO: Create library + if self + .library_manager + .get_all_libraries() + .await + .into_iter() + .find(|i| i.id == library_id) + .is_some() + { + self.emit_progress(pairing_id, PairingStatus::LibraryAlreadyExists); + + // TODO: Properly handle this at a protocol level so the error is on both sides + + return; + } + let library_config = self .library_manager .create_with_uuid( @@ -124,6 +141,7 @@ impl PairingManager { LibraryName::new(library_name).unwrap(), library_description, node_config, + false, // We will sync everything which will conflict with the seeded stuff ) .await .unwrap(); @@ -144,12 +162,36 @@ impl PairingManager { // 3. // TODO: Either rollback or update library out of pairing state - // TODO: Fake initial sync + // TODO: This should timeout if taking too long so it can't be used as a DOS style thing??? + let mut total = 0; + let mut synced = 0; + loop { + match SyncData::from_stream(&mut stream).await.unwrap() { + SyncData::Data { total_models, data } => { + if let Some(total_models) = total_models { + total = total_models; + } + synced += data.len(); + + data.insert(&library.db).await.unwrap(); + + // Prevent divide by zero + if total != 0 { + self.emit_progress( + pairing_id, + PairingStatus::InitialSyncProgress( + ((synced as f32 / total as f32) * 100.0) as u8, + ), + ); + } + } + SyncData::Finished => break, + } + } // TODO: Done message to frontend self.emit_progress(pairing_id, PairingStatus::PairingComplete(library_id)); - - tokio::time::sleep(std::time::Duration::from_secs(30)).await; // TODO + stream.flush().await.unwrap(); } PairingResponse::Rejected => { info!("Pairing '{pairing_id}' rejected by remote"); @@ -232,12 +274,41 @@ impl PairingManager { // TODO: Pairing confirmation + rollback + let total = ModelData::total_count(&library.db).await.unwrap(); + let mut synced = 0; + info!("Starting sync of {} rows", total); + + let mut cursor = ModelSyncCursor::new(); + while let Some(data) = cursor.next(&library.db).await { + let data = data.unwrap(); + let total_models = match synced { + 0 => Some(total), + _ => None, + }; + synced += data.len(); + self.emit_progress( + pairing_id, + PairingStatus::InitialSyncProgress((synced as f32 / total as f32 * 100.0) as u8), // SAFETY: It's a percentage + ); + debug!( + "Initial library sync cursor={:?} items={}", + cursor, + data.len() + ); + + stream + .write_all(&SyncData::Data { total_models, data }.to_bytes().unwrap()) + .await + .unwrap(); + } + + stream + .write_all(&SyncData::Finished.to_bytes().unwrap()) + .await + .unwrap(); + + self.emit_progress(pairing_id, PairingStatus::PairingComplete(library_id)); stream.flush().await.unwrap(); - tokio::time::sleep(std::time::Duration::from_secs(30)).await; // TODO - - // }; - - // inner().await.unwrap(); } } @@ -253,6 +324,7 @@ pub enum PairingDecision { pub enum PairingStatus { EstablishingConnection, PairingRequested, + LibraryAlreadyExists, PairingDecisionRequest, PairingInProgress { library_name: String, diff --git a/core/src/p2p/pairing/proto.rs b/core/src/p2p/pairing/proto.rs index c86930a7e..2019e6092 100644 --- a/core/src/p2p/pairing/proto.rs +++ b/core/src/p2p/pairing/proto.rs @@ -5,12 +5,14 @@ use sd_p2p::{ proto::{decode, encode}, spacetunnel::Identity, }; -use sd_prisma::prisma::instance; +use sd_prisma::prisma::*; use tokio::io::{AsyncRead, AsyncReadExt}; use uuid::Uuid; use crate::node::Platform; +use super::ModelData; + /// Terminology: /// Instance - DB model which represents a single `.db` file. /// Originator - begins the pairing process and is asking to join a library that will be selected by the responder. @@ -76,6 +78,19 @@ pub enum PairingConfirmation { Error, } +/// 4. Sync the data in the database with the originator. +/// Sent `Responder` -> `Originator`. +#[derive(Debug, PartialEq)] +pub enum SyncData { + Data { + /// Only included in first request and is an **estimate** of how many models will be sent. + /// It will likely be wrong so should be constrained to being used for UI purposes only. + total_models: Option, + data: ModelData, + }, + Finished, +} + impl Instance { pub async fn from_stream( stream: &mut (impl AsyncRead + Unpin), @@ -211,9 +226,7 @@ impl PairingConfirmation { match stream.read_u8().await.unwrap() { 0 => Ok(Self::Ok), 1 => Ok(Self::Error), - _ => { - todo!(); - } + _ => todo!(), // TODO: Error handling } } @@ -225,6 +238,52 @@ impl PairingConfirmation { } } +impl SyncData { + pub async fn from_stream( + stream: &mut (impl AsyncRead + Unpin), + ) -> Result { + let discriminator = stream + .read_u8() + .await + .map_err(|e| ("discriminator", e.into()))?; + + match discriminator { + 0 => Ok(Self::Data { + total_models: match stream + .read_i64_le() + .await + .map_err(|e| ("total_models", e.into()))? + { + 0 => None, + n => Some(n), + }, + data: rmp_serde::from_slice( + &decode::buf(stream).await.map_err(|e| ("data", e.into()))?, + ) + .unwrap(), // TODO: Error handling + }), + 1 => Ok(Self::Finished), + _ => todo!(), // TODO: Error handling + } + } + + pub fn to_bytes(&self) -> Result, rmp_serde::encode::Error> { + let mut buf = Vec::new(); + match self { + Self::Data { total_models, data } => { + buf.push(0); + buf.extend((total_models.unwrap_or(0) as i64).to_le_bytes()); + encode::buf(&mut buf, &rmp_serde::to_vec_named(data)?); + } + Self::Finished => { + buf.push(1); + } + } + + Ok(buf) + } +} + #[cfg(test)] mod tests { use super::*; @@ -298,5 +357,24 @@ mod tests { let result = PairingConfirmation::from_stream(&mut cursor).await.unwrap(); assert_eq!(original, result); } + + { + let original = SyncData::Data { + total_models: Some(123), + data: ModelData::Location(vec![]), + }; + + let mut cursor = std::io::Cursor::new(original.to_bytes().unwrap()); + let result = SyncData::from_stream(&mut cursor).await.unwrap(); + assert_eq!(original, result); + } + + { + let original = SyncData::Finished; + + let mut cursor = std::io::Cursor::new(original.to_bytes().unwrap()); + let result = SyncData::from_stream(&mut cursor).await.unwrap(); + assert_eq!(original, result); + } } } diff --git a/core/src/util/debug_initializer.rs b/core/src/util/debug_initializer.rs index 337d27aae..378e44da1 100644 --- a/core/src/util/debug_initializer.rs +++ b/core/src/util/debug_initializer.rs @@ -114,7 +114,7 @@ impl InitConfig { Some(lib) => lib, None => { let library = library_manager - .create_with_uuid(lib.id, lib.name, lib.description, node_cfg.clone()) + .create_with_uuid(lib.id, lib.name, lib.description, node_cfg.clone(), true) .await?; match library_manager.get_library(library.uuid).await { diff --git a/interface/app/$libraryId/settings/library/nodes.tsx b/interface/app/$libraryId/settings/library/nodes.tsx index 74da6a66a..afb37e583 100644 --- a/interface/app/$libraryId/settings/library/nodes.tsx +++ b/interface/app/$libraryId/settings/library/nodes.tsx @@ -1,4 +1,4 @@ -import { useBridgeMutation, useDiscoveredPeers, useFeatureFlag } from '@sd/client'; +import { isEnabled, useBridgeMutation, useDiscoveredPeers, useFeatureFlag } from '@sd/client'; import { Button } from '@sd/ui'; import { startPairing } from '~/app/p2p/pairing'; import { Heading } from '../Layout'; diff --git a/interface/app/$libraryId/settings/resources/about.tsx b/interface/app/$libraryId/settings/resources/about.tsx index f469ad50c..f9b90ec5d 100644 --- a/interface/app/$libraryId/settings/resources/about.tsx +++ b/interface/app/$libraryId/settings/resources/about.tsx @@ -1,8 +1,7 @@ import { AppLogo } from '@sd/assets/images'; import { Discord, Github } from '@sd/assets/svgs/brands'; import { Globe } from 'phosphor-react'; -import { useEffect, useState } from 'react'; -import { getDebugState, useBridgeQuery } from '@sd/client'; +import { useBridgeQuery, useDebugStateEnabler } from '@sd/client'; import { Button, Divider } from '@sd/ui'; import { useOperatingSystem } from '~/hooks/useOperatingSystem'; import { usePlatform } from '~/util/Platform'; @@ -13,18 +12,7 @@ export const Component = () => { const os = useOperatingSystem(); const currentPlatformNiceName = os === 'browser' ? 'Web' : os == 'macOS' ? os : os.charAt(0).toUpperCase() + os.slice(1); - - const [clicked, setClicked] = useState(0); - - useEffect(() => { - if (clicked >= 5) { - getDebugState().enabled = true; - } - - const timeout = setTimeout(() => setClicked(0), 1000); - - return () => clearTimeout(timeout); - }, [clicked]); + const onClick = useDebugStateEnabler(); return (
@@ -33,7 +21,7 @@ export const Component = () => { src={AppLogo} className="mr-8 h-[88px] w-[88px]" draggable="false" - onClick={() => setClicked((clicked) => clicked + 1)} + onClick={onClick} />

diff --git a/interface/app/p2p/index.tsx b/interface/app/p2p/index.tsx index cf3153e21..ea4a0ae49 100644 --- a/interface/app/p2p/index.tsx +++ b/interface/app/p2p/index.tsx @@ -1,4 +1,4 @@ -import { useOnFeatureFlagsChange, useP2PEvents, withFeatureFlag } from '@sd/client'; +import { useFeatureFlag, useP2PEvents, withFeatureFlag } from '@sd/client'; import { SpacedropUI } from './Spacedrop'; import { startPairing } from './pairing'; @@ -6,8 +6,9 @@ export const SpacedropUI2 = withFeatureFlag('spacedrop', SpacedropUI); // Entrypoint of P2P UI export function P2P() { + const pairingEnabled = useFeatureFlag('p2pPairing'); useP2PEvents((data) => { - if (data.type === 'PairingRequest') { + if (data.type === 'PairingRequest' && pairingEnabled) { startPairing(data.id, { name: data.name, os: data.os diff --git a/interface/app/p2p/pairing.tsx b/interface/app/p2p/pairing.tsx index c32572b07..d890a8252 100644 --- a/interface/app/p2p/pairing.tsx +++ b/interface/app/p2p/pairing.tsx @@ -60,6 +60,9 @@ function OriginatorDialog({ .with({ type: 'PairingRequested' }, () => ( )) + .with({ type: 'LibraryAlreadyExists' }, () => ( + + )) .with({ type: 'PairingDecisionRequest' }, () => ( )) diff --git a/packages/client/src/core.ts b/packages/client/src/core.ts index de284e4c4..f4b80f72c 100644 --- a/packages/client/src/core.ts +++ b/packages/client/src/core.ts @@ -257,7 +257,7 @@ export type P2PEvent = { type: "DiscoveredPeer"; peer_id: PeerId; metadata: Peer export type PairingDecision = { decision: "accept"; libraryId: string } | { decision: "reject" } -export type PairingStatus = { type: "EstablishingConnection" } | { type: "PairingRequested" } | { type: "PairingDecisionRequest" } | { type: "PairingInProgress"; data: { library_name: string; library_description: string | null } } | { type: "InitialSyncProgress"; data: number } | { type: "PairingComplete"; data: string } | { type: "PairingRejected" } +export type PairingStatus = { type: "EstablishingConnection" } | { type: "PairingRequested" } | { type: "LibraryAlreadyExists" } | { type: "PairingDecisionRequest" } | { type: "PairingInProgress"; data: { library_name: string; library_description: string | null } } | { type: "InitialSyncProgress"; data: number } | { type: "PairingComplete"; data: string } | { type: "PairingRejected" } export type PeerId = string diff --git a/packages/client/src/hooks/useDebugState.ts b/packages/client/src/hooks/useDebugState.ts index 6d3eb4586..ae473b4a5 100644 --- a/packages/client/src/hooks/useDebugState.ts +++ b/packages/client/src/hooks/useDebugState.ts @@ -1,3 +1,4 @@ +import { useEffect, useState } from 'react'; import { useSnapshot } from 'valtio'; import { valtioPersist } from '../lib/valito'; @@ -24,3 +25,19 @@ export function useDebugState() { export function getDebugState() { return debugState; } + +export function useDebugStateEnabler(): () => void { + const [clicked, setClicked] = useState(0); + + useEffect(() => { + if (clicked >= 5) { + getDebugState().enabled = true; + } + + const timeout = setTimeout(() => setClicked(0), 1000); + + return () => clearTimeout(timeout); + }, [clicked]); + + return () => setClicked((c) => c + 1); +} diff --git a/packages/client/src/hooks/useP2PEvents.tsx b/packages/client/src/hooks/useP2PEvents.tsx index 6e3142db0..9be18c382 100644 --- a/packages/client/src/hooks/useP2PEvents.tsx +++ b/packages/client/src/hooks/useP2PEvents.tsx @@ -25,7 +25,7 @@ export function P2PContextProvider({ children }: PropsWithChildren) { useBridgeSubscription(['p2p.events'], { onData(data) { - events.current.dispatchEvent(new CustomEvent('p2p-event', { detail: data })); + events.current.dispatchEvent(new CustomEvent('p2p-event', { detail: data })); if (data.type === 'DiscoveredPeer') { setDiscoveredPeer([discoveredPeers.set(data.peer_id, data.metadata)]); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 6e0030ae1a2e48c069812f16719fc7a6289ff76a..6dd09caee6cb951f9f1afffb48976291d4e77525 100644 GIT binary patch delta 226 zcmezK-TeDc^9`regvwI$N_0ySi_%j|bPMuxD$_D22ZT#88cY_{7UMI}Gv-nNgVc(G z{K<(rV$F}$+8?Vi0x=U1GXpUT5VHa?8xXT^f2_vwPk;+zP)?46!SsdJ>{=nd$!4LY zWw}Y2c}33ICP7v1m0`I-m7c|pl@-20IRz%pg~3J1fkFNjE|mt3mN_X&nL(AN*(oJX zStkA^0p_mGIqoTi?rENd`IcU}{?0kR#qK`S59)JBwqG^p0Afxc<^p1FAm-VA)tp!T F2mm?HQ!D@g delta 72 zcmezV)BMkO^9`renkCfRCDa*#m