Many tweaks to improve sync responsiveness and observability

This commit is contained in:
Ericson Soares
2024-10-14 23:39:06 -03:00
parent e699ca27ec
commit 576729334a
11 changed files with 114 additions and 93 deletions

BIN
Cargo.lock generated
View File

Binary file not shown.

View File

@@ -4,6 +4,7 @@ use sd_core_sync::{from_cloud_crdt_ops, CompressedCRDTOperationsPerModelPerDevic
use sd_actors::{Actor, Stopper};
use sd_prisma::prisma::{cloud_crdt_operation, SortOrder};
use sd_utils::timestamp_to_datetime;
use std::{
future::IntoFuture,
@@ -19,7 +20,7 @@ use futures_concurrency::future::Race;
use tokio::{sync::Notify, time::sleep};
use tracing::{debug, error};
use super::{timestamp_to_datetime, ReceiveAndIngestNotifiers, SyncActors, ONE_MINUTE};
use super::{ReceiveAndIngestNotifiers, SyncActors, ONE_MINUTE};
const BATCH_SIZE: i64 = 1000;

View File

@@ -1,7 +1,6 @@
use crate::{CloudServices, Error};
use futures_concurrency::future::TryJoin;
use sd_core_sync::{SyncManager, NTP64};
use sd_core_sync::SyncManager;
use sd_actors::{ActorsCollection, IntoActor};
use sd_cloud_schema::sync::groups;
@@ -11,10 +10,10 @@ use std::{
fmt,
path::Path,
sync::{atomic::AtomicBool, Arc},
time::{Duration, SystemTime, UNIX_EPOCH},
time::Duration,
};
use chrono::{DateTime, Utc};
use futures_concurrency::future::TryJoin;
use tokio::sync::Notify;
mod ingest;
@@ -135,15 +134,3 @@ pub async fn declare_actors(
Ok(Arc::clone(&actors_state.receiver_and_ingester_notifiers))
}
fn datetime_to_timestamp(latest_time: DateTime<Utc>) -> NTP64 {
NTP64::from(
SystemTime::from(latest_time)
.duration_since(UNIX_EPOCH)
.expect("hardcoded earlier time, nothing is earlier than UNIX_EPOCH"),
)
}
fn timestamp_to_datetime(timestamp: NTP64) -> DateTime<Utc> {
DateTime::from(timestamp.to_system_time())
}

View File

@@ -22,7 +22,6 @@ use sd_prisma::prisma::PrismaClient;
use std::{
collections::{hash_map::Entry, HashMap},
future::IntoFuture,
num::NonZero,
path::Path,
pin::Pin,
sync::{
@@ -34,7 +33,7 @@ use std::{
use chrono::{DateTime, Utc};
use futures::{FutureExt, StreamExt, TryStreamExt};
use futures_concurrency::future::{Join, Race, TryJoin};
use futures_concurrency::future::{Race, TryJoin};
use quic_rpc::transport::quinn::QuinnConnection;
use reqwest::Response;
use reqwest_middleware::ClientWithMiddleware;
@@ -42,12 +41,11 @@ use serde::{Deserialize, Serialize};
use tokio::{
fs,
io::{self, AsyncRead, AsyncReadExt, ReadBuf},
spawn,
sync::{Notify, Semaphore},
sync::Notify,
time::sleep,
};
use tokio_util::io::StreamReader;
use tracing::{debug, error, instrument};
use tracing::{debug, error, instrument, warn};
use uuid::Uuid;
use super::{ReceiveAndIngestNotifiers, SyncActors, ONE_MINUTE};
@@ -62,7 +60,6 @@ pub struct Receiver {
device_pub_id: devices::PubId,
cloud_services: Arc<CloudServices>,
cloud_client: Client<QuinnConnection<Service>>,
semaphore: Arc<Semaphore>,
key_manager: Arc<KeyManager>,
sync: SyncManager,
notifiers: Arc<ReceiveAndIngestNotifiers>,
@@ -137,11 +134,6 @@ impl Receiver {
device_pub_id: devices::PubId(Uuid::from(&sync.device_pub_id)),
cloud_services,
cloud_client,
semaphore: Arc::new(Semaphore::new(
std::thread::available_parallelism()
.map(NonZero::get)
.unwrap_or(1),
)),
key_manager,
sync,
notifiers,
@@ -179,9 +171,10 @@ impl Receiver {
}
self.handle_new_messages(new_messages).await?;
self.notifiers.notify_ingester();
}
debug!("Finished sync messages receiver actor iteration");
self.keeper.save().await
}
@@ -189,35 +182,36 @@ impl Receiver {
&mut self,
new_messages: Vec<MessagesCollection>,
) -> Result<(), Error> {
let handles = new_messages
.into_iter()
.map(|message| {
let sync_group_pub_id = self.sync_group_pub_id;
let semaphore = Arc::clone(&self.semaphore);
let key_manager = Arc::clone(&self.key_manager);
let sync = self.sync.clone();
let http_client = self.cloud_services.http_client().clone();
debug!(
new_messages_collections_count = new_messages.len(),
start_time = ?new_messages.first().map(|c| c.start_time),
end_time = ?new_messages.first().map(|c| c.end_time),
"Handling new sync messages collections",
);
async move {
spawn(handle_single_message(
sync_group_pub_id,
message,
semaphore,
key_manager,
sync,
http_client,
))
.await
}
})
.collect::<Vec<_>>();
for message in new_messages.into_iter().filter(|message| {
if message.original_device_pub_id == self.device_pub_id {
warn!("Received sync message from the current device, need to check backend, this is a bug!");
false
} else {
true
}
}) {
debug!(
new_messages_count = message.operations_count,
start_time = ?message.start_time,
end_time = ?message.end_time,
"Handling new sync messages",
);
for res in handles.join().await {
let Ok(res) = res else {
return Err(Error::SyncMessagesDownloadAndDecryptTaskPanicked);
};
let (device_pub_id, timestamp) = res?;
let (device_pub_id, timestamp) = handle_single_message(
self.sync_group_pub_id,
message,
&self.key_manager,
&self.sync,
self.cloud_services.http_client(),
)
.await?;
match self.keeper.timestamps.entry(device_pub_id) {
Entry::Occupied(mut entry) => {
@@ -225,10 +219,16 @@ impl Receiver {
*entry.get_mut() = timestamp;
}
}
Entry::Vacant(entry) => {
entry.insert(timestamp);
}
}
// To ingest after each sync message collection is received, we MUST download and
// store the messages SEQUENTIALLY, otherwise we might ingest messages out of order
// due to parallel downloads
self.notifiers.notify_ingester();
}
Ok(())
@@ -249,21 +249,15 @@ async fn handle_single_message(
signed_download_link,
..
}: MessagesCollection,
semaphore: Arc<Semaphore>,
key_manager: Arc<KeyManager>,
sync: SyncManager,
http_client: ClientWithMiddleware,
key_manager: &KeyManager,
sync: &SyncManager,
http_client: &ClientWithMiddleware,
) -> Result<(devices::PubId, DateTime<Utc>), Error> {
// FIXME(@fogodev): If we don't have the key hash, we need to fetch it from another device in the group if possible
let Some(secret_key) = key_manager.get_key(sync_group_pub_id, &key_hash).await else {
return Err(Error::MissingKeyHash);
};
let _permit = semaphore
.acquire()
.await
.expect("sync messages receiver semaphore never closes");
let response = http_client
.get(signed_download_link)
.send()

View File

@@ -14,6 +14,7 @@ use sd_crypto::{
primitives::EncryptedBlock,
CryptoRng, SeedableRng,
};
use sd_utils::{datetime_to_timestamp, timestamp_to_datetime};
use std::{
future::IntoFuture,
@@ -40,11 +41,13 @@ use tokio::{
use tracing::{debug, error};
use uuid::Uuid;
use super::{datetime_to_timestamp, timestamp_to_datetime, SyncActors, ONE_MINUTE};
use super::{SyncActors, ONE_MINUTE};
const TEN_SECONDS: Duration = Duration::from_secs(10);
const THIRTY_SECONDS: Duration = Duration::from_secs(30);
const MESSAGES_COLLECTION_SIZE: u32 = 100_000;
enum RaceNotifiedOrStopped {
Notified,
Stopped,
@@ -173,7 +176,7 @@ impl Sender {
let mut crdt_ops_stream = pin!(self.sync.stream_device_ops(
&self.sync.device_pub_id,
1000,
MESSAGES_COLLECTION_SIZE,
current_latest_timestamp
));

View File

@@ -322,7 +322,7 @@ pub async fn reverse_update_directories_sizes(
)
.await?;
let to_sync_and_update = ancestors
let (sync_ops, update_queries) = ancestors
.into_values()
.filter_map(|materialized_path| {
if let Some((pub_id, size)) =
@@ -350,7 +350,9 @@ pub async fn reverse_update_directories_sizes(
})
.unzip::<_, _, Vec<_>, Vec<_>>();
sync.write_ops(db, to_sync_and_update).await?;
if !sync_ops.is_empty() && !update_queries.is_empty() {
sync.write_ops(db, (sync_ops, update_queries)).await?;
}
Ok(())
}

View File

@@ -19,6 +19,7 @@ sd-utils = { path = "../../../crates/utils" }
# Workspace dependencies
async-channel = { workspace = true }
async-stream = { workspace = true }
chrono = { workspace = true }
futures = { workspace = true }
futures-concurrency = { workspace = true }
prisma-client-rust = { workspace = true, features = ["rspc"] }

View File

@@ -171,7 +171,8 @@ async fn handle_crdt_updates(
}
db._transaction()
.with_timeout(30 * 1000)
.with_timeout(30 * 10000)
.with_max_wait(30 * 10000)
.run(|db| async move {
// fake operation to batch them all at once
ModelSyncData::from_op(CRDTOperation {
@@ -251,7 +252,8 @@ async fn handle_crdt_create_and_updates(
}
db._transaction()
.with_timeout(30 * 1000)
.with_timeout(30 * 10000)
.with_max_wait(30 * 10000)
.run(|db| async move {
// fake a create with a bunch of data rather than individual insert
ModelSyncData::from_op(CRDTOperation {
@@ -309,7 +311,8 @@ async fn handle_crdt_deletion(
};
db._transaction()
.with_timeout(30 * 1000)
.with_timeout(30 * 10000)
.with_max_wait(30 * 10000)
.run(|db| async move {
ModelSyncData::from_op(op.clone())
.ok_or(Error::InvalidModelId(model))?

View File

@@ -8,6 +8,7 @@ use sd_sync::{
CRDTOperation, CompressedCRDTOperationsPerModel, CompressedCRDTOperationsPerModelPerDevice,
ModelId, OperationFactory,
};
use sd_utils::timestamp_to_datetime;
use std::{
collections::BTreeMap,
@@ -23,7 +24,7 @@ use async_stream::stream;
use futures::Stream;
use futures_concurrency::future::TryJoin;
use tokio::sync::{broadcast, Mutex, Notify, RwLock};
use tracing::warn;
use tracing::{debug, warn};
use uhlc::{HLCBuilder, HLC};
use uuid::Uuid;
@@ -319,30 +320,36 @@ impl Manager {
.exec()
.await
{
Ok(ops) => {
if ops.is_empty() {
break;
Ok(ops) if ops.is_empty() => break,
Ok(ops) => match ops
.into_iter()
.map(from_crdt_ops)
.collect::<Result<Vec<_>, _>>()
{
Ok(ops) => {
debug!(
start_datetime = ?ops
.first()
.map(|op| timestamp_to_datetime(op.timestamp)),
end_datetime = ?ops
.last()
.map(|op| timestamp_to_datetime(op.timestamp)),
count = ops.len(),
"Streaming crdt ops",
);
if let Some(last_op) = ops.last() {
current_initial_timestamp = last_op.timestamp;
}
yield Ok(ops);
}
match ops.into_iter().map(from_crdt_ops).collect::<Result<Vec<_>, _>>() {
Ok(ops) => {
if let Some(last_op) = ops.last() {
current_initial_timestamp = last_op.timestamp;
}
yield Ok(ops);
},
Err(e) => {
yield Err(e);
break;
},
}
Err(e) => return yield Err(e),
}
Err(e) => {
yield Err(e.into());
break;
}
Err(e) => return yield Err(e.into())
}
}
}

View File

@@ -12,8 +12,10 @@ rust-version.workspace = true
sd-prisma = { path = "../prisma" }
# Workspace dependencies
chrono = { workspace = true }
prisma-client-rust = { workspace = true }
rspc = { workspace = true, features = ["unstable"] }
thiserror = { workspace = true }
tracing = { workspace = true }
uhlc = { workspace = true }
uuid = { workspace = true }

View File

@@ -27,6 +27,10 @@
#![forbid(deprecated_in_future)]
#![allow(clippy::missing_errors_doc, clippy::module_name_repetitions)]
use std::time::{SystemTime, UNIX_EPOCH};
use chrono::{DateTime, Utc};
use uhlc::NTP64;
use uuid::Uuid;
pub mod db;
@@ -104,6 +108,23 @@ macro_rules! msgpack {
}}
}
/// Helper function to convert a [`chrono::DateTime<Utc>`] to a [`uhlc::NTP64`]
#[allow(clippy::missing_panics_doc)] // Doesn't actually panic
#[must_use]
pub fn datetime_to_timestamp(latest_time: DateTime<Utc>) -> NTP64 {
NTP64::from(
SystemTime::from(latest_time)
.duration_since(UNIX_EPOCH)
.expect("hardcoded earlier time, nothing is earlier than UNIX_EPOCH"),
)
}
/// Helper function to convert a [`uhlc::NTP64`] to a [`chrono::DateTime<Utc>`]
#[must_use]
pub fn timestamp_to_datetime(timestamp: NTP64) -> DateTime<Utc> {
DateTime::from(timestamp.to_system_time())
}
// Only used for testing purposes. Do not use in production code.
use std::any::type_name;