From c45f6d7bfa919aad1a2f325ca56e2d8f61bc0ee8 Mon Sep 17 00:00:00 2001 From: Brendan Allan Date: Wed, 31 Jan 2024 16:16:54 +0800 Subject: [PATCH] separate cloud sync into sd-core-cloud-sync (#2025) * separate cloud sync into sd-core-cloud-sync * ci --- .github/actions/setup-rust/action.yaml | 2 +- Cargo.lock | Bin 257101 -> 257488 bytes core/Cargo.toml | 2 + core/crates/cloud-sync/Cargo.toml | 21 ++++++ .../sync => crates/cloud-sync/src}/ingest.rs | 16 ++--- .../mod.rs => crates/cloud-sync/src/lib.rs} | 41 +---------- .../sync => crates/cloud-sync/src}/receive.rs | 33 ++++----- .../sync => crates/cloud-sync/src}/send.rs | 61 +++++++++------- core/src/cloud/sync.rs | 65 ++++++++++++++++++ core/src/lib.rs | 6 ++ core/src/library/library.rs | 4 +- core/src/library/mod.rs | 2 - crates/actors/Cargo.toml | 12 ++++ .../actors.rs => crates/actors/src/lib.rs | 0 crates/cloud-api/src/lib.rs | 6 ++ rust-toolchain.toml | 2 +- 16 files changed, 177 insertions(+), 96 deletions(-) create mode 100644 core/crates/cloud-sync/Cargo.toml rename core/{src/cloud/sync => crates/cloud-sync/src}/ingest.rs (74%) rename core/{src/cloud/sync/mod.rs => crates/cloud-sync/src/lib.rs} (78%) rename core/{src/cloud/sync => crates/cloud-sync/src}/receive.rs (89%) rename core/{src/cloud/sync => crates/cloud-sync/src}/send.rs (62%) create mode 100644 core/src/cloud/sync.rs create mode 100644 crates/actors/Cargo.toml rename core/src/library/actors.rs => crates/actors/src/lib.rs (100%) diff --git a/.github/actions/setup-rust/action.yaml b/.github/actions/setup-rust/action.yaml index 04636be62..53b2222b1 100644 --- a/.github/actions/setup-rust/action.yaml +++ b/.github/actions/setup-rust/action.yaml @@ -16,7 +16,7 @@ runs: uses: dtolnay/rust-toolchain@stable with: target: ${{ inputs.target }} - toolchain: '1.73' + toolchain: '1.75' components: clippy, rustfmt - name: Cache Rust Dependencies diff --git a/Cargo.lock b/Cargo.lock index fd86b7c914eb460dc18c785635ff17e2f9b265a4..d7691869948576a88dde39d0f6d3a567a6b499ee 100644 GIT binary patch delta 1244 zcmY*ZU2GLa6wb`F_ZHexh4x}w=`X*n)!m()of!#MBtaV~x#d9!#)tXa_+Z);&}a;i zgzz9SCb+SqJb(|zgvd|vCJyR@`bbSntAt=w2r(Kp!8RsJBf&VmOZ>?`>^JA^Io~roKe&;bhbmxoj?7fKDMzzk|A?w6`|N5B7`4OC z0GY9CFXrAauJv!MBHIN(oog+EW4$PPFmz}7z}|rjm|cT0#@YmoOC1wVLt|7LCkZnq zFovbEGAg86#wnAUrBo^$aw-yKFcpGoZb3UJxwCt~!4o;r!lEbIc?*1)x92+B-~$a+ z*F-n0vBMi+%fwN%%gwEa-_=y@?qWYYQmB?S62bXmp6B&#u;}E&uyT(b*$&EsCW@m- z=p-hZam9JcQfv~*nBb{4O3+kDBNW4l41*xTG*mH8sA4H+u@Kq#dnG%)1Jv2p2Gr?p zJP9|!oPFI+_*=6TKY)=(cxV(Cl9*I;?1NGgV3Bw^6wxFiA;Cn4f!Bt{{)rUklqyAp zQp_us-FvK*&7WK0rWCySc&$1xwr_t1PB+_k&cIdnl{Z0y-M!UwSKfy^YIC>!V-k|( z>bbG!^}zm@MwRP04_Ctf82*h9N^qGy*^QPJ)U*4>M#pUVG`!!gJP+5|D<8uZ_QDc6 z)P?G8@e|l$pZXJa+LLEt!97%g&8xGy@$w*}UYNurK`H{K5}pQ$Z<6}a2AA;`oa{} zLh%F}CH+Kj5=-H0(t@khllxJpCZ@!2&V0p$D<+ws+16Dh``Qqyuddcxmtbo?al6Fh zzMO*#aLHrAVYt@)d>O9I@0{Cs1@^YtU-!cMoLYeG`IU2T7tu#3U&SEB1cwP`CJv}h zRlvfSCZSi1svrDVH`j+wEXx=5Xp9z?=U+B$LACDqAo{j$se?=$Xt9U3qYgVggnI1E t6X@dq%+8zJ^Iwa-v`Z7AW4KVj=h8{)!JvD(I8_7vF{Djum{0+(~nil{7 delta 1110 zcmY*YO=w+36lP|cG}V{HT798;Nt&0Gv`yP|?)==DtJoxpH8w``D&qfQ{;n=W43r{D z+eLLNJTEc|MG>J;v6}b-(}f6K3896C7L^J$#gzm}17agZabD6cWEOM2a}MX{JLg>f zrSsCyowHZ_04b`2I~srXt$_`(C{>s4C`$v&86b_dmw($+EWOiHES=iaxc&aEfpySv z_G0sI{B^dcHTu_os5C}q|7iw^Vlgt?T1u<-DbJri=9re}0Zr=|;TiCuC)CW;Ag2TCC@X~&NVjLV9wVYy? zs6^_-Xl9sXE_IY~o+@lJCIwD2E*Z;mqp(q0aB9vM3QADM@x5a!_DAT;LTwyxsBvFTC5 zt<5sQT`Da$nk$tg1REkHr8GC06g1H`u{I$&PZP&d=@b)08qA26#rU@JhtIc!w=Tjp z*bI0uG9Xf;I8LIVGaVUFhErpWuu8f-CW$l5sFgOy88cRrLSV@>+Lf+Y{a~ae zxeQZ)eeDpK9*t8q!cnIT!_p=RPcSje5T-26sKYUa)>LtuGs|(RB+ezzg%XS?PErpl zpzJ#*(Y@<~tt`UrP2yvQ{_xxtcoR0&9-M+Z!y7-sopIMg{U`WH_qzAuD#UTieSJPI zPwy6V9{H`uVoh8Nixr7HNSmf{%8JK@w1RT3l_Hen!r8bK*g0l%;#^Km5+Alya$^1B z6vY0M5*iNQx1;mDF~iI*)bCI1M`izSKWg{qPQorfHU>KU(k^r~Ji8Blwq?Vs5;{_} za!<2`3Dgt1$I-8C%~}g}^g(~~5bE>OlW4#ncru21?J#Qm&laMs5-;0VUqL;7, Arc)) { - let Library { sync, .. } = library.as_ref(); - +pub async fn run_actor(sync: Arc, notify: Arc) { loop { { let mut rx = sync.ingest.req_rx.lock().await; @@ -21,11 +17,11 @@ pub async fn run_actor((library, notify): (Arc, Arc)) { .await .is_ok() { - use crate::sync::ingest::*; - while let Some(req) = rx.recv().await { const OPS_PER_REQUEST: u32 = 1000; + use sd_core_sync::*; + let timestamps = match req { Request::FinishedIngesting => break, Request::Messages { timestamps } => timestamps, @@ -33,7 +29,7 @@ pub async fn run_actor((library, notify): (Arc, Arc)) { }; let ops = err_return!( - sync.get_cloud_ops(crate::sync::GetOpsArgs { + sync.get_cloud_ops(GetOpsArgs { clocks: timestamps, count: OPS_PER_REQUEST, }) @@ -46,7 +42,7 @@ pub async fn run_actor((library, notify): (Arc, Arc)) { sync.ingest .event_tx .send(sd_core_sync::Event::Messages(MessagesEvent { - instance_id: library.sync.instance, + instance_id: sync.instance, has_more: ops.len() == 1000, messages: ops, })) diff --git a/core/src/cloud/sync/mod.rs b/core/crates/cloud-sync/src/lib.rs similarity index 78% rename from core/src/cloud/sync/mod.rs rename to core/crates/cloud-sync/src/lib.rs index dac42b6c3..e178d22a6 100644 --- a/core/src/cloud/sync/mod.rs +++ b/core/crates/cloud-sync/src/lib.rs @@ -1,44 +1,11 @@ -use crate::{library::Library, Node}; use sd_sync::*; use serde::{Deserialize, Serialize}; use serde_json::Value; use uuid::Uuid; -use std::sync::{atomic, Arc}; - -mod ingest; -mod receive; -mod send; - -pub async fn declare_actors(library: &Arc, node: &Arc) { - let ingest_notify = Arc::new(Notify::new()); - let actors = &library.actors; - - let autorun = node.cloud_sync_flag.load(atomic::Ordering::Relaxed); - - let args = (library.clone(), node.clone()); - actors - .declare("Cloud Sync Sender", move || send::run_actor(args), autorun) - .await; - - let args = (library.clone(), node.clone(), ingest_notify.clone()); - actors - .declare( - "Cloud Sync Receiver", - move || receive::run_actor(args), - autorun, - ) - .await; - - let args = (library.clone(), ingest_notify); - actors - .declare( - "Cloud Sync Ingest", - move || ingest::run_actor(args), - autorun, - ) - .await; -} +pub mod ingest; +pub mod receive; +pub mod send; macro_rules! err_break { ($e:expr) => { @@ -64,9 +31,7 @@ macro_rules! err_return { } }; } - pub(crate) use err_return; -use tokio::sync::Notify; pub type CompressedCRDTOperationsForModel = Vec<(Value, Vec)>; diff --git a/core/src/cloud/sync/receive.rs b/core/crates/cloud-sync/src/receive.rs similarity index 89% rename from core/src/cloud/sync/receive.rs rename to core/crates/cloud-sync/src/receive.rs index 68c5f2dd6..ea7a494bc 100644 --- a/core/src/cloud/sync/receive.rs +++ b/core/crates/cloud-sync/src/receive.rs @@ -1,9 +1,6 @@ -use crate::{ - cloud::sync::{err_break, err_return, CompressedCRDTOperations}, - library::Library, - Node, -}; +use crate::{err_break, err_return, CompressedCRDTOperations}; +use sd_cloud_api::RequestConfigProvider; use sd_core_sync::NTP64; use sd_prisma::prisma::{cloud_crdt_operation, instance, PrismaClient, SortOrder}; use sd_sync::CRDTOperation; @@ -22,12 +19,16 @@ use serde_json::to_vec; use tokio::{sync::Notify, time::sleep}; use uuid::Uuid; -pub async fn run_actor((library, node, ingest_notify): (Arc, Arc, Arc)) { - let db = &library.db; - let library_id = library.id; - +pub async fn run_actor( + db: Arc, + library_id: Uuid, + instance_uuid: Uuid, + sync: Arc, + cloud_api_config_provider: Arc, + ingest_notify: Arc, +) { let mut cloud_timestamps = { - let timestamps = library.sync.timestamps.read().await; + let timestamps = sync.timestamps.read().await; let batch = timestamps .keys() @@ -74,9 +75,9 @@ pub async fn run_actor((library, node, ingest_notify): (Arc, Arc, let collections = { use sd_cloud_api::library::message_collections; message_collections::get( - node.cloud_api_config().await, + cloud_api_config_provider.cloud_api_config().await, library_id, - library.instance_uuid, + instance_uuid, instances .into_iter() .map(|i| { @@ -108,8 +109,8 @@ pub async fn run_actor((library, node, ingest_notify): (Arc, Arc, None => { let Some(fetched_library) = err_break!( sd_cloud_api::library::get( - node.cloud_api_config().await, - library.id + cloud_api_config_provider.cloud_api_config().await, + library_id ) .await ) else { @@ -137,7 +138,7 @@ pub async fn run_actor((library, node, ingest_notify): (Arc, Arc, err_break!( create_instance( - db, + &db, collection.instance_uuid, err_break!(BASE64_STANDARD.decode(instance.identity.clone())) ) @@ -152,7 +153,7 @@ pub async fn run_actor((library, node, ingest_notify): (Arc, Arc, &BASE64_STANDARD.decode(collection.contents) ))); - err_break!(write_cloud_ops_to_db(compressed_operations.into_ops(), db).await); + err_break!(write_cloud_ops_to_db(compressed_operations.into_ops(), &db).await); let collection_timestamp = NTP64(collection.end_time.parse().expect("unable to parse time")); diff --git a/core/src/cloud/sync/send.rs b/core/crates/cloud-sync/src/send.rs similarity index 62% rename from core/src/cloud/sync/send.rs rename to core/crates/cloud-sync/src/send.rs index c542f20cd..441a3ef04 100644 --- a/core/src/cloud/sync/send.rs +++ b/core/crates/cloud-sync/src/send.rs @@ -1,19 +1,23 @@ -use crate::{cloud::sync::CompressedCRDTOperations, Node}; +use crate::CompressedCRDTOperations; +use sd_cloud_api::RequestConfigProvider; use sd_core_sync::{GetOpsArgs, SyncMessage, NTP64}; -use sd_prisma::prisma::instance; +use sd_prisma::prisma::{instance, PrismaClient}; use sd_utils::from_bytes_to_uuid; +use uuid::Uuid; use std::{sync::Arc, time::Duration}; use tokio::time::sleep; -use super::{err_break, Library}; - -pub async fn run_actor((library, node): (Arc, Arc)) { - let db = &library.db; - let library_id = library.id; +use super::err_break; +pub async fn run_actor( + db: Arc, + library_id: Uuid, + sync: Arc, + cloud_api_config_provider: Arc, +) { loop { loop { let instances = err_break!( @@ -29,7 +33,7 @@ pub async fn run_actor((library, node): (Arc, Arc)) { let req_adds = err_break!( sd_cloud_api::library::message_collections::request_add( - node.cloud_api_config().await, + cloud_api_config_provider.cloud_api_config().await, library_id, instances, ) @@ -42,22 +46,20 @@ pub async fn run_actor((library, node): (Arc, Arc)) { for req_add in req_adds { let ops = err_break!( - library - .sync - .get_ops(GetOpsArgs { - count: 1000, - clocks: vec![( - req_add.instance_uuid, - NTP64( - req_add - .from_time - .unwrap_or_else(|| "0".to_string()) - .parse() - .expect("couldn't parse ntp64 value"), - ), - )], - }) - .await + sync.get_ops(GetOpsArgs { + count: 1000, + clocks: vec![( + req_add.instance_uuid, + NTP64( + req_add + .from_time + .unwrap_or_else(|| "0".to_string()) + .parse() + .expect("couldn't parse ntp64 value"), + ), + )], + }) + .await ); if ops.is_empty() { @@ -81,12 +83,19 @@ pub async fn run_actor((library, node): (Arc, Arc)) { break; } - err_break!(do_add(node.cloud_api_config().await, library_id, instances,).await); + err_break!( + do_add( + cloud_api_config_provider.cloud_api_config().await, + library_id, + instances, + ) + .await + ); } { // recreate subscription each time so that existing messages are dropped - let mut rx = library.sync.subscribe(); + let mut rx = sync.subscribe(); // wait until Created message comes in loop { diff --git a/core/src/cloud/sync.rs b/core/src/cloud/sync.rs new file mode 100644 index 000000000..f28940d70 --- /dev/null +++ b/core/src/cloud/sync.rs @@ -0,0 +1,65 @@ +use std::sync::{atomic, Arc}; +use tokio::sync::Notify; + +use crate::{library::Library, Node}; + +pub async fn declare_actors(library: &Arc, node: &Arc) { + let ingest_notify = Arc::new(Notify::new()); + let actors = &library.actors; + + let autorun = node.cloud_sync_flag.load(atomic::Ordering::Relaxed); + + actors + .declare( + "Cloud Sync Sender", + { + let library = library.clone(); + let node = node.clone(); + + move || { + sd_core_cloud_sync::send::run_actor( + library.db.clone(), + library.id, + library.sync.clone(), + node.clone(), + ) + } + }, + autorun, + ) + .await; + + actors + .declare( + "Cloud Sync Receiver", + { + let library = library.clone(); + let node = node.clone(); + let ingest_notify = ingest_notify.clone(); + + move || { + sd_core_cloud_sync::receive::run_actor( + library.db.clone(), + library.id, + library.instance_uuid, + library.sync.clone(), + node.clone(), + ingest_notify, + ) + } + }, + autorun, + ) + .await; + + actors + .declare( + "Cloud Sync Ingest", + { + let library = library.clone(); + move || sd_core_cloud_sync::ingest::run_actor(library.sync.clone(), ingest_notify) + }, + autorun, + ) + .await; +} diff --git a/core/src/lib.rs b/core/src/lib.rs index bb109e4bc..3c7dc7429 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -304,6 +304,12 @@ impl Node { } } +impl sd_cloud_api::RequestConfigProvider for Node { + async fn cloud_api_config(self: &Arc) -> sd_cloud_api::RequestConfig { + Node::cloud_api_config(self).await + } +} + /// Error type for Node related errors. #[derive(Error, Debug)] pub enum NodeError { diff --git a/core/src/library/library.rs b/core/src/library/library.rs index 6bf8d2c3d..60c9562cd 100644 --- a/core/src/library/library.rs +++ b/core/src/library/library.rs @@ -20,7 +20,7 @@ use tokio::{fs, io, sync::broadcast, sync::RwLock}; use tracing::warn; use uuid::Uuid; -use super::{Actors, LibraryConfig, LibraryManagerError}; +use super::{LibraryConfig, LibraryManagerError}; // TODO: Finish this // pub enum LibraryNew { @@ -53,7 +53,7 @@ pub struct Library { // TODO(@Oscar): Get rid of this with the new invalidation system. event_bus_tx: broadcast::Sender, - pub actors: Arc, + pub actors: Arc, } impl Debug for Library { diff --git a/core/src/library/mod.rs b/core/src/library/mod.rs index 944f40410..49057177f 100644 --- a/core/src/library/mod.rs +++ b/core/src/library/mod.rs @@ -1,4 +1,3 @@ -mod actors; mod config; #[allow(clippy::module_inception)] mod library; @@ -6,7 +5,6 @@ mod manager; mod name; mod statistics; -pub use actors::*; pub use config::*; pub use library::*; pub use manager::*; diff --git a/crates/actors/Cargo.toml b/crates/actors/Cargo.toml new file mode 100644 index 000000000..bdcf2401e --- /dev/null +++ b/crates/actors/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "sd-actors" +version = "0.1.0" +license.workspace = true +edition.workspace = true +repository.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +futures.workspace = true +tokio.workspace = true diff --git a/core/src/library/actors.rs b/crates/actors/src/lib.rs similarity index 100% rename from core/src/library/actors.rs rename to crates/actors/src/lib.rs diff --git a/crates/cloud-api/src/lib.rs b/crates/cloud-api/src/lib.rs index eae0f61c5..6e8b0cc92 100644 --- a/crates/cloud-api/src/lib.rs +++ b/crates/cloud-api/src/lib.rs @@ -1,5 +1,7 @@ pub mod auth; +use std::{future::Future, sync::Arc}; + use auth::OAuthToken; use serde::{Deserialize, Serialize}; use serde_json::json; @@ -12,6 +14,10 @@ pub struct RequestConfig { pub auth_token: Option, } +pub trait RequestConfigProvider { + fn cloud_api_config(self: &Arc) -> impl Future + Send; +} + #[derive(thiserror::Error, Debug)] #[error("{0}")] pub struct Error(String); diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 50fa4928b..6d833ff50 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,2 +1,2 @@ [toolchain] -channel = "1.73" +channel = "1.75"