Ingest sync ops in correct order

This commit is contained in:
Ericson Soares
2024-10-08 14:45:14 -03:00
parent f4bf50c175
commit ead5393e6e

View File

@@ -1,12 +1,16 @@
use sd_core_prisma_helpers::DevicePubId;
use sd_prisma::prisma::{crdt_operation, device, PrismaClient, SortOrder};
use sd_prisma::{
prisma::{crdt_operation, device, PrismaClient, SortOrder},
prisma_sync,
};
use sd_sync::{
CRDTOperation, CompressedCRDTOperationsPerModel, CompressedCRDTOperationsPerModelPerDevice,
OperationFactory,
ModelId, OperationFactory,
};
use std::{
collections::BTreeMap,
fmt,
num::NonZeroU128,
sync::{
@@ -135,38 +139,52 @@ impl Manager {
&self,
CompressedCRDTOperationsPerModelPerDevice(compressed_ops): CompressedCRDTOperationsPerModelPerDevice,
) -> Result<(), Error> {
// WARN: this order here exists because sync messages MUST be processed in this exact order
// due to relationship dependencies between these tables.
const INGEST_ORDER: &[ModelId] = &[
prisma_sync::device::MODEL_ID,
prisma_sync::storage_statistics::MODEL_ID,
prisma_sync::tag::MODEL_ID,
prisma_sync::location::MODEL_ID,
prisma_sync::object::MODEL_ID,
prisma_sync::exif_data::MODEL_ID,
prisma_sync::file_path::MODEL_ID,
prisma_sync::label::MODEL_ID,
prisma_sync::tag_on_object::MODEL_ID,
prisma_sync::label_on_object::MODEL_ID,
];
let _lock_guard = self.sync_lock.lock().await;
// TODO(@fogodev): I'm almost sure that we need to order better which models we process first
// due to relations between them. For example, if we process `file_path` before `object`, we
// will have issues with foreign keys, as we'll be trying to insert a `file_path` pointing to
// an `object` that doesn't exist yet.
let mut ops_fut_by_model = INGEST_ORDER
.iter()
.map(|&model_id| (model_id, vec![]))
.collect::<BTreeMap<_, _>>();
// Each `ops` vec is for an independent record, so we can process them concurrently
compressed_ops
.into_iter()
.flat_map(
|(device_pub_id, CompressedCRDTOperationsPerModel(ops_per_model))| {
ops_per_model
.into_iter()
.flat_map(move |(model_id, ops_per_record)| {
ops_per_record.into_iter().map(move |(record_id, ops)| {
process_crdt_operations(
&self.clock,
&self.timestamp_per_device,
&self.db,
device_pub_id.into(),
model_id,
record_id,
ops,
)
})
})
},
)
.collect::<Vec<_>>()
.try_join()
.await?;
for (device_pub_id, CompressedCRDTOperationsPerModel(ops_per_model)) in compressed_ops {
for (model_id, ops_per_record) in ops_per_model {
for (record_id, ops) in ops_per_record {
ops_fut_by_model
.get_mut(&model_id)
.ok_or(Error::InvalidModelId(model_id))?
.push(process_crdt_operations(
&self.clock,
&self.timestamp_per_device,
&self.db,
device_pub_id.into(),
model_id,
record_id,
ops,
));
}
}
}
for model_id in INGEST_ORDER {
if let Some(futs) = ops_fut_by_model.remove(model_id) {
futs.try_join().await?;
}
}
if self.tx.send(SyncEvent::Ingested).is_err() {
warn!("failed to send ingested message on `ingest_ops`");