Tweak crdt ops compression and use it on cloud sync

This commit is contained in:
Ericson Soares
2024-09-06 21:18:22 -03:00
parent 278fccba0f
commit 0ac38b6845
5 changed files with 172 additions and 117 deletions

View File

@@ -21,6 +21,7 @@ mod ingest;
mod receive;
mod send;
// use ingest::Ingester;
use receive::Receiver;
use send::Sender;
@@ -74,8 +75,8 @@ pub async fn declare_actors(
data_dir,
sync_group_pub_id,
cloud_services,
sync,
ingest_notify,
sync.clone(),
Arc::clone(&ingest_notify),
Arc::clone(&actors_state.receive_active),
Arc::clone(&actors_state.notifier),
),
@@ -83,25 +84,21 @@ pub async fn declare_actors(
.try_join()
.await?;
// let ingester = Ingester::new(
// sync,
// ingest_notify,
// Arc::clone(&actors_state.ingest_active),
// Arc::clone(&actors_state.notifier),
// );
actors
.declare_many_boxed([sender.into_actor(), receiver.into_actor()])
.declare_many_boxed([
sender.into_actor(),
receiver.into_actor(),
// ingester.into_actor(),
])
.await;
// actors
// .declare(
// "Cloud Sync Ingest",
// {
// let active = state.ingest_active.clone();
// let active_notifier = state.notifier.clone();
// move |stop| {
// ingest::run_actor(sync.clone(), ingest_notify, active, active_notifier, stop)
// }
// },
// autorun,
// )
// .await;
Ok(())
}

View File

@@ -8,7 +8,9 @@ use sd_cloud_schema::{
},
Client, Service,
};
use sd_core_sync::{cloud_crdt_op_db, CRDTOperation, SyncManager};
use sd_core_sync::{
cloud_crdt_op_db, CRDTOperation, CompressedCRDTOperationsPerModel, SyncManager,
};
use sd_actors::{Actor, Stopper};
use sd_crypto::{
@@ -54,7 +56,7 @@ use super::SyncActors;
const CLOUD_SYNC_DATA_KEEPER_FILE: &str = "cloud_sync_data_keeper.bin";
const ONE_MINUTE: Duration = Duration::from_secs(60);
// Responsible for downloading sync operations from the cloud to be processed by the ingester
/// Responsible for downloading sync operations from the cloud to be processed by the ingester
pub struct Receiver {
keeper: LastTimestampKeeper,
@@ -266,9 +268,9 @@ async fn handle_single_message(
.map_err(Error::ErrorResponseDownloadSyncMessages)?;
let crdt_ops = if let Some(size) = response.content_length() {
extract_messages_known_size(response, size, secret_key).await
extract_messages_known_size(response, size, secret_key, original_device_pub_id).await
} else {
extract_messages_unknown_size(response, secret_key).await
extract_messages_unknown_size(response, secret_key, original_device_pub_id).await
}?;
assert_eq!(
crdt_ops.len(),
@@ -284,6 +286,7 @@ async fn extract_messages_known_size(
response: Response,
size: u64,
secret_key: SecretKey,
devices::PubId(device_pub_id): devices::PubId,
) -> Result<Vec<CRDTOperation>, Error> {
let plain_text = if size <= EncryptedBlock::CIPHER_TEXT_SIZE as u64 {
OneShotDecryption::decrypt(
@@ -320,13 +323,16 @@ async fn extract_messages_known_size(
plain_text
};
postcard::from_bytes(&plain_text).map_err(Error::DeserializationFailureToPullSyncMessages)
postcard::from_bytes::<CompressedCRDTOperationsPerModel>(&plain_text)
.map(|compressed_ops| compressed_ops.into_ops(device_pub_id))
.map_err(Error::DeserializationFailureToPullSyncMessages)
}
#[instrument(skip_all, err)]
async fn extract_messages_unknown_size(
response: Response,
secret_key: SecretKey,
devices::PubId(device_pub_id): devices::PubId,
) -> Result<Vec<CRDTOperation>, Error> {
let plain_text = match UnknownDownloadKind::new(response).await? {
UnknownDownloadKind::OneShot(buffer) => {
@@ -345,7 +351,9 @@ async fn extract_messages_unknown_size(
}
};
postcard::from_bytes(&plain_text).map_err(Error::DeserializationFailureToPullSyncMessages)
postcard::from_bytes::<CompressedCRDTOperationsPerModel>(&plain_text)
.map(|compressed_ops| compressed_ops.into_ops(device_pub_id))
.map_err(Error::DeserializationFailureToPullSyncMessages)
}
#[instrument(skip_all, err)]

View File

@@ -1,6 +1,6 @@
use crate::{CloudServices, Error, KeyManager};
use sd_core_sync::{SyncEvent, SyncManager, NTP64};
use sd_core_sync::{CompressedCRDTOperationsPerModelPerDevice, SyncEvent, SyncManager, NTP64};
use sd_actors::{Actor, Stopper};
use sd_cloud_schema::{
@@ -173,8 +173,12 @@ impl Sender {
let start_time = timestamp_to_datetime(first.timestamp);
let end_time = timestamp_to_datetime(last.timestamp);
let messages_bytes =
postcard::to_stdvec(&ops).map_err(Error::SerializationFailureToPushSyncMessages)?;
// Ignoring this device_pub_id here as we already know it
let (_device_pub_id, compressed_ops) =
CompressedCRDTOperationsPerModelPerDevice::new_single_device(ops);
let messages_bytes = postcard::to_stdvec(&compressed_ops)
.map_err(Error::SerializationFailureToPushSyncMessages)?;
let (mut push_updates, mut push_responses) = self
.cloud_client

View File

@@ -28,7 +28,6 @@
#![allow(clippy::missing_errors_doc, clippy::module_name_repetitions)]
use sd_prisma::prisma::{cloud_crdt_operation, crdt_operation, device, PrismaClient};
use sd_sync::ModelId;
use sd_utils::uuid_to_bytes;
use std::{
@@ -56,8 +55,9 @@ pub enum SyncEvent {
pub use sd_core_prisma_helpers::DevicePubId;
pub use sd_sync::{
CRDTOperation, OperationFactory, RelationSyncId, RelationSyncModel, SharedSyncModel, SyncId,
SyncModel,
CRDTOperation, CompressedCRDTOperation, CompressedCRDTOperationsPerModel,
CompressedCRDTOperationsPerModelPerDevice, ModelId, OperationFactory, RecordId, RelationSyncId,
RelationSyncModel, SharedSyncModel, SyncId, SyncModel,
};
pub type TimestampPerDevice = Arc<RwLock<HashMap<DevicePubId, NTP64>>>;

View File

@@ -1,13 +1,14 @@
use crate::{CRDTOperation, CRDTOperationData, DevicePubId, ModelId, RecordId};
use std::mem;
use std::collections::BTreeMap;
use serde::{Deserialize, Serialize};
use uhlc::NTP64;
use uuid::Uuid;
pub type CompressedCRDTOperationsPerModel =
Vec<(ModelId, Vec<(RecordId, Vec<CompressedCRDTOperation>)>)>;
#[derive(Serialize, Deserialize, Debug)]
pub struct CompressedCRDTOperationsPerModel(pub Vec<(ModelId, CompressedCRDTOperationsPerRecord)>);
pub type CompressedCRDTOperationsPerRecord = Vec<(RecordId, Vec<CompressedCRDTOperation>)>;
/// Stores a bunch of [`CRDTOperation`]s in a more memory-efficient form for sending to the cloud.
#[derive(Serialize, Deserialize, Debug)]
@@ -18,67 +19,68 @@ pub struct CompressedCRDTOperationsPerModelPerDevice(
impl CompressedCRDTOperationsPerModelPerDevice {
#[must_use]
pub fn new(ops: Vec<CRDTOperation>) -> Self {
let mut compressed = vec![];
let mut compressed_map = BTreeMap::<
DevicePubId,
BTreeMap<ModelId, Vec<(RecordId, Vec<CompressedCRDTOperation>)>>,
>::new();
let mut ops_iter = ops.into_iter();
for CRDTOperation {
device_pub_id,
timestamp,
model_id,
record_id,
data,
} in ops
{
let records = compressed_map
.entry(device_pub_id)
.or_default()
.entry(model_id)
.or_default();
let Some(first) = ops_iter.next() else {
return Self(vec![]);
};
let mut device_pub_id = first.device_pub_id;
let mut device_messages = vec![];
let mut model_id = first.model_id;
let mut model = vec![];
let mut record_id = first.record_id.clone();
let mut record = vec![first.into()];
for op in ops_iter {
if device_pub_id != op.device_pub_id {
model.push((
mem::replace(&mut record_id, op.record_id.clone()),
mem::take(&mut record),
));
device_messages.push((
mem::replace(&mut model_id, op.model_id),
mem::take(&mut model),
));
compressed.push((
mem::replace(&mut device_pub_id, op.device_pub_id),
mem::take(&mut device_messages),
));
} else if model_id != op.model_id {
model.push((
mem::replace(&mut record_id, op.record_id.clone()),
mem::take(&mut record),
));
device_messages.push((
mem::replace(&mut model_id, op.model_id),
mem::take(&mut model),
));
} else if record_id != op.record_id {
model.push((
mem::replace(&mut record_id, op.record_id.clone()),
mem::take(&mut record),
));
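// Can't use RecordId as a map key because rmpv::Value implements neither Ord (required by
// BTreeMap) nor Hash + Eq (required by HashMap), so records are kept in a Vec and searched linearly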
if let Some((_, ops)) = records
.iter_mut()
.find(|(current_record_id, _)| *current_record_id == record_id)
{
ops.push(CompressedCRDTOperation { timestamp, data });
} else {
records.push((record_id, vec![CompressedCRDTOperation { timestamp, data }]));
}
record.push(CompressedCRDTOperation::from(op));
}
model.push((record_id, record));
device_messages.push((model_id, model));
compressed.push((device_pub_id, device_messages));
Self(
compressed_map
.into_iter()
.map(|(device_pub_id, model_map)| {
(
device_pub_id,
CompressedCRDTOperationsPerModel(model_map.into_iter().collect()),
)
})
.collect(),
)
}
Self(compressed)
/// Compresses the operations of a single device, returning that device's pub id together
/// with its [`CompressedCRDTOperationsPerModel`].
///
/// # Panics
/// Will panic unless `ops` contains operations from exactly one device, that is, if `ops`
/// is empty or if it mixes operations from more than one device.
#[must_use]
pub fn new_single_device(
    ops: Vec<CRDTOperation>,
) -> (DevicePubId, CompressedCRDTOperationsPerModel) {
    let Self(mut compressed) = Self::new(ops);
    // `new` yields one entry per distinct device, so anything other than exactly one entry
    // (including zero, for an empty input) violates this constructor's contract.
    assert_eq!(compressed.len(), 1, "Expected a single device");

    compressed.remove(0)
}
#[must_use]
pub fn first(&self) -> Option<(Uuid, u16, &rmpv::Value, &CompressedCRDTOperation)> {
pub fn first(&self) -> Option<(DevicePubId, ModelId, &RecordId, &CompressedCRDTOperation)> {
self.0.first().and_then(|(instance, data)| {
data.first().and_then(|(model, data)| {
data.0.first().and_then(|(model, data)| {
data.first()
.and_then(|(record, ops)| ops.first().map(|op| (*instance, *model, record, op)))
})
@@ -86,9 +88,9 @@ impl CompressedCRDTOperationsPerModelPerDevice {
}
#[must_use]
pub fn last(&self) -> Option<(Uuid, u16, &rmpv::Value, &CompressedCRDTOperation)> {
pub fn last(&self) -> Option<(DevicePubId, ModelId, &RecordId, &CompressedCRDTOperation)> {
self.0.last().and_then(|(instance, data)| {
data.last().and_then(|(model, data)| {
data.0.last().and_then(|(model, data)| {
data.last()
.and_then(|(record, ops)| ops.last().map(|op| (*instance, *model, record, op)))
})
@@ -100,11 +102,12 @@ impl CompressedCRDTOperationsPerModelPerDevice {
self.0
.iter()
.map(|(_, data)| {
data.iter()
data.0
.iter()
.map(|(_, data)| data.iter().map(|(_, ops)| ops.len()).sum::<usize>())
.sum::<usize>()
})
.sum::<usize>()
.sum()
}
#[must_use]
@@ -114,10 +117,10 @@ impl CompressedCRDTOperationsPerModelPerDevice {
#[must_use]
pub fn into_ops(self) -> Vec<CRDTOperation> {
let mut ops = vec![];
let mut ops = Vec::with_capacity(self.len());
for (device_pub_id, device_messages) in self.0 {
for (model_id, model_messages) in device_messages {
for (model_id, model_messages) in device_messages.0 {
for (record_id, record) in model_messages {
for op in record {
ops.push(CRDTOperation {
@@ -136,6 +139,58 @@ impl CompressedCRDTOperationsPerModelPerDevice {
}
}
impl CompressedCRDTOperationsPerModel {
#[must_use]
pub fn first(&self) -> Option<(ModelId, &RecordId, &CompressedCRDTOperation)> {
self.0.first().and_then(|(model_id, data)| {
data.first()
.and_then(|(record_id, ops)| ops.first().map(|op| (*model_id, record_id, op)))
})
}
#[must_use]
pub fn last(&self) -> Option<(ModelId, &RecordId, &CompressedCRDTOperation)> {
self.0.last().and_then(|(model_id, data)| {
data.last()
.and_then(|(record_id, ops)| ops.last().map(|op| (*model_id, record_id, op)))
})
}
#[must_use]
pub fn len(&self) -> usize {
self.0
.iter()
.map(|(_, data)| data.iter().map(|(_, ops)| ops.len()).sum::<usize>())
.sum()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
#[must_use]
pub fn into_ops(self, device_pub_id: DevicePubId) -> Vec<CRDTOperation> {
let mut ops = Vec::with_capacity(self.len());
for (model_id, model_messages) in self.0 {
for (record_id, record) in model_messages {
for op in record {
ops.push(CRDTOperation {
device_pub_id,
model_id,
record_id: record_id.clone(),
timestamp: op.timestamp,
data: op.data,
});
}
}
}
ops
}
}
#[derive(PartialEq, Serialize, Deserialize, Clone, Debug)]
pub struct CompressedCRDTOperation {
pub timestamp: NTP64,
@@ -155,6 +210,7 @@ impl From<CRDTOperation> for CompressedCRDTOperation {
#[cfg(test)]
mod test {
use super::*;
use uuid::Uuid;
#[test]
fn compress() {
@@ -215,20 +271,18 @@ mod test {
let CompressedCRDTOperationsPerModelPerDevice(compressed) =
CompressedCRDTOperationsPerModelPerDevice::new(uncompressed);
assert_eq!(compressed[0].1[0].0, 0);
assert_eq!(compressed[0].1[1].0, 1);
assert_eq!(compressed[0].1[2].0, 0);
assert_eq!(compressed[0].1 .0[0].0, 0);
assert_eq!(compressed[0].1 .0[1].0, 1);
assert_eq!(compressed[0].1[0].1[0].1.len(), 3);
assert_eq!(compressed[0].1[1].1[0].1.len(), 2);
assert_eq!(compressed[0].1[2].1[0].1.len(), 2);
assert_eq!(compressed[0].1 .0[0].1[0].1.len(), 5);
assert_eq!(compressed[0].1 .0[1].1[0].1.len(), 2);
}
#[test]
fn into_ops() {
let compressed = CompressedCRDTOperationsPerModelPerDevice(vec![(
Uuid::new_v4(),
vec![
CompressedCRDTOperationsPerModel(vec![
(
0,
vec![(
@@ -246,6 +300,14 @@ mod test {
timestamp: NTP64(0),
data: CRDTOperationData::create(),
},
CompressedCRDTOperation {
timestamp: NTP64(0),
data: CRDTOperationData::create(),
},
CompressedCRDTOperation {
timestamp: NTP64(0),
data: CRDTOperationData::create(),
},
],
)],
),
@@ -265,30 +327,14 @@ mod test {
],
)],
),
(
0,
vec![(
rmpv::Value::Nil,
vec![
CompressedCRDTOperation {
timestamp: NTP64(0),
data: CRDTOperationData::create(),
},
CompressedCRDTOperation {
timestamp: NTP64(0),
data: CRDTOperationData::create(),
},
],
)],
),
],
]),
)]);
let uncompressed = compressed.into_ops();
assert_eq!(uncompressed.len(), 7);
assert_eq!(uncompressed[2].model_id, 0);
assert_eq!(uncompressed[4].model_id, 1);
assert_eq!(uncompressed[6].model_id, 0);
assert_eq!(uncompressed[4].model_id, 0);
assert_eq!(uncompressed[6].model_id, 1);
}
}