From 576729334ace811cd5ff490ccf600d617236ba08 Mon Sep 17 00:00:00 2001 From: Ericson Soares Date: Mon, 14 Oct 2024 23:39:06 -0300 Subject: [PATCH] Many tweaks to improve sync responsiveness and observability --- Cargo.lock | Bin 345091 -> 345122 bytes core/crates/cloud-services/src/sync/ingest.rs | 3 +- core/crates/cloud-services/src/sync/mod.rs | 19 +--- .../crates/cloud-services/src/sync/receive.rs | 90 ++++++++---------- core/crates/cloud-services/src/sync/send.rs | 7 +- core/crates/heavy-lifting/src/indexer/mod.rs | 6 +- core/crates/sync/Cargo.toml | 1 + core/crates/sync/src/ingest_utils.rs | 9 +- core/crates/sync/src/manager.rs | 49 ++++++---- crates/utils/Cargo.toml | 2 + crates/utils/src/lib.rs | 21 ++++ 11 files changed, 114 insertions(+), 93 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5e237f4cddbeb85b8c6b825dc70988e70dbeb9fe..674aab99ecb23a43ded9898271510f0734fa0183 100644 GIT binary patch delta 61 zcmV-D0K)%+h!vuU6@Y{RgaWh!)2Wv(n*$4%tsnyox8JD)j1rfRgaiwh1gQfTmx8lyaTt=yaX|a+(j5! delta 65 zcmV-H0KWgCh!ul~6@Y{RgaWh!)2WvRnFA!3mAV5GmbwE7w;ZYiS`wG;xdRxNnY;rr Xm%gI|2A8m?0~m+#yaTuKyaX|aO5_@t diff --git a/core/crates/cloud-services/src/sync/ingest.rs b/core/crates/cloud-services/src/sync/ingest.rs index 90714e144..065ceb964 100644 --- a/core/crates/cloud-services/src/sync/ingest.rs +++ b/core/crates/cloud-services/src/sync/ingest.rs @@ -4,6 +4,7 @@ use sd_core_sync::{from_cloud_crdt_ops, CompressedCRDTOperationsPerModelPerDevic use sd_actors::{Actor, Stopper}; use sd_prisma::prisma::{cloud_crdt_operation, SortOrder}; +use sd_utils::timestamp_to_datetime; use std::{ future::IntoFuture, @@ -19,7 +20,7 @@ use futures_concurrency::future::Race; use tokio::{sync::Notify, time::sleep}; use tracing::{debug, error}; -use super::{timestamp_to_datetime, ReceiveAndIngestNotifiers, SyncActors, ONE_MINUTE}; +use super::{ReceiveAndIngestNotifiers, SyncActors, ONE_MINUTE}; const BATCH_SIZE: i64 = 1000; diff --git a/core/crates/cloud-services/src/sync/mod.rs b/core/crates/cloud-services/src/sync/mod.rs index 0d5ebf45c..b694befb4 100644 --- a/core/crates/cloud-services/src/sync/mod.rs +++ b/core/crates/cloud-services/src/sync/mod.rs @@ -1,7 +1,6 @@ use crate::{CloudServices, Error}; -use futures_concurrency::future::TryJoin; -use sd_core_sync::{SyncManager, NTP64}; +use sd_core_sync::SyncManager; use sd_actors::{ActorsCollection, IntoActor}; use sd_cloud_schema::sync::groups; @@ -11,10 +10,10 @@ use std::{ fmt, path::Path, sync::{atomic::AtomicBool, Arc}, - time::{Duration, SystemTime, UNIX_EPOCH}, + time::Duration, }; -use chrono::{DateTime, Utc}; +use futures_concurrency::future::TryJoin; use tokio::sync::Notify; mod ingest; @@ -135,15 +134,3 @@ pub async fn declare_actors( Ok(Arc::clone(&actors_state.receiver_and_ingester_notifiers)) } - -fn datetime_to_timestamp(latest_time: DateTime) -> NTP64 { - NTP64::from( - SystemTime::from(latest_time) - .duration_since(UNIX_EPOCH) - .expect("hardcoded earlier time, nothing is earlier than UNIX_EPOCH"), - ) -} - -fn timestamp_to_datetime(timestamp: NTP64) -> DateTime { - DateTime::from(timestamp.to_system_time()) -} diff --git a/core/crates/cloud-services/src/sync/receive.rs b/core/crates/cloud-services/src/sync/receive.rs index 27b4b5ae8..c8e411107 100644 --- a/core/crates/cloud-services/src/sync/receive.rs +++ b/core/crates/cloud-services/src/sync/receive.rs @@ -22,7 +22,6 @@ use sd_prisma::prisma::PrismaClient; use std::{ collections::{hash_map::Entry, HashMap}, future::IntoFuture, - num::NonZero, path::Path, pin::Pin, sync::{ @@ -34,7 +33,7 @@ use std::{ use chrono::{DateTime, Utc}; use futures::{FutureExt, StreamExt, TryStreamExt}; -use futures_concurrency::future::{Join, Race, TryJoin}; +use futures_concurrency::future::{Race, TryJoin}; use quic_rpc::transport::quinn::QuinnConnection; use reqwest::Response; use reqwest_middleware::ClientWithMiddleware; @@ -42,12 +41,11 @@ use serde::{Deserialize, Serialize}; use tokio::{ fs, io::{self, AsyncRead, AsyncReadExt, ReadBuf}, - spawn, - sync::{Notify, Semaphore}, + sync::Notify, time::sleep, }; use tokio_util::io::StreamReader; -use tracing::{debug, error, instrument}; +use tracing::{debug, error, instrument, warn}; use uuid::Uuid; use super::{ReceiveAndIngestNotifiers, SyncActors, ONE_MINUTE}; @@ -62,7 +60,6 @@ pub struct Receiver { device_pub_id: devices::PubId, cloud_services: Arc, cloud_client: Client>, - semaphore: Arc, key_manager: Arc, sync: SyncManager, notifiers: Arc, @@ -137,11 +134,6 @@ impl Receiver { device_pub_id: devices::PubId(Uuid::from(&sync.device_pub_id)), cloud_services, cloud_client, - semaphore: Arc::new(Semaphore::new( - std::thread::available_parallelism() - .map(NonZero::get) - .unwrap_or(1), - )), key_manager, sync, notifiers, @@ -179,9 +171,10 @@ impl Receiver { } self.handle_new_messages(new_messages).await?; - self.notifiers.notify_ingester(); } + debug!("Finished sync messages receiver actor iteration"); + self.keeper.save().await } @@ -189,35 +182,36 @@ impl Receiver { &mut self, new_messages: Vec, ) -> Result<(), Error> { - let handles = new_messages - .into_iter() - .map(|message| { - let sync_group_pub_id = self.sync_group_pub_id; - let semaphore = Arc::clone(&self.semaphore); - let key_manager = Arc::clone(&self.key_manager); - let sync = self.sync.clone(); - let http_client = self.cloud_services.http_client().clone(); + debug!( + new_messages_collections_count = new_messages.len(), + start_time = ?new_messages.first().map(|c| c.start_time), + end_time = ?new_messages.first().map(|c| c.end_time), + "Handling new sync messages collections", + ); - async move { - spawn(handle_single_message( - sync_group_pub_id, - message, - semaphore, - key_manager, - sync, - http_client, - )) - .await - } - }) - .collect::>(); + for message in new_messages.into_iter().filter(|message| { + if message.original_device_pub_id == self.device_pub_id { + warn!("Received sync message from the current device, need to check backend, this is a bug!"); + false + } else { + true + } + }) { + debug!( + new_messages_count = message.operations_count, + start_time = ?message.start_time, + end_time = ?message.end_time, + "Handling new sync messages", + ); - for res in handles.join().await { - let Ok(res) = res else { - return Err(Error::SyncMessagesDownloadAndDecryptTaskPanicked); - }; - - let (device_pub_id, timestamp) = res?; + let (device_pub_id, timestamp) = handle_single_message( + self.sync_group_pub_id, + message, + &self.key_manager, + &self.sync, + self.cloud_services.http_client(), + ) + .await?; match self.keeper.timestamps.entry(device_pub_id) { Entry::Occupied(mut entry) => { @@ -225,10 +219,16 @@ impl Receiver { *entry.get_mut() = timestamp; } } + Entry::Vacant(entry) => { entry.insert(timestamp); } } + + // To ingest after each sync message collection is received, we MUST download and + // store the messages SEQUENTIALLY, otherwise we might ingest messages out of order + // due to parallel downloads + self.notifiers.notify_ingester(); } Ok(()) @@ -249,21 +249,15 @@ async fn handle_single_message( signed_download_link, .. }: MessagesCollection, - semaphore: Arc, - key_manager: Arc, - sync: SyncManager, - http_client: ClientWithMiddleware, + key_manager: &KeyManager, + sync: &SyncManager, + http_client: &ClientWithMiddleware, ) -> Result<(devices::PubId, DateTime), Error> { // FIXME(@fogodev): If we don't have the key hash, we need to fetch it from another device in the group if possible let Some(secret_key) = key_manager.get_key(sync_group_pub_id, &key_hash).await else { return Err(Error::MissingKeyHash); }; - let _permit = semaphore - .acquire() - .await - .expect("sync messages receiver semaphore never closes"); - let response = http_client .get(signed_download_link) .send() diff --git a/core/crates/cloud-services/src/sync/send.rs b/core/crates/cloud-services/src/sync/send.rs index 060188fec..2e36b8118 100644 --- a/core/crates/cloud-services/src/sync/send.rs +++ b/core/crates/cloud-services/src/sync/send.rs @@ -14,6 +14,7 @@ use sd_crypto::{ primitives::EncryptedBlock, CryptoRng, SeedableRng, }; +use sd_utils::{datetime_to_timestamp, timestamp_to_datetime}; use std::{ future::IntoFuture, @@ -40,11 +41,13 @@ use tokio::{ use tracing::{debug, error}; use uuid::Uuid; -use super::{datetime_to_timestamp, timestamp_to_datetime, SyncActors, ONE_MINUTE}; +use super::{SyncActors, ONE_MINUTE}; const TEN_SECONDS: Duration = Duration::from_secs(10); const THIRTY_SECONDS: Duration = Duration::from_secs(30); +const MESSAGES_COLLECTION_SIZE: u32 = 100_000; + enum RaceNotifiedOrStopped { Notified, Stopped, @@ -173,7 +176,7 @@ impl Sender { let mut crdt_ops_stream = pin!(self.sync.stream_device_ops( &self.sync.device_pub_id, - 1000, + MESSAGES_COLLECTION_SIZE, current_latest_timestamp )); diff --git a/core/crates/heavy-lifting/src/indexer/mod.rs b/core/crates/heavy-lifting/src/indexer/mod.rs index 028ec8d2d..8ec8d59e8 100644 --- a/core/crates/heavy-lifting/src/indexer/mod.rs +++ b/core/crates/heavy-lifting/src/indexer/mod.rs @@ -322,7 +322,7 @@ pub async fn reverse_update_directories_sizes( ) .await?; - let to_sync_and_update = ancestors + let (sync_ops, update_queries) = ancestors .into_values() .filter_map(|materialized_path| { if let Some((pub_id, size)) = @@ -350,7 +350,9 @@ pub async fn reverse_update_directories_sizes( }) .unzip::<_, _, Vec<_>, Vec<_>>(); - sync.write_ops(db, to_sync_and_update).await?; + if !sync_ops.is_empty() && !update_queries.is_empty() { + sync.write_ops(db, (sync_ops, update_queries)).await?; + } Ok(()) } diff --git a/core/crates/sync/Cargo.toml b/core/crates/sync/Cargo.toml index 5e87856af..229a58ead 100644 --- a/core/crates/sync/Cargo.toml +++ b/core/crates/sync/Cargo.toml @@ -19,6 +19,7 @@ sd-utils = { path = "../../../crates/utils" } # Workspace dependencies async-channel = { workspace = true } async-stream = { workspace = true } +chrono = { workspace = true } futures = { workspace = true } futures-concurrency = { workspace = true } prisma-client-rust = { workspace = true, features = ["rspc"] } diff --git a/core/crates/sync/src/ingest_utils.rs b/core/crates/sync/src/ingest_utils.rs index d8e6629df..3cc4c8a68 100644 --- a/core/crates/sync/src/ingest_utils.rs +++ b/core/crates/sync/src/ingest_utils.rs @@ -171,7 +171,8 @@ async fn handle_crdt_updates( } db._transaction() - .with_timeout(30 * 1000) + .with_timeout(30 * 10000) + .with_max_wait(30 * 10000) .run(|db| async move { // fake operation to batch them all at once ModelSyncData::from_op(CRDTOperation { @@ -251,7 +252,8 @@ async fn handle_crdt_create_and_updates( } db._transaction() - .with_timeout(30 * 1000) + .with_timeout(30 * 10000) + .with_max_wait(30 * 10000) .run(|db| async move { // fake a create with a bunch of data rather than individual insert ModelSyncData::from_op(CRDTOperation { @@ -309,7 +311,8 @@ async fn handle_crdt_deletion( }; db._transaction() - .with_timeout(30 * 1000) + .with_timeout(30 * 10000) + .with_max_wait(30 * 10000) .run(|db| async move { ModelSyncData::from_op(op.clone()) .ok_or(Error::InvalidModelId(model))? diff --git a/core/crates/sync/src/manager.rs b/core/crates/sync/src/manager.rs index ce8e6c39c..382261b9d 100644 --- a/core/crates/sync/src/manager.rs +++ b/core/crates/sync/src/manager.rs @@ -8,6 +8,7 @@ use sd_sync::{ CRDTOperation, CompressedCRDTOperationsPerModel, CompressedCRDTOperationsPerModelPerDevice, ModelId, OperationFactory, }; +use sd_utils::timestamp_to_datetime; use std::{ collections::BTreeMap, @@ -23,7 +24,7 @@ use async_stream::stream; use futures::Stream; use futures_concurrency::future::TryJoin; use tokio::sync::{broadcast, Mutex, Notify, RwLock}; -use tracing::warn; +use tracing::{debug, warn}; use uhlc::{HLCBuilder, HLC}; use uuid::Uuid; @@ -319,30 +320,36 @@ impl Manager { .exec() .await { - Ok(ops) => { - if ops.is_empty() { - break; + Ok(ops) if ops.is_empty() => break, + + Ok(ops) => match ops + .into_iter() + .map(from_crdt_ops) + .collect::, _>>() + { + Ok(ops) => { + debug!( + start_datetime = ?ops + .first() + .map(|op| timestamp_to_datetime(op.timestamp)), + end_datetime = ?ops + .last() + .map(|op| timestamp_to_datetime(op.timestamp)), + count = ops.len(), + "Streaming crdt ops", + ); + + if let Some(last_op) = ops.last() { + current_initial_timestamp = last_op.timestamp; + } + + yield Ok(ops); } - match ops.into_iter().map(from_crdt_ops).collect::, _>>() { - Ok(ops) => { - if let Some(last_op) = ops.last() { - current_initial_timestamp = last_op.timestamp; - } - - yield Ok(ops); - }, - Err(e) => { - yield Err(e); - break; - }, - } + Err(e) => return yield Err(e), } - Err(e) => { - yield Err(e.into()); - break; - } + Err(e) => return yield Err(e.into()) } } } diff --git a/crates/utils/Cargo.toml b/crates/utils/Cargo.toml index c9685fa14..45891b344 100644 --- a/crates/utils/Cargo.toml +++ b/crates/utils/Cargo.toml @@ -12,8 +12,10 @@ rust-version.workspace = true sd-prisma = { path = "../prisma" } # Workspace dependencies +chrono = { workspace = true } prisma-client-rust = { workspace = true } rspc = { workspace = true, features = ["unstable"] } thiserror = { workspace = true } tracing = { workspace = true } +uhlc = { workspace = true } uuid = { workspace = true } diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index da92b7b79..09f45d609 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -27,6 +27,10 @@ #![forbid(deprecated_in_future)] #![allow(clippy::missing_errors_doc, clippy::module_name_repetitions)] +use std::time::{SystemTime, UNIX_EPOCH}; + +use chrono::{DateTime, Utc}; +use uhlc::NTP64; use uuid::Uuid; pub mod db; @@ -104,6 +108,23 @@ macro_rules! msgpack { }} } +/// Helper function to convert a [`chrono::DateTime`] to a [`uhlc::NTP64`] +#[allow(clippy::missing_panics_doc)] // Doesn't actually panic +#[must_use] +pub fn datetime_to_timestamp(latest_time: DateTime) -> NTP64 { + NTP64::from( + SystemTime::from(latest_time) + .duration_since(UNIX_EPOCH) + .expect("hardcoded earlier time, nothing is earlier than UNIX_EPOCH"), + ) +} + +/// Helper function to convert a [`uhlc::NTP64`] to a [`chrono::DateTime`] +#[must_use] +pub fn timestamp_to_datetime(timestamp: NTP64) -> DateTime { + DateTime::from(timestamp.to_system_time()) +} + // Only used for testing purposes. Do not use in production code. use std::any::type_name;