diff --git a/core/crates/sync/src/ingest_utils.rs b/core/crates/sync/src/ingest_utils.rs index e63f317ed..297dd3e5c 100644 --- a/core/crates/sync/src/ingest_utils.rs +++ b/core/crates/sync/src/ingest_utils.rs @@ -159,6 +159,7 @@ pub async fn process_crdt_operations( Ok(()) } +#[instrument(skip_all, err)] async fn handle_crdt_updates( db: &PrismaClient, device_pub_id: &DevicePubId, @@ -213,6 +214,7 @@ async fn handle_crdt_updates( .await } +#[instrument(skip_all, err)] async fn handle_crdt_create_and_updates( db: &PrismaClient, device_pub_id: &DevicePubId, @@ -291,6 +293,7 @@ async fn handle_crdt_create_and_updates( .await } +#[instrument(skip_all, err)] async fn handle_crdt_deletion( db: &PrismaClient, device_pub_id: &DevicePubId, diff --git a/core/crates/sync/src/manager.rs b/core/crates/sync/src/manager.rs index a01b6716b..d330852d4 100644 --- a/core/crates/sync/src/manager.rs +++ b/core/crates/sync/src/manager.rs @@ -25,7 +25,7 @@ use tokio::{ spawn, sync::{broadcast, Mutex, Notify, RwLock, Semaphore}, }; -use tracing::{debug, warn}; +use tracing::{debug, instrument, warn}; use uhlc::{HLCBuilder, HLC}; use uuid::Uuid; @@ -161,100 +161,112 @@ impl Manager { .collect::, Vec<_>), _>>() } + #[instrument(skip(self))] async fn ingest_by_model(&self, model_id: ModelId) -> Result<(), Error> { - let (ops_ids, ops) = self.fetch_cloud_crdt_ops(model_id, 10_000).await?; - if ops_ids.is_empty() { - return Ok(()); - } + let mut total_count = 0; - debug!( - messages_count = ops.len(), - first_message = ?ops - .first() - .map_or_else(|| SystemTime::UNIX_EPOCH.into(), |op| timestamp_to_datetime(op.timestamp)), - last_message = ?ops - .last() - .map_or_else(|| SystemTime::UNIX_EPOCH.into(), |op| timestamp_to_datetime(op.timestamp)), - model_id, - "Messages by model to ingest", - ); + loop { + let (ops_ids, ops) = self.fetch_cloud_crdt_ops(model_id, 10_000).await?; + if ops_ids.is_empty() { + break; + } - let mut compressed_map = - BTreeMap::, (RecordId, Vec)>>::new(); + debug!( + messages_count = ops.len(), + first_message = ?ops + .first() + .map_or_else(|| SystemTime::UNIX_EPOCH.into(), |op| timestamp_to_datetime(op.timestamp)), + last_message = ?ops + .last() + .map_or_else(|| SystemTime::UNIX_EPOCH.into(), |op| timestamp_to_datetime(op.timestamp)), + "Messages by model to ingest", + ); - for CRDTOperation { - device_pub_id, - timestamp, - model_id: _, // Ignoring model_id as we know it already - record_id, - data, - } in ops - { - let records = compressed_map.entry(device_pub_id).or_default(); + let mut compressed_map = + BTreeMap::, (RecordId, Vec)>>::new(); - // Can't use RecordId as a key because rmpv::Value doesn't implement Hash + Eq. - // So we use it's serialized bytes as a key. - let record_id_bytes = - rmp_serde::to_vec_named(&record_id).expect("already serialized to Value"); + for CRDTOperation { + device_pub_id, + timestamp, + model_id: _, // Ignoring model_id as we know it already + record_id, + data, + } in ops + { + let records = compressed_map.entry(device_pub_id).or_default(); - match records.entry(record_id_bytes) { - Entry::Occupied(mut entry) => { - entry - .get_mut() - .1 - .push(CompressedCRDTOperation { timestamp, data }); - } - Entry::Vacant(entry) => { - entry.insert((record_id, vec![CompressedCRDTOperation { timestamp, data }])); + // Can't use RecordId as a key because rmpv::Value doesn't implement Hash + Eq. + // So we use it's serialized bytes as a key. + let record_id_bytes = + rmp_serde::to_vec_named(&record_id).expect("already serialized to Value"); + + match records.entry(record_id_bytes) { + Entry::Occupied(mut entry) => { + entry + .get_mut() + .1 + .push(CompressedCRDTOperation { timestamp, data }); + } + Entry::Vacant(entry) => { + entry + .insert((record_id, vec![CompressedCRDTOperation { timestamp, data }])); + } } } - } - let _lock_guard = self.sync_lock.lock().await; + let _lock_guard = self.sync_lock.lock().await; - let semaphore = &Arc::new(Semaphore::new(self.available_parallelism)); + let semaphore = &Arc::new(Semaphore::new(self.available_parallelism)); - let handles = compressed_map - .into_iter() - .flat_map(|(device_pub_id, records)| { - records.into_values().map(move |(record_id, ops)| { - // We can process each record in parallel as they are independent - spawn({ - let clock = Arc::clone(&self.clock); - let timestamp_per_device = Arc::clone(&self.timestamp_per_device); - let db = Arc::clone(&self.db); - let device_pub_id = device_pub_id.into(); - let semaphore = Arc::clone(semaphore); + let handles = compressed_map + .into_iter() + .flat_map(|(device_pub_id, records)| { + records.into_values().map(move |(record_id, ops)| { + // We can process each record in parallel as they are independent + spawn({ + let clock = Arc::clone(&self.clock); + let timestamp_per_device = Arc::clone(&self.timestamp_per_device); + let db = Arc::clone(&self.db); + let device_pub_id = device_pub_id.into(); + let semaphore = Arc::clone(semaphore); - async move { - let _permit = - semaphore.acquire().await.expect("semaphore never closes"); + async move { + let _permit = + semaphore.acquire().await.expect("semaphore never closes"); - process_crdt_operations( - &clock, - ×tamp_per_device, - &db, - device_pub_id, - model_id, - record_id, - ops, - ) - .await - } + let count = ops.len(); + + process_crdt_operations( + &clock, + ×tamp_per_device, + &db, + device_pub_id, + model_id, + record_id, + ops, + ) + .await + .map(|()| count) + } + }) }) }) - }) - .collect::>(); + .collect::>(); - for res in handles.join().await { - res.map_err(Error::ProcessCrdtPanic)??; + for res in handles.join().await { + let count = res.map_err(Error::ProcessCrdtPanic)??; + debug!(count, "Ingested operations of model"); + total_count += count; + } + + self.db + .cloud_crdt_operation() + .delete_many(vec![cloud_crdt_operation::id::in_vec(ops_ids)]) + .exec() + .await?; } - self.db - .cloud_crdt_operation() - .delete_many(vec![cloud_crdt_operation::id::in_vec(ops_ids)]) - .exec() - .await?; + debug!(total_count, "Ingested all operations of this model"); Ok(()) }