diff --git a/core/crates/sync/src/ingest.rs b/core/crates/sync/src/ingest.rs index c37ec4efc..c4bf2a09e 100644 --- a/core/crates/sync/src/ingest.rs +++ b/core/crates/sync/src/ingest.rs @@ -116,28 +116,30 @@ impl Actor { } } + // where the magic happens async fn receive_crdt_operation(&mut self, op: CRDTOperation) { + // first, we update the HLC's timestamp with the incoming one. + // this involves a drift check + sets the last time of the clock self.clock .update_with_timestamp(&Timestamp::new(op.timestamp, op.instance.into())) - .ok(); + .expect("timestamp has too much drift!"); - let mut timestamp = { - let mut clocks = self.timestamps.write().await; - *clocks.entry(op.instance).or_insert_with(|| op.timestamp) - }; - - if timestamp < op.timestamp { - timestamp = op.timestamp; - } + // read the timestamp for the operation's instance, or insert one if it doesn't exist + let timestamp = self.timestamps.write().await.get(&op.instance).cloned(); + // copy some fields bc rust ownership let op_instance = op.instance; + let op_timestamp = op.timestamp; - let is_old = self.compare_message(&op).await; - - if !is_old { + if !self.is_operation_old(&op).await { + // actually go and apply the operation in the db self.apply_op(op).await.ok(); - self.timestamps.write().await.insert(op_instance, timestamp); + // update the stored timestamp for this instance - will be derived from the crdt operations table on restart + self.timestamps.write().await.insert( + op_instance, + NTP64::max(timestamp.unwrap_or_default(), op_timestamp), + ); } } @@ -145,11 +147,13 @@ impl Actor { self.db ._transaction() .run(|db| async move { + // apply the operation to the actual record ModelSyncData::from_op(op.clone()) .unwrap() .exec(&db) .await?; + // write the operation to the operations table write_crdt_op_to_db(&op, &db).await?; Ok(()) @@ -161,7 +165,8 @@ impl Actor { Ok(()) } - async fn compare_message(&mut self, op: &CRDTOperation) -> bool { + // determines if an operation is old and shouldn't be applied + async fn is_operation_old(&mut self, op: &CRDTOperation) -> bool { let db = &self.db; let old_timestamp = {