separate cloud sync into sd-core-cloud-sync (#2025)

* separate cloud sync into sd-core-cloud-sync

* ci
This commit is contained in:
Brendan Allan
2024-01-31 16:16:54 +08:00
committed by GitHub
parent c1ae1aed37
commit c45f6d7bfa
16 changed files with 177 additions and 96 deletions

View File

@@ -16,7 +16,7 @@ runs:
uses: dtolnay/rust-toolchain@stable
with:
target: ${{ inputs.target }}
toolchain: '1.73'
toolchain: '1.75'
components: clippy, rustfmt
- name: Cache Rust Dependencies

BIN
Cargo.lock generated
View File

Binary file not shown.

View File

@@ -22,6 +22,7 @@ ai = ["dep:sd-ai"]
# Sub-crates
sd-cache = { path = "../crates/cache" }
sd-core-sync = { path = "./crates/sync" }
sd-core-cloud-sync = { path = "./crates/cloud-sync" }
# sd-cloud-api = { path = "../crates/cloud-api" }
sd-crypto = { path = "../crates/crypto", features = [
"rspc",
@@ -122,6 +123,7 @@ tar = "0.4.40"
aws-sdk-s3 = { version = "1.5.0", features = ["behavior-version-latest"] }
aws-config = "1.0.3"
aws-credential-types = "1.0.3"
sd-actors = { version = "0.1.0", path = "../crates/actors" }
# Override features of transitive dependencies
[dependencies.openssl]

View File

@@ -0,0 +1,21 @@
[package]
name = "sd-core-cloud-sync"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
base64.workspace = true
chrono.workspace = true
sd-cloud-api = { path = "../../../crates/cloud-api" }
sd-core-sync = { path = "../sync" }
sd-prisma = { path = "../../../crates/prisma" }
sd-sync = { path = "../../../crates/sync" }
sd-utils = { path = "../../../crates/utils" }
serde.workspace = true
serde_json = "1.0.113"
tokio.workspace = true
tracing.workspace = true
uuid.workspace = true
prisma-client-rust = { workspace = true }

View File

@@ -1,15 +1,11 @@
use crate::cloud::sync::err_return;
use crate::err_return;
use std::sync::Arc;
use tokio::sync::Notify;
use tracing::info;
use super::Library;
pub async fn run_actor((library, notify): (Arc<Library>, Arc<Notify>)) {
let Library { sync, .. } = library.as_ref();
pub async fn run_actor(sync: Arc<sd_core_sync::Manager>, notify: Arc<Notify>) {
loop {
{
let mut rx = sync.ingest.req_rx.lock().await;
@@ -21,11 +17,11 @@ pub async fn run_actor((library, notify): (Arc<Library>, Arc<Notify>)) {
.await
.is_ok()
{
use crate::sync::ingest::*;
while let Some(req) = rx.recv().await {
const OPS_PER_REQUEST: u32 = 1000;
use sd_core_sync::*;
let timestamps = match req {
Request::FinishedIngesting => break,
Request::Messages { timestamps } => timestamps,
@@ -33,7 +29,7 @@ pub async fn run_actor((library, notify): (Arc<Library>, Arc<Notify>)) {
};
let ops = err_return!(
sync.get_cloud_ops(crate::sync::GetOpsArgs {
sync.get_cloud_ops(GetOpsArgs {
clocks: timestamps,
count: OPS_PER_REQUEST,
})
@@ -46,7 +42,7 @@ pub async fn run_actor((library, notify): (Arc<Library>, Arc<Notify>)) {
sync.ingest
.event_tx
.send(sd_core_sync::Event::Messages(MessagesEvent {
instance_id: library.sync.instance,
instance_id: sync.instance,
has_more: ops.len() == 1000,
messages: ops,
}))

View File

@@ -1,44 +1,11 @@
use crate::{library::Library, Node};
use sd_sync::*;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use uuid::Uuid;
use std::sync::{atomic, Arc};
mod ingest;
mod receive;
mod send;
pub async fn declare_actors(library: &Arc<Library>, node: &Arc<Node>) {
let ingest_notify = Arc::new(Notify::new());
let actors = &library.actors;
let autorun = node.cloud_sync_flag.load(atomic::Ordering::Relaxed);
let args = (library.clone(), node.clone());
actors
.declare("Cloud Sync Sender", move || send::run_actor(args), autorun)
.await;
let args = (library.clone(), node.clone(), ingest_notify.clone());
actors
.declare(
"Cloud Sync Receiver",
move || receive::run_actor(args),
autorun,
)
.await;
let args = (library.clone(), ingest_notify);
actors
.declare(
"Cloud Sync Ingest",
move || ingest::run_actor(args),
autorun,
)
.await;
}
pub mod ingest;
pub mod receive;
pub mod send;
macro_rules! err_break {
($e:expr) => {
@@ -64,9 +31,7 @@ macro_rules! err_return {
}
};
}
pub(crate) use err_return;
use tokio::sync::Notify;
pub type CompressedCRDTOperationsForModel = Vec<(Value, Vec<CompressedCRDTOperation>)>;

View File

@@ -1,9 +1,6 @@
use crate::{
cloud::sync::{err_break, err_return, CompressedCRDTOperations},
library::Library,
Node,
};
use crate::{err_break, err_return, CompressedCRDTOperations};
use sd_cloud_api::RequestConfigProvider;
use sd_core_sync::NTP64;
use sd_prisma::prisma::{cloud_crdt_operation, instance, PrismaClient, SortOrder};
use sd_sync::CRDTOperation;
@@ -22,12 +19,16 @@ use serde_json::to_vec;
use tokio::{sync::Notify, time::sleep};
use uuid::Uuid;
pub async fn run_actor((library, node, ingest_notify): (Arc<Library>, Arc<Node>, Arc<Notify>)) {
let db = &library.db;
let library_id = library.id;
pub async fn run_actor(
db: Arc<PrismaClient>,
library_id: Uuid,
instance_uuid: Uuid,
sync: Arc<sd_core_sync::Manager>,
cloud_api_config_provider: Arc<impl RequestConfigProvider>,
ingest_notify: Arc<Notify>,
) {
let mut cloud_timestamps = {
let timestamps = library.sync.timestamps.read().await;
let timestamps = sync.timestamps.read().await;
let batch = timestamps
.keys()
@@ -74,9 +75,9 @@ pub async fn run_actor((library, node, ingest_notify): (Arc<Library>, Arc<Node>,
let collections = {
use sd_cloud_api::library::message_collections;
message_collections::get(
node.cloud_api_config().await,
cloud_api_config_provider.cloud_api_config().await,
library_id,
library.instance_uuid,
instance_uuid,
instances
.into_iter()
.map(|i| {
@@ -108,8 +109,8 @@ pub async fn run_actor((library, node, ingest_notify): (Arc<Library>, Arc<Node>,
None => {
let Some(fetched_library) = err_break!(
sd_cloud_api::library::get(
node.cloud_api_config().await,
library.id
cloud_api_config_provider.cloud_api_config().await,
library_id
)
.await
) else {
@@ -137,7 +138,7 @@ pub async fn run_actor((library, node, ingest_notify): (Arc<Library>, Arc<Node>,
err_break!(
create_instance(
db,
&db,
collection.instance_uuid,
err_break!(BASE64_STANDARD.decode(instance.identity.clone()))
)
@@ -152,7 +153,7 @@ pub async fn run_actor((library, node, ingest_notify): (Arc<Library>, Arc<Node>,
&BASE64_STANDARD.decode(collection.contents)
)));
err_break!(write_cloud_ops_to_db(compressed_operations.into_ops(), db).await);
err_break!(write_cloud_ops_to_db(compressed_operations.into_ops(), &db).await);
let collection_timestamp =
NTP64(collection.end_time.parse().expect("unable to parse time"));

View File

@@ -1,19 +1,23 @@
use crate::{cloud::sync::CompressedCRDTOperations, Node};
use crate::CompressedCRDTOperations;
use sd_cloud_api::RequestConfigProvider;
use sd_core_sync::{GetOpsArgs, SyncMessage, NTP64};
use sd_prisma::prisma::instance;
use sd_prisma::prisma::{instance, PrismaClient};
use sd_utils::from_bytes_to_uuid;
use uuid::Uuid;
use std::{sync::Arc, time::Duration};
use tokio::time::sleep;
use super::{err_break, Library};
pub async fn run_actor((library, node): (Arc<Library>, Arc<Node>)) {
let db = &library.db;
let library_id = library.id;
use super::err_break;
pub async fn run_actor(
db: Arc<PrismaClient>,
library_id: Uuid,
sync: Arc<sd_core_sync::Manager>,
cloud_api_config_provider: Arc<impl RequestConfigProvider>,
) {
loop {
loop {
let instances = err_break!(
@@ -29,7 +33,7 @@ pub async fn run_actor((library, node): (Arc<Library>, Arc<Node>)) {
let req_adds = err_break!(
sd_cloud_api::library::message_collections::request_add(
node.cloud_api_config().await,
cloud_api_config_provider.cloud_api_config().await,
library_id,
instances,
)
@@ -42,22 +46,20 @@ pub async fn run_actor((library, node): (Arc<Library>, Arc<Node>)) {
for req_add in req_adds {
let ops = err_break!(
library
.sync
.get_ops(GetOpsArgs {
count: 1000,
clocks: vec![(
req_add.instance_uuid,
NTP64(
req_add
.from_time
.unwrap_or_else(|| "0".to_string())
.parse()
.expect("couldn't parse ntp64 value"),
),
)],
})
.await
sync.get_ops(GetOpsArgs {
count: 1000,
clocks: vec![(
req_add.instance_uuid,
NTP64(
req_add
.from_time
.unwrap_or_else(|| "0".to_string())
.parse()
.expect("couldn't parse ntp64 value"),
),
)],
})
.await
);
if ops.is_empty() {
@@ -81,12 +83,19 @@ pub async fn run_actor((library, node): (Arc<Library>, Arc<Node>)) {
break;
}
err_break!(do_add(node.cloud_api_config().await, library_id, instances,).await);
err_break!(
do_add(
cloud_api_config_provider.cloud_api_config().await,
library_id,
instances,
)
.await
);
}
{
// recreate subscription each time so that existing messages are dropped
let mut rx = library.sync.subscribe();
let mut rx = sync.subscribe();
// wait until Created message comes in
loop {

65
core/src/cloud/sync.rs Normal file
View File

@@ -0,0 +1,65 @@
use std::sync::{atomic, Arc};
use tokio::sync::Notify;
use crate::{library::Library, Node};
pub async fn declare_actors(library: &Arc<Library>, node: &Arc<Node>) {
let ingest_notify = Arc::new(Notify::new());
let actors = &library.actors;
let autorun = node.cloud_sync_flag.load(atomic::Ordering::Relaxed);
actors
.declare(
"Cloud Sync Sender",
{
let library = library.clone();
let node = node.clone();
move || {
sd_core_cloud_sync::send::run_actor(
library.db.clone(),
library.id,
library.sync.clone(),
node.clone(),
)
}
},
autorun,
)
.await;
actors
.declare(
"Cloud Sync Receiver",
{
let library = library.clone();
let node = node.clone();
let ingest_notify = ingest_notify.clone();
move || {
sd_core_cloud_sync::receive::run_actor(
library.db.clone(),
library.id,
library.instance_uuid,
library.sync.clone(),
node.clone(),
ingest_notify,
)
}
},
autorun,
)
.await;
actors
.declare(
"Cloud Sync Ingest",
{
let library = library.clone();
move || sd_core_cloud_sync::ingest::run_actor(library.sync.clone(), ingest_notify)
},
autorun,
)
.await;
}

View File

@@ -304,6 +304,12 @@ impl Node {
}
}
impl sd_cloud_api::RequestConfigProvider for Node {
async fn cloud_api_config(self: &Arc<Self>) -> sd_cloud_api::RequestConfig {
Node::cloud_api_config(self).await
}
}
/// Error type for Node related errors.
#[derive(Error, Debug)]
pub enum NodeError {

View File

@@ -20,7 +20,7 @@ use tokio::{fs, io, sync::broadcast, sync::RwLock};
use tracing::warn;
use uuid::Uuid;
use super::{Actors, LibraryConfig, LibraryManagerError};
use super::{LibraryConfig, LibraryManagerError};
// TODO: Finish this
// pub enum LibraryNew {
@@ -53,7 +53,7 @@ pub struct Library {
// TODO(@Oscar): Get rid of this with the new invalidation system.
event_bus_tx: broadcast::Sender<CoreEvent>,
pub actors: Arc<Actors>,
pub actors: Arc<sd_actors::Actors>,
}
impl Debug for Library {

View File

@@ -1,4 +1,3 @@
mod actors;
mod config;
#[allow(clippy::module_inception)]
mod library;
@@ -6,7 +5,6 @@ mod manager;
mod name;
mod statistics;
pub use actors::*;
pub use config::*;
pub use library::*;
pub use manager::*;

12
crates/actors/Cargo.toml Normal file
View File

@@ -0,0 +1,12 @@
[package]
name = "sd-actors"
version = "0.1.0"
license.workspace = true
edition.workspace = true
repository.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
futures.workspace = true
tokio.workspace = true

View File

@@ -1,5 +1,7 @@
pub mod auth;
use std::{future::Future, sync::Arc};
use auth::OAuthToken;
use serde::{Deserialize, Serialize};
use serde_json::json;
@@ -12,6 +14,10 @@ pub struct RequestConfig {
pub auth_token: Option<auth::OAuthToken>,
}
pub trait RequestConfigProvider {
fn cloud_api_config(self: &Arc<Self>) -> impl Future<Output = RequestConfig> + Send;
}
#[derive(thiserror::Error, Debug)]
#[error("{0}")]
pub struct Error(String);

View File

@@ -1,2 +1,2 @@
[toolchain]
channel = "1.73"
channel = "1.75"