From 94ca18925ddfddffa018d9ce632a59f9fbf85edd Mon Sep 17 00:00:00 2001 From: Brendan Allan Date: Tue, 26 Mar 2024 15:26:37 +0800 Subject: [PATCH] [ENG-1691] Sync status subscription (#2246) * sync status subscription * clippy --- core/crates/sync/src/ingest.rs | 13 +++- core/crates/sync/src/lib.rs | 2 + core/crates/sync/src/manager.rs | 4 +- core/src/api/cloud.rs | 4 +- core/src/api/sync.rs | 27 +++++++-- core/src/cloud/mod.rs | 32 ++++++++++ core/src/cloud/sync/ingest.rs | 18 +++++- core/src/cloud/sync/mod.rs | 59 ++++++++++++++----- core/src/cloud/sync/receive.rs | 47 +++++++-------- core/src/cloud/sync/send.rs | 18 +++++- core/src/library/library.rs | 10 +++- core/src/library/manager/mod.rs | 26 ++++++-- .../Layout/Sidebar/SidebarLayout/Footer.tsx | 14 ++++- packages/client/src/core.ts | 1 + 14 files changed, 218 insertions(+), 57 deletions(-) diff --git a/core/crates/sync/src/ingest.rs b/core/crates/sync/src/ingest.rs index 0be7b5070..c5f9462b8 100644 --- a/core/crates/sync/src/ingest.rs +++ b/core/crates/sync/src/ingest.rs @@ -1,4 +1,7 @@ -use std::{ops::Deref, sync::Arc}; +use std::{ + ops::Deref, + sync::{atomic::Ordering, Arc}, +}; use sd_prisma::{ prisma::{crdt_operation, SortOrder}, @@ -61,8 +64,14 @@ impl Actor { async fn tick(mut self) -> Option { let state = match self.state.take()? { State::WaitingForNotification => { + self.shared.active.store(false, Ordering::Relaxed); + self.shared.active_notify.notify_waiters(); + wait!(self.io.event_rx, Event::Notification); + self.shared.active.store(true, Ordering::Relaxed); + self.shared.active_notify.notify_waiters(); + State::RetrievingMessages } State::RetrievingMessages => { @@ -270,6 +279,8 @@ mod test { clock: HLCBuilder::new().with_id(instance.into()).build(), timestamps: Default::default(), emit_messages_flag: Arc::new(AtomicBool::new(true)), + active: Default::default(), + active_notify: Default::default(), }); (Actor::spawn(shared.clone()), shared) diff --git a/core/crates/sync/src/lib.rs b/core/crates/sync/src/lib.rs index fcde66b79..d6ed9e0c1 100644 --- a/core/crates/sync/src/lib.rs +++ b/core/crates/sync/src/lib.rs @@ -32,6 +32,8 @@ pub struct SharedState { pub instance: uuid::Uuid, pub timestamps: Timestamps, pub clock: uhlc::HLC, + pub active: AtomicBool, + pub active_notify: tokio::sync::Notify, } #[must_use] diff --git a/core/crates/sync/src/manager.rs b/core/crates/sync/src/manager.rs index 0298a456f..72b54a618 100644 --- a/core/crates/sync/src/manager.rs +++ b/core/crates/sync/src/manager.rs @@ -22,7 +22,7 @@ use uuid::Uuid; pub struct Manager { pub tx: broadcast::Sender, pub ingest: ingest::Handler, - shared: Arc, + pub shared: Arc, } #[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq, Eq)] @@ -54,6 +54,8 @@ impl Manager { clock, timestamps: Arc::new(RwLock::new(timestamps)), emit_messages_flag: emit_messages_flag.clone(), + active: Default::default(), + active_notify: Default::default(), }); let ingest = ingest::Actor::spawn(shared.clone()); diff --git a/core/src/api/cloud.rs b/core/src/api/cloud.rs index 799212ba1..ec9df0abe 100644 --- a/core/src/api/cloud.rs +++ b/core/src/api/cloud.rs @@ -145,7 +145,9 @@ mod library { for instance in instances { crate::cloud::sync::receive::upsert_instance( - &library, + library.id, + &library.db, + &library.sync, &node.libraries, instance.uuid, instance.identity, diff --git a/core/src/api/sync.rs b/core/src/api/sync.rs index 3cca88117..f80c6b7cf 100644 --- a/core/src/api/sync.rs +++ b/core/src/api/sync.rs @@ -1,8 +1,6 @@ -use std::sync::atomic::Ordering; - -use sd_core_sync::GetOpsArgs; - use rspc::alpha::AlphaRouter; +use sd_core_sync::GetOpsArgs; +use std::sync::atomic::Ordering; use crate::util::MaybeUndefined; @@ -77,4 +75,25 @@ pub(crate) fn mount() -> AlphaRouter { .load(Ordering::Relaxed)) }) }) + .procedure("active", { + R.with2(library()) + .subscription(|(_, library), _: ()| async move { + async_stream::stream! { + let cloud_sync = &library.cloud.sync; + let sync = &library.sync.shared; + + loop { + yield sync.active.load(Ordering::Relaxed) + || cloud_sync.send_active.load(Ordering::Relaxed) + || cloud_sync.receive_active.load(Ordering::Relaxed) + || cloud_sync.ingest_active.load(Ordering::Relaxed); + + tokio::select! { + _ = cloud_sync.notifier.notified() => {}, + _ = sync.active_notify.notified() => {} + } + } + } + }) + }) } diff --git a/core/src/cloud/mod.rs b/core/src/cloud/mod.rs index d086d5bd6..529c18cd1 100644 --- a/core/src/cloud/mod.rs +++ b/core/src/cloud/mod.rs @@ -1 +1,33 @@ +use std::sync::Arc; + +use uuid::Uuid; + +use crate::Node; + pub mod sync; + +#[derive(Default)] +pub struct State { + pub sync: sync::State, +} + +pub async fn start( + node: &Arc, + actors: &Arc, + library_id: Uuid, + instance_uuid: Uuid, + sync: &Arc, + db: &Arc, +) -> State { + let sync = sync::declare_actors( + node, + actors, + library_id, + instance_uuid, + sync.clone(), + db.clone(), + ) + .await; + + State { sync } +} diff --git a/core/src/cloud/sync/ingest.rs b/core/src/cloud/sync/ingest.rs index 08220816c..8c489f0f7 100644 --- a/core/src/cloud/sync/ingest.rs +++ b/core/src/cloud/sync/ingest.rs @@ -1,4 +1,7 @@ -use std::sync::Arc; +use std::sync::{ + atomic::{AtomicBool, Ordering}, + Arc, +}; use tokio::sync::Notify; use tracing::debug; @@ -7,8 +10,16 @@ use crate::cloud::sync::err_break; // Responsible for taking sync operations received from the cloud, // and applying them to the local database via the sync system's ingest actor. -pub async fn run_actor(sync: Arc, notify: Arc) { +pub async fn run_actor( + sync: Arc, + notify: Arc, + state: Arc, + state_notify: Arc, +) { loop { + state.store(true, Ordering::Relaxed); + state_notify.notify_waiters(); + { let mut rx = sync.ingest.req_rx.lock().await; @@ -65,6 +76,9 @@ pub async fn run_actor(sync: Arc, notify: Arc) { } } + state.store(false, Ordering::Relaxed); + state_notify.notify_waiters(); + notify.notified().await; } } diff --git a/core/src/cloud/sync/mod.rs b/core/src/cloud/sync/mod.rs index 570697049..5b12e6578 100644 --- a/core/src/cloud/sync/mod.rs +++ b/core/src/cloud/sync/mod.rs @@ -1,16 +1,35 @@ use sd_sync::*; -use std::sync::{atomic, Arc}; +use std::sync::{ + atomic::{self, AtomicBool}, + Arc, +}; use tokio::sync::Notify; +use uuid::Uuid; -use crate::{library::Library, Node}; +use crate::Node; pub mod ingest; pub mod receive; pub mod send; -pub async fn declare_actors(library: &Arc, node: &Arc) { +#[derive(Default)] +pub struct State { + pub send_active: Arc, + pub receive_active: Arc, + pub ingest_active: Arc, + pub notifier: Arc, +} + +pub async fn declare_actors( + node: &Arc, + actors: &Arc, + library_id: Uuid, + instance_uuid: Uuid, + sync: Arc, + db: Arc, +) -> State { let ingest_notify = Arc::new(Notify::new()); - let actors = &library.actors; + let state = State::default(); let autorun = node.cloud_sync_flag.load(atomic::Ordering::Relaxed); @@ -18,10 +37,12 @@ pub async fn declare_actors(library: &Arc, node: &Arc) { .declare( "Cloud Sync Sender", { - let library = library.clone(); + let sync = sync.clone(); let node = node.clone(); + let active = state.send_active.clone(); + let active_notifier = state.notifier.clone(); - move || send::run_actor(library.id, library.sync.clone(), node.clone()) + move || send::run_actor(library_id, sync, node, active, active_notifier) }, autorun, ) @@ -31,21 +52,23 @@ pub async fn declare_actors(library: &Arc, node: &Arc) { .declare( "Cloud Sync Receiver", { - let library = library.clone(); + let sync = sync.clone(); let node = node.clone(); let ingest_notify = ingest_notify.clone(); + let active_notifier = state.notifier.clone(); + let active = state.receive_active.clone(); move || { receive::run_actor( - library.clone(), node.libraries.clone(), - library.db.clone(), - library.id, - library.instance_uuid, - library.sync.clone(), - node.clone(), + db.clone(), + library_id, + instance_uuid, + sync, ingest_notify, - node.clone(), + node, + active, + active_notifier, ) } }, @@ -57,12 +80,16 @@ pub async fn declare_actors(library: &Arc, node: &Arc) { .declare( "Cloud Sync Ingest", { - let library = library.clone(); - move || ingest::run_actor(library.sync.clone(), ingest_notify) + let active = state.ingest_active.clone(); + let active_notifier = state.notifier.clone(); + + move || ingest::run_actor(sync.clone(), ingest_notify, active, active_notifier) }, autorun, ) .await; + + state } macro_rules! err_break { diff --git a/core/src/cloud/sync/receive.rs b/core/src/cloud/sync/receive.rs index 28b5916c2..a5ff0b8fe 100644 --- a/core/src/cloud/sync/receive.rs +++ b/core/src/cloud/sync/receive.rs @@ -1,11 +1,7 @@ -use crate::{ - library::{Libraries, Library}, - Node, -}; +use crate::{library::Libraries, Node}; use super::{err_break, CompressedCRDTOperations}; use sd_cloud_api::RequestConfigProvider; -use sd_core_sync::NTP64; use sd_p2p::RemoteIdentity; use sd_prisma::prisma::{cloud_crdt_operation, instance, PrismaClient, SortOrder}; use sd_sync::CRDTOperation; @@ -14,7 +10,10 @@ use tracing::{debug, info}; use std::{ collections::{hash_map::Entry, HashMap}, - sync::Arc, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, time::Duration, }; @@ -28,17 +27,20 @@ use uuid::Uuid; #[allow(clippy::too_many_arguments)] pub async fn run_actor( - library: Arc, libraries: Arc, db: Arc, library_id: Uuid, instance_uuid: Uuid, sync: Arc, - cloud_api_config_provider: Arc, ingest_notify: Arc, node: Arc, + active: Arc, + active_notify: Arc, ) { loop { + active.store(true, Ordering::Relaxed); + active_notify.notify_waiters(); + loop { // We need to know the lastest operations we should be retrieving let mut cloud_timestamps = { @@ -106,7 +108,7 @@ pub async fn run_actor( let collections = err_break!( sd_cloud_api::library::message_collections::get( - cloud_api_config_provider.get_request_config().await, + node.get_request_config().await, library_id, instance_uuid, instance_timestamps, @@ -128,7 +130,7 @@ pub async fn run_actor( None => { let Some(fetched_library) = err_break!( sd_cloud_api::library::get( - cloud_api_config_provider.get_request_config().await, + node.get_request_config().await, library_id ) .await @@ -157,7 +159,9 @@ pub async fn run_actor( err_break!( upsert_instance( - &library, + library_id, + &db, + &sync, &libraries, collection.instance_uuid, instance.identity, @@ -200,6 +204,9 @@ pub async fn run_actor( ingest_notify.notify_waiters(); } + active.store(false, Ordering::Relaxed); + active_notify.notify_waiters(); + sleep(Duration::from_secs(60)).await; } } @@ -227,16 +234,16 @@ fn crdt_op_db(op: &CRDTOperation) -> cloud_crdt_operation::Create { } pub async fn upsert_instance( - library: &Arc, + library_id: Uuid, + db: &PrismaClient, + sync: &sd_core_sync::Manager, libraries: &Libraries, uuid: Uuid, identity: RemoteIdentity, node_id: Uuid, metadata: HashMap, ) -> prisma_client_rust::Result<()> { - library - .db - .instance() + db.instance() .upsert( instance::pub_id::equals(uuid_to_bytes(uuid)), instance::create( @@ -254,16 +261,10 @@ pub async fn upsert_instance( .exec() .await?; - library - .sync - .timestamps - .write() - .await - .entry(uuid) - .or_default(); + sync.timestamps.write().await.entry(uuid).or_default(); // Called again so the new instances are picked up - libraries.update_instances(library.clone()).await; + libraries.update_instances_by_id(library_id).await; Ok(()) } diff --git a/core/src/cloud/sync/send.rs b/core/src/cloud/sync/send.rs index 92f9ff24c..120211fb6 100644 --- a/core/src/cloud/sync/send.rs +++ b/core/src/cloud/sync/send.rs @@ -5,9 +5,15 @@ use sd_core_sync::{SyncMessage, NTP64}; use tracing::debug; use uuid::Uuid; -use std::{sync::Arc, time::Duration}; +use std::{ + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + time::Duration, +}; -use tokio::time::sleep; +use tokio::{sync::Notify, time::sleep}; use super::err_break; @@ -17,8 +23,13 @@ pub async fn run_actor( library_id: Uuid, sync: Arc, cloud_api_config_provider: Arc, + state: Arc, + state_notify: Arc, ) { loop { + state.store(true, Ordering::Relaxed); + state_notify.notify_waiters(); + loop { // all available instances will have a default timestamp from create_instance let instances = sync @@ -109,6 +120,9 @@ pub async fn run_actor( ); } + state.store(false, Ordering::Relaxed); + state_notify.notify_waiters(); + { // recreate subscription each time so that existing messages are dropped let mut rx = sync.subscribe(); diff --git a/core/src/library/library.rs b/core/src/library/library.rs index 93744b6dc..96ee7606b 100644 --- a/core/src/library/library.rs +++ b/core/src/library/library.rs @@ -1,4 +1,6 @@ -use crate::{api::CoreEvent, object::media::old_thumbnail::get_indexed_thumbnail_path, sync, Node}; +use crate::{ + api::CoreEvent, cloud, object::media::old_thumbnail::get_indexed_thumbnail_path, sync, Node, +}; use sd_file_path_helper::{file_path_to_full_path, IsolatedFilePathData}; use sd_p2p::Identity; @@ -35,6 +37,7 @@ pub struct Library { /// db holds the database client for the current library. pub db: Arc, pub sync: Arc, + pub cloud: cloud::State, /// key manager that provides encryption keys to functions that require them // pub key_manager: Arc, /// p2p identity @@ -76,12 +79,15 @@ impl Library { db: Arc, node: &Arc, sync: Arc, + cloud: cloud::State, do_cloud_sync: broadcast::Sender<()>, + actors: Arc, ) -> Arc { Arc::new(Self { id, config: RwLock::new(config), sync, + cloud, db: db.clone(), // key_manager, identity, @@ -90,7 +96,7 @@ impl Library { do_cloud_sync, env: node.env.clone(), event_bus_tx: node.event_bus.0.clone(), - actors: Default::default(), + actors, }) } diff --git a/core/src/library/manager/mod.rs b/core/src/library/manager/mod.rs index 53557e479..797689670 100644 --- a/core/src/library/manager/mod.rs +++ b/core/src/library/manager/mod.rs @@ -489,6 +489,11 @@ impl Libraries { }) .collect() }); + let sync_manager = Arc::new(sync.manager); + + let actors = Default::default(); + + let cloud = crate::cloud::start(node, &actors, id, instance_id, &sync_manager, &db).await; let (tx, mut rx) = broadcast::channel(10); let library = Library::new( @@ -499,16 +504,16 @@ impl Libraries { // key_manager, db, node, - Arc::new(sync.manager), + sync_manager, + cloud, tx, + actors, ) .await; // This is an exception. Generally subscribe to this by `self.tx.subscribe`. tokio::spawn(sync_rx_actor(library.clone(), node.clone(), sync.rx)); - crate::cloud::sync::declare_actors(&library, node).await; - self.tx .emit(LibraryManagerEvent::Load(library.clone())) .await; @@ -611,7 +616,9 @@ impl Libraries { for instance in lib.instances { if let Err(err) = cloud::sync::receive::upsert_instance( - &library, + library.id, + &library.db, + &library.sync, &node.libraries, instance.uuid, instance.identity, @@ -664,6 +671,17 @@ impl Libraries { .emit(LibraryManagerEvent::InstancesModified(library)) .await; } + + pub async fn update_instances_by_id(&self, library_id: Uuid) { + let Some(library) = self.libraries.read().await.get(&library_id).cloned() else { + warn!("Failed to find instance to update by id"); + return; + }; + + self.tx + .emit(LibraryManagerEvent::InstancesModified(library)) + .await; + } } async fn sync_rx_actor( diff --git a/interface/app/$libraryId/Layout/Sidebar/SidebarLayout/Footer.tsx b/interface/app/$libraryId/Layout/Sidebar/SidebarLayout/Footer.tsx index 70300164d..12c3eb608 100644 --- a/interface/app/$libraryId/Layout/Sidebar/SidebarLayout/Footer.tsx +++ b/interface/app/$libraryId/Layout/Sidebar/SidebarLayout/Footer.tsx @@ -1,6 +1,6 @@ import { Gear } from '@phosphor-icons/react'; import { useNavigate } from 'react-router'; -import { JobManagerContextProvider, useClientContext, useDebugState } from '@sd/client'; +import { JobManagerContextProvider, LibraryContextProvider, useClientContext, useDebugState, useLibrarySubscription } from '@sd/client'; import { Button, ButtonLink, Popover, Tooltip, usePopover } from '@sd/ui'; import { useKeysMatcher, useLocale, useShortcut } from '~/hooks'; import { usePlatform } from '~/util/Platform'; @@ -8,6 +8,7 @@ import { usePlatform } from '~/util/Platform'; import DebugPopover from '../DebugPopover'; import { IsRunningJob, JobManager } from '../JobManager'; import FeedbackButton from './FeedbackButton'; +import { useState } from 'react'; export default () => { const { library } = useClientContext(); @@ -44,6 +45,7 @@ export default () => { )} )} + {library && }
{
); }; + +function SyncStatusIndicator() { + const [syncing, setSyncing] = useState(false); + + useLibrarySubscription(["sync.active"], { + onData: setSyncing + }) + + return null +} diff --git a/packages/client/src/core.ts b/packages/client/src/core.ts index 5760eca42..9780efda4 100644 --- a/packages/client/src/core.ts +++ b/packages/client/src/core.ts @@ -136,6 +136,7 @@ export type Procedures = { { key: "notifications.listen", input: never, result: Notification } | { key: "p2p.events", input: never, result: P2PEvent } | { key: "search.ephemeralPaths", input: LibraryArgs, result: EphemeralPathsResultItem } | + { key: "sync.active", input: LibraryArgs, result: boolean } | { key: "sync.newMessage", input: LibraryArgs, result: null } };