diff --git a/core/crates/sync/src/manager.rs b/core/crates/sync/src/manager.rs index 4753cd5d0..ce8e6c39c 100644 --- a/core/crates/sync/src/manager.rs +++ b/core/crates/sync/src/manager.rs @@ -1,12 +1,16 @@ use sd_core_prisma_helpers::DevicePubId; -use sd_prisma::prisma::{crdt_operation, device, PrismaClient, SortOrder}; +use sd_prisma::{ + prisma::{crdt_operation, device, PrismaClient, SortOrder}, + prisma_sync, +}; use sd_sync::{ CRDTOperation, CompressedCRDTOperationsPerModel, CompressedCRDTOperationsPerModelPerDevice, - OperationFactory, + ModelId, OperationFactory, }; use std::{ + collections::BTreeMap, fmt, num::NonZeroU128, sync::{ @@ -135,38 +139,52 @@ impl Manager { &self, CompressedCRDTOperationsPerModelPerDevice(compressed_ops): CompressedCRDTOperationsPerModelPerDevice, ) -> Result<(), Error> { + // WARN: this order here exists because sync messages MUST be processed in this exact order + // due to relationship dependencies between these tables. + const INGEST_ORDER: &[ModelId] = &[ + prisma_sync::device::MODEL_ID, + prisma_sync::storage_statistics::MODEL_ID, + prisma_sync::tag::MODEL_ID, + prisma_sync::location::MODEL_ID, + prisma_sync::object::MODEL_ID, + prisma_sync::exif_data::MODEL_ID, + prisma_sync::file_path::MODEL_ID, + prisma_sync::label::MODEL_ID, + prisma_sync::tag_on_object::MODEL_ID, + prisma_sync::label_on_object::MODEL_ID, + ]; + let _lock_guard = self.sync_lock.lock().await; - // TODO(@fogodev): I'm almost sure that we need to order better which models we process first - // due to relations between them. For example, if we process `file_path` before `object`, we - // will have issues with foreign keys, as we'll be trying to insert a `file_path` pointing to - // a `object` that doesn't exist yet. + let mut ops_fut_by_model = INGEST_ORDER + .iter() + .map(|&model_id| (model_id, vec![])) + .collect::>(); - // Each `ops` vec is for an independent record, so we can process them concurrently - compressed_ops - .into_iter() - .flat_map( - |(device_pub_id, CompressedCRDTOperationsPerModel(ops_per_model))| { - ops_per_model - .into_iter() - .flat_map(move |(model_id, ops_per_record)| { - ops_per_record.into_iter().map(move |(record_id, ops)| { - process_crdt_operations( - &self.clock, - &self.timestamp_per_device, - &self.db, - device_pub_id.into(), - model_id, - record_id, - ops, - ) - }) - }) - }, - ) - .collect::>() - .try_join() - .await?; + for (device_pub_id, CompressedCRDTOperationsPerModel(ops_per_model)) in compressed_ops { + for (model_id, ops_per_record) in ops_per_model { + for (record_id, ops) in ops_per_record { + ops_fut_by_model + .get_mut(&model_id) + .ok_or(Error::InvalidModelId(model_id))? + .push(process_crdt_operations( + &self.clock, + &self.timestamp_per_device, + &self.db, + device_pub_id.into(), + model_id, + record_id, + ops, + )); + } + } + } + + for model_id in INGEST_ORDER { + if let Some(futs) = ops_fut_by_model.remove(model_id) { + futs.try_join().await?; + } + } if self.tx.send(SyncEvent::Ingested).is_err() { warn!("failed to send ingested message on `ingest_ops`");