From 0ac38b6845574494fe5adfa31451f755cdb07f37 Mon Sep 17 00:00:00 2001 From: Ericson Soares Date: Fri, 6 Sep 2024 21:18:22 -0300 Subject: [PATCH] Tweak crdt ops compression and use it on cloud sync --- core/crates/cloud-services/src/sync/mod.rs | 33 ++- .../crates/cloud-services/src/sync/receive.rs | 20 +- core/crates/cloud-services/src/sync/send.rs | 10 +- core/crates/sync/src/lib.rs | 6 +- crates/sync/src/compressed.rs | 220 +++++++++++------- 5 files changed, 172 insertions(+), 117 deletions(-) diff --git a/core/crates/cloud-services/src/sync/mod.rs b/core/crates/cloud-services/src/sync/mod.rs index 0221fd4f8..2d3f6575e 100644 --- a/core/crates/cloud-services/src/sync/mod.rs +++ b/core/crates/cloud-services/src/sync/mod.rs @@ -21,6 +21,7 @@ mod ingest; mod receive; mod send; +// use ingest::Ingester; use receive::Receiver; use send::Sender; @@ -74,8 +75,8 @@ pub async fn declare_actors( data_dir, sync_group_pub_id, cloud_services, - sync, - ingest_notify, + sync.clone(), + Arc::clone(&ingest_notify), Arc::clone(&actors_state.receive_active), Arc::clone(&actors_state.notifier), ), @@ -83,25 +84,21 @@ pub async fn declare_actors( .try_join() .await?; + // let ingester = Ingester::new( + // sync, + // ingest_notify, + // Arc::clone(&actors_state.ingest_active), + // Arc::clone(&actors_state.notifier), + // ); + actors - .declare_many_boxed([sender.into_actor(), receiver.into_actor()]) + .declare_many_boxed([ + sender.into_actor(), + receiver.into_actor(), + // ingester.into_actor(), + ]) .await; - // actors - // .declare( - // "Cloud Sync Ingest", - // { - // let active = state.ingest_active.clone(); - // let active_notifier = state.notifier.clone(); - - // move |stop| { - // ingest::run_actor(sync.clone(), ingest_notify, active, active_notifier, stop) - // } - // }, - // autorun, - // ) - // .await; - Ok(()) } diff --git a/core/crates/cloud-services/src/sync/receive.rs b/core/crates/cloud-services/src/sync/receive.rs index abaf10f31..149851203 100644 --- a/core/crates/cloud-services/src/sync/receive.rs +++ b/core/crates/cloud-services/src/sync/receive.rs @@ -8,7 +8,9 @@ use sd_cloud_schema::{ }, Client, Service, }; -use sd_core_sync::{cloud_crdt_op_db, CRDTOperation, SyncManager}; +use sd_core_sync::{ + cloud_crdt_op_db, CRDTOperation, CompressedCRDTOperationsPerModel, SyncManager, +}; use sd_actors::{Actor, Stopper}; use sd_crypto::{ @@ -54,7 +56,7 @@ use super::SyncActors; const CLOUD_SYNC_DATA_KEEPER_FILE: &str = "cloud_sync_data_keeper.bin"; const ONE_MINUTE: Duration = Duration::from_secs(60); -// Responsible for downloading sync operations from the cloud to be processed by the ingester +/// Responsible for downloading sync operations from the cloud to be processed by the ingester pub struct Receiver { keeper: LastTimestampKeeper, @@ -266,9 +268,9 @@ async fn handle_single_message( .map_err(Error::ErrorResponseDownloadSyncMessages)?; let crdt_ops = if let Some(size) = response.content_length() { - extract_messages_known_size(response, size, secret_key).await + extract_messages_known_size(response, size, secret_key, original_device_pub_id).await } else { - extract_messages_unknown_size(response, secret_key).await + extract_messages_unknown_size(response, secret_key, original_device_pub_id).await }?; assert_eq!( crdt_ops.len(), @@ -284,6 +286,7 @@ async fn extract_messages_known_size( response: Response, size: u64, secret_key: SecretKey, + devices::PubId(device_pub_id): devices::PubId, ) -> Result, Error> { let plain_text = if size <= EncryptedBlock::CIPHER_TEXT_SIZE as u64 { OneShotDecryption::decrypt( @@ -320,13 +323,16 @@ async fn extract_messages_known_size( plain_text }; - postcard::from_bytes(&plain_text).map_err(Error::DeserializationFailureToPullSyncMessages) + postcard::from_bytes::(&plain_text) + .map(|compressed_ops| compressed_ops.into_ops(device_pub_id)) + .map_err(Error::DeserializationFailureToPullSyncMessages) } #[instrument(skip_all, err)] async fn extract_messages_unknown_size( response: Response, secret_key: SecretKey, + devices::PubId(device_pub_id): devices::PubId, ) -> Result, Error> { let plain_text = match UnknownDownloadKind::new(response).await? { UnknownDownloadKind::OneShot(buffer) => { @@ -345,7 +351,9 @@ async fn extract_messages_unknown_size( } }; - postcard::from_bytes(&plain_text).map_err(Error::DeserializationFailureToPullSyncMessages) + postcard::from_bytes::(&plain_text) + .map(|compressed_ops| compressed_ops.into_ops(device_pub_id)) + .map_err(Error::DeserializationFailureToPullSyncMessages) } #[instrument(skip_all, err)] diff --git a/core/crates/cloud-services/src/sync/send.rs b/core/crates/cloud-services/src/sync/send.rs index dcba941b7..b0e6d03ba 100644 --- a/core/crates/cloud-services/src/sync/send.rs +++ b/core/crates/cloud-services/src/sync/send.rs @@ -1,6 +1,6 @@ use crate::{CloudServices, Error, KeyManager}; -use sd_core_sync::{SyncEvent, SyncManager, NTP64}; +use sd_core_sync::{CompressedCRDTOperationsPerModelPerDevice, SyncEvent, SyncManager, NTP64}; use sd_actors::{Actor, Stopper}; use sd_cloud_schema::{ @@ -173,8 +173,12 @@ impl Sender { let start_time = timestamp_to_datetime(first.timestamp); let end_time = timestamp_to_datetime(last.timestamp); - let messages_bytes = - postcard::to_stdvec(&ops).map_err(Error::SerializationFailureToPushSyncMessages)?; + // Ignoring this device_pub_id here as we already know it + let (_device_pub_id, compressed_ops) = + CompressedCRDTOperationsPerModelPerDevice::new_single_device(ops); + + let messages_bytes = postcard::to_stdvec(&compressed_ops) + .map_err(Error::SerializationFailureToPushSyncMessages)?; let (mut push_updates, mut push_responses) = self .cloud_client diff --git a/core/crates/sync/src/lib.rs b/core/crates/sync/src/lib.rs index 1b437e7e1..af9aa1552 100644 --- a/core/crates/sync/src/lib.rs +++ b/core/crates/sync/src/lib.rs @@ -28,7 +28,6 @@ #![allow(clippy::missing_errors_doc, clippy::module_name_repetitions)] use sd_prisma::prisma::{cloud_crdt_operation, crdt_operation, device, PrismaClient}; -use sd_sync::ModelId; use sd_utils::uuid_to_bytes; use std::{ @@ -56,8 +55,9 @@ pub enum SyncEvent { pub use sd_core_prisma_helpers::DevicePubId; pub use sd_sync::{ - CRDTOperation, OperationFactory, RelationSyncId, RelationSyncModel, SharedSyncModel, SyncId, - SyncModel, + CRDTOperation, CompressedCRDTOperation, CompressedCRDTOperationsPerModel, + CompressedCRDTOperationsPerModelPerDevice, ModelId, OperationFactory, RecordId, RelationSyncId, + RelationSyncModel, SharedSyncModel, SyncId, SyncModel, }; pub type TimestampPerDevice = Arc>>; diff --git a/crates/sync/src/compressed.rs b/crates/sync/src/compressed.rs index 52de0db7a..75bff0165 100644 --- a/crates/sync/src/compressed.rs +++ b/crates/sync/src/compressed.rs @@ -1,13 +1,14 @@ use crate::{CRDTOperation, CRDTOperationData, DevicePubId, ModelId, RecordId}; -use std::mem; +use std::collections::BTreeMap; use serde::{Deserialize, Serialize}; use uhlc::NTP64; -use uuid::Uuid; -pub type CompressedCRDTOperationsPerModel = - Vec<(ModelId, Vec<(RecordId, Vec)>)>; +#[derive(Serialize, Deserialize, Debug)] +pub struct CompressedCRDTOperationsPerModel(pub Vec<(ModelId, CompressedCRDTOperationsPerRecord)>); + +pub type CompressedCRDTOperationsPerRecord = Vec<(RecordId, Vec)>; /// Stores a bunch of [`CRDTOperation`]s in a more memory-efficient form for sending to the cloud. #[derive(Serialize, Deserialize, Debug)] @@ -18,67 +19,68 @@ pub struct CompressedCRDTOperationsPerModelPerDevice( impl CompressedCRDTOperationsPerModelPerDevice { #[must_use] pub fn new(ops: Vec) -> Self { - let mut compressed = vec![]; + let mut compressed_map = BTreeMap::< + DevicePubId, + BTreeMap)>>, + >::new(); - let mut ops_iter = ops.into_iter(); + for CRDTOperation { + device_pub_id, + timestamp, + model_id, + record_id, + data, + } in ops + { + let records = compressed_map + .entry(device_pub_id) + .or_default() + .entry(model_id) + .or_default(); - let Some(first) = ops_iter.next() else { - return Self(vec![]); - }; - - let mut device_pub_id = first.device_pub_id; - let mut device_messages = vec![]; - - let mut model_id = first.model_id; - let mut model = vec![]; - - let mut record_id = first.record_id.clone(); - let mut record = vec![first.into()]; - - for op in ops_iter { - if device_pub_id != op.device_pub_id { - model.push(( - mem::replace(&mut record_id, op.record_id.clone()), - mem::take(&mut record), - )); - device_messages.push(( - mem::replace(&mut model_id, op.model_id), - mem::take(&mut model), - )); - compressed.push(( - mem::replace(&mut device_pub_id, op.device_pub_id), - mem::take(&mut device_messages), - )); - } else if model_id != op.model_id { - model.push(( - mem::replace(&mut record_id, op.record_id.clone()), - mem::take(&mut record), - )); - device_messages.push(( - mem::replace(&mut model_id, op.model_id), - mem::take(&mut model), - )); - } else if record_id != op.record_id { - model.push(( - mem::replace(&mut record_id, op.record_id.clone()), - mem::take(&mut record), - )); + // Can't use RecordId as a key because rmpv::Value doesn't implement Hash + Eq + if let Some((_, ops)) = records + .iter_mut() + .find(|(current_record_id, _)| *current_record_id == record_id) + { + ops.push(CompressedCRDTOperation { timestamp, data }); + } else { + records.push((record_id, vec![CompressedCRDTOperation { timestamp, data }])); } - - record.push(CompressedCRDTOperation::from(op)); } - model.push((record_id, record)); - device_messages.push((model_id, model)); - compressed.push((device_pub_id, device_messages)); + Self( + compressed_map + .into_iter() + .map(|(device_pub_id, model_map)| { + ( + device_pub_id, + CompressedCRDTOperationsPerModel(model_map.into_iter().collect()), + ) + }) + .collect(), + ) + } - Self(compressed) + /// Creates a new [`CompressedCRDTOperationsPerModel`] from crdt operation of a single device. + /// + /// # Panics + /// Will panic if there are more than one device. + #[must_use] + pub fn new_single_device( + ops: Vec, + ) -> (DevicePubId, CompressedCRDTOperationsPerModel) { + let Self(mut compressed) = Self::new(ops); + + assert_eq!(compressed.len(), 1, "Expected a single device"); + + compressed.remove(0) } #[must_use] - pub fn first(&self) -> Option<(Uuid, u16, &rmpv::Value, &CompressedCRDTOperation)> { + pub fn first(&self) -> Option<(DevicePubId, ModelId, &RecordId, &CompressedCRDTOperation)> { self.0.first().and_then(|(instance, data)| { - data.first().and_then(|(model, data)| { + data.0.first().and_then(|(model, data)| { data.first() .and_then(|(record, ops)| ops.first().map(|op| (*instance, *model, record, op))) }) @@ -86,9 +88,9 @@ impl CompressedCRDTOperationsPerModelPerDevice { } #[must_use] - pub fn last(&self) -> Option<(Uuid, u16, &rmpv::Value, &CompressedCRDTOperation)> { + pub fn last(&self) -> Option<(DevicePubId, ModelId, &RecordId, &CompressedCRDTOperation)> { self.0.last().and_then(|(instance, data)| { - data.last().and_then(|(model, data)| { + data.0.last().and_then(|(model, data)| { data.last() .and_then(|(record, ops)| ops.last().map(|op| (*instance, *model, record, op))) }) @@ -100,11 +102,12 @@ impl CompressedCRDTOperationsPerModelPerDevice { self.0 .iter() .map(|(_, data)| { - data.iter() + data.0 + .iter() .map(|(_, data)| data.iter().map(|(_, ops)| ops.len()).sum::()) .sum::() }) - .sum::() + .sum() } #[must_use] @@ -114,10 +117,10 @@ impl CompressedCRDTOperationsPerModelPerDevice { #[must_use] pub fn into_ops(self) -> Vec { - let mut ops = vec![]; + let mut ops = Vec::with_capacity(self.len()); for (device_pub_id, device_messages) in self.0 { - for (model_id, model_messages) in device_messages { + for (model_id, model_messages) in device_messages.0 { for (record_id, record) in model_messages { for op in record { ops.push(CRDTOperation { @@ -136,6 +139,58 @@ impl CompressedCRDTOperationsPerModelPerDevice { } } +impl CompressedCRDTOperationsPerModel { + #[must_use] + pub fn first(&self) -> Option<(ModelId, &RecordId, &CompressedCRDTOperation)> { + self.0.first().and_then(|(model_id, data)| { + data.first() + .and_then(|(record_id, ops)| ops.first().map(|op| (*model_id, record_id, op))) + }) + } + + #[must_use] + pub fn last(&self) -> Option<(ModelId, &RecordId, &CompressedCRDTOperation)> { + self.0.last().and_then(|(model_id, data)| { + data.last() + .and_then(|(record_id, ops)| ops.last().map(|op| (*model_id, record_id, op))) + }) + } + + #[must_use] + pub fn len(&self) -> usize { + self.0 + .iter() + .map(|(_, data)| data.iter().map(|(_, ops)| ops.len()).sum::()) + .sum() + } + + #[must_use] + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + #[must_use] + pub fn into_ops(self, device_pub_id: DevicePubId) -> Vec { + let mut ops = Vec::with_capacity(self.len()); + + for (model_id, model_messages) in self.0 { + for (record_id, record) in model_messages { + for op in record { + ops.push(CRDTOperation { + device_pub_id, + model_id, + record_id: record_id.clone(), + timestamp: op.timestamp, + data: op.data, + }); + } + } + } + + ops + } +} + #[derive(PartialEq, Serialize, Deserialize, Clone, Debug)] pub struct CompressedCRDTOperation { pub timestamp: NTP64, @@ -155,6 +210,7 @@ impl From for CompressedCRDTOperation { #[cfg(test)] mod test { use super::*; + use uuid::Uuid; #[test] fn compress() { @@ -215,20 +271,18 @@ mod test { let CompressedCRDTOperationsPerModelPerDevice(compressed) = CompressedCRDTOperationsPerModelPerDevice::new(uncompressed); - assert_eq!(compressed[0].1[0].0, 0); - assert_eq!(compressed[0].1[1].0, 1); - assert_eq!(compressed[0].1[2].0, 0); + assert_eq!(compressed[0].1 .0[0].0, 0); + assert_eq!(compressed[0].1 .0[1].0, 1); - assert_eq!(compressed[0].1[0].1[0].1.len(), 3); - assert_eq!(compressed[0].1[1].1[0].1.len(), 2); - assert_eq!(compressed[0].1[2].1[0].1.len(), 2); + assert_eq!(compressed[0].1 .0[0].1[0].1.len(), 5); + assert_eq!(compressed[0].1 .0[1].1[0].1.len(), 2); } #[test] fn into_ops() { let compressed = CompressedCRDTOperationsPerModelPerDevice(vec![( Uuid::new_v4(), - vec![ + CompressedCRDTOperationsPerModel(vec![ ( 0, vec![( @@ -246,6 +300,14 @@ mod test { timestamp: NTP64(0), data: CRDTOperationData::create(), }, + CompressedCRDTOperation { + timestamp: NTP64(0), + data: CRDTOperationData::create(), + }, + CompressedCRDTOperation { + timestamp: NTP64(0), + data: CRDTOperationData::create(), + }, ], )], ), @@ -265,30 +327,14 @@ mod test { ], )], ), - ( - 0, - vec![( - rmpv::Value::Nil, - vec![ - CompressedCRDTOperation { - timestamp: NTP64(0), - data: CRDTOperationData::create(), - }, - CompressedCRDTOperation { - timestamp: NTP64(0), - data: CRDTOperationData::create(), - }, - ], - )], - ), - ], + ]), )]); let uncompressed = compressed.into_ops(); assert_eq!(uncompressed.len(), 7); assert_eq!(uncompressed[2].model_id, 0); - assert_eq!(uncompressed[4].model_id, 1); - assert_eq!(uncompressed[6].model_id, 0); + assert_eq!(uncompressed[4].model_id, 0); + assert_eq!(uncompressed[6].model_id, 1); } }