From 4bf8db02f78a1dd79fc881a40e2bf91a0090de49 Mon Sep 17 00:00:00 2001 From: Ericson Soares Date: Tue, 27 Aug 2024 17:01:42 -0300 Subject: [PATCH] Make core compile again --- Cargo.lock | Bin 329700 -> 329727 bytes .../heavy-lifting/src/job_system/job.rs | 4 +- core/crates/prisma-helpers/src/lib.rs | 43 ++- core/crates/sync/Cargo.toml | 2 + core/crates/sync/src/backfill.rs | 110 +++---- core/crates/sync/src/db_operation.rs | 145 +++++---- core/crates/sync/src/ingest.rs | 93 +++--- core/crates/sync/src/lib.rs | 14 +- core/crates/sync/src/manager.rs | 120 +++---- core/crates/sync/tests/lib.rs | 18 +- core/crates/sync/tests/mock_instance.rs | 43 +-- core/prisma/schema.prisma | 25 +- core/src/api/cloud/libraries.rs | 106 +++--- core/src/api/cloud/library.rs | 134 -------- core/src/api/cloud/mod.rs | 7 +- core/src/api/mod.rs | 20 +- core/src/api/nodes.rs | 26 +- core/src/api/search/saved.rs | 2 +- core/src/api/sync.rs | 7 +- core/src/api/tags.rs | 2 +- core/src/cloud/sync/ingest.rs | 2 +- core/src/cloud/sync/receive.rs | 52 +-- core/src/library/config.rs | 84 +---- core/src/library/manager/error.rs | 8 +- core/src/library/manager/mod.rs | 31 +- core/src/location/manager/runner.rs | 34 +- core/src/location/mod.rs | 6 +- core/src/node/config.rs | 23 +- core/src/node/hardware.rs | 304 +++++++++++------- core/src/object/tag/mod.rs | 2 +- core/src/p2p/manager.rs | 4 +- core/src/p2p/metadata.rs | 2 +- core/src/p2p/sync/mod.rs | 2 +- core/src/volume/mod.rs | 24 +- core/src/volume/watcher.rs | 1 - crates/sync-generator/src/sync_data.rs | 2 +- crates/sync/src/factory.rs | 4 +- packages/client/src/core.ts | 55 ++-- 38 files changed, 715 insertions(+), 846 deletions(-) delete mode 100644 core/src/api/cloud/library.rs diff --git a/Cargo.lock b/Cargo.lock index d27b0c74293c5687107f23344775406cba48708f..aaee6b64cbb4bdae34599a7fe2cc4657418ce281 100644 GIT binary patch delta 65 zcmV-H0KWg^j}-rp6o7;QgaWh!r+b&Ta045c`E>&hmw<5t441ij0~D9cfC3qpAZq~& Xm(KPA2$!xb0u6@_d;_-*d<52uV0{?F delta 30 mcmez0FY=^cq@jheg=q`(^6Ke7OPTrFFH|#czfjGh-va>E6%9H7 diff --git a/core/crates/heavy-lifting/src/job_system/job.rs b/core/crates/heavy-lifting/src/job_system/job.rs index 0a3642797..785e33263 100644 --- a/core/crates/heavy-lifting/src/job_system/job.rs +++ b/core/crates/heavy-lifting/src/job_system/job.rs @@ -158,7 +158,7 @@ where JobCtx: JobContext, { fn into_job(self) -> Box> { - let id = JobId::new_v4(); + let id = JobId::now_v7(); Box::new(JobHolder { id, @@ -333,7 +333,7 @@ where } pub fn new(job: J) -> Self { - let id = JobId::new_v4(); + let id = JobId::now_v7(); Self { id, job, diff --git a/core/crates/prisma-helpers/src/lib.rs b/core/crates/prisma-helpers/src/lib.rs index 2d5abddd9..c9310aa44 100644 --- a/core/crates/prisma-helpers/src/lib.rs +++ b/core/crates/prisma-helpers/src/lib.rs @@ -34,7 +34,6 @@ use sd_utils::{from_bytes_to_uuid, uuid_to_bytes}; use std::{borrow::Cow, fmt}; use serde::{Deserialize, Serialize}; -use specta::Type; use uuid::Uuid; // File Path selectables! @@ -244,7 +243,7 @@ job::select!(job_without_data { location::select!(location_ids_and_path { id pub_id - instance_id + device_pub_id path }); @@ -259,6 +258,7 @@ impl From for location::Data { id: data.id, pub_id: data.pub_id, path: data.path, + device_pub_id: data.device_pub_id, instance_id: data.instance_id, name: data.name, total_capacity: data.total_capacity, @@ -272,6 +272,7 @@ impl From for location::Data { scan_state: data.scan_state, file_paths: None, indexer_rules: None, + device: None, instance: None, } } @@ -283,6 +284,7 @@ impl From<&location_with_indexer_rules::Data> for location::Data { id: data.id, pub_id: data.pub_id.clone(), path: data.path.clone(), + device_pub_id: data.device_pub_id.clone(), instance_id: data.instance_id, name: data.name.clone(), total_capacity: data.total_capacity, @@ -296,6 +298,7 @@ impl From<&location_with_indexer_rules::Data> for location::Data { scan_state: data.scan_state, file_paths: None, indexer_rules: None, + device: None, instance: None, } } @@ -311,7 +314,7 @@ label::include!((take: i64) => label_with_objects { } }); -#[derive(Debug, Serialize, Deserialize, Hash, PartialEq, Eq, Type)] +#[derive(Debug, Serialize, Deserialize, Hash, PartialEq, Eq, specta::Type)] #[serde(transparent)] pub struct CasId<'cas_id>(Cow<'cas_id, str>); @@ -374,17 +377,26 @@ impl From<&CasId<'_>> for String { } } -#[derive(Debug, Serialize, Deserialize, Hash, PartialEq, Eq, Clone)] +#[derive(Debug, Serialize, Deserialize, Hash, PartialEq, Eq, Clone, specta::Type)] #[serde(transparent)] #[repr(transparent)] +#[specta(rename = "CoreDevicePubId")] +pub struct DevicePubId(PubId); + +#[derive(Debug, Serialize, Deserialize, Hash, PartialEq, Eq, Clone, specta::Type)] +#[serde(transparent)] +#[repr(transparent)] +#[specta(rename = "CoreFilePathPubId")] pub struct FilePathPubId(PubId); -#[derive(Debug, Serialize, Deserialize, Hash, PartialEq, Eq, Clone)] +#[derive(Debug, Serialize, Deserialize, Hash, PartialEq, Eq, Clone, specta::Type)] #[serde(transparent)] #[repr(transparent)] +#[specta(rename = "CoreObjectPubId")] pub struct ObjectPubId(PubId); -#[derive(Debug, Serialize, Deserialize, Hash, PartialEq, Eq, Clone)] +#[derive(Debug, Serialize, Deserialize, Hash, PartialEq, Eq, Clone, specta::Type)] +#[specta(rename = "CorePubId")] enum PubId { Uuid(Uuid), Vec(Vec), @@ -392,7 +404,7 @@ enum PubId { impl PubId { fn new() -> Self { - Self::Uuid(Uuid::new_v4()) + Self::Uuid(Uuid::now_v7()) } fn to_db(&self) -> Vec { @@ -451,6 +463,15 @@ impl From for Uuid { } } +impl From<&PubId> for Uuid { + fn from(pub_id: &PubId) -> Self { + match pub_id { + PubId::Uuid(uuid) => *uuid, + PubId::Vec(bytes) => from_bytes_to_uuid(bytes), + } + } +} + impl fmt::Display for PubId { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { @@ -499,6 +520,12 @@ macro_rules! delegate_pub_id { } } + impl From<&$type_name> for ::uuid::Uuid { + fn from(pub_id: &$type_name) -> Self { + (&pub_id.0).into() + } + } + impl ::std::fmt::Display for $type_name { fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result { write!(f, "{}", self.0) @@ -526,4 +553,4 @@ macro_rules! delegate_pub_id { }; } -delegate_pub_id!(FilePathPubId, ObjectPubId); +delegate_pub_id!(FilePathPubId, ObjectPubId, DevicePubId); diff --git a/core/crates/sync/Cargo.toml b/core/crates/sync/Cargo.toml index 930c3cdd0..e4f6a80fe 100644 --- a/core/crates/sync/Cargo.toml +++ b/core/crates/sync/Cargo.toml @@ -9,6 +9,8 @@ default = [] [dependencies] # Spacedrive Sub-crates +sd-core-prisma-helpers = { path = "../prisma-helpers" } + sd-actors = { path = "../../../crates/actors" } sd-prisma = { path = "../../../crates/prisma" } sd-sync = { path = "../../../crates/sync" } diff --git a/core/crates/sync/src/backfill.rs b/core/crates/sync/src/backfill.rs index 77f16a575..6a363cbf2 100644 --- a/core/crates/sync/src/backfill.rs +++ b/core/crates/sync/src/backfill.rs @@ -1,7 +1,7 @@ use sd_prisma::{ prisma::{ - crdt_operation, exif_data, file_path, instance, label, label_on_object, location, object, - tag, tag_on_object, PrismaClient, SortOrder, + crdt_operation, exif_data, file_path, label, label_on_object, location, object, tag, + tag_on_object, PrismaClient, SortOrder, }, prisma_sync, }; @@ -17,11 +17,7 @@ use super::{crdt_op_unchecked_db, Error}; /// Takes all the syncable data in the database and generates [`CRDTOperations`] for it. /// This is a requirement before the library can sync. -pub async fn backfill_operations( - db: &PrismaClient, - sync: &crate::Manager, - instance_id: instance::id::Type, -) -> Result<(), Error> { +pub async fn backfill_operations(db: &PrismaClient, sync: &crate::Manager) -> Result<(), Error> { let lock = sync.timestamp_lock.lock().await; let res = db @@ -31,18 +27,20 @@ pub async fn backfill_operations( debug!("backfill started"); let start = Instant::now(); db.crdt_operation() - .delete_many(vec![crdt_operation::instance_id::equals(instance_id)]) + .delete_many(vec![crdt_operation::device_pub_id::equals( + sync.device_pub_id.to_db(), + )]) .exec() .await?; - paginate_tags(&db, sync, instance_id).await?; - paginate_locations(&db, sync, instance_id).await?; - paginate_objects(&db, sync, instance_id).await?; - paginate_exif_datas(&db, sync, instance_id).await?; - paginate_file_paths(&db, sync, instance_id).await?; - paginate_tags_on_objects(&db, sync, instance_id).await?; - paginate_labels(&db, sync, instance_id).await?; - paginate_labels_on_objects(&db, sync, instance_id).await?; + paginate_tags(&db, sync).await?; + paginate_locations(&db, sync).await?; + paginate_objects(&db, sync).await?; + paginate_exif_datas(&db, sync).await?; + paginate_file_paths(&db, sync).await?; + paginate_tags_on_objects(&db, sync).await?; + paginate_labels(&db, sync).await?; + paginate_labels_on_objects(&db, sync).await?; debug!(elapsed = ?start.elapsed(), "backfill ended"); @@ -112,13 +110,11 @@ where } #[instrument(skip(db, sync), err)] -async fn paginate_tags( - db: &PrismaClient, - sync: &crate::Manager, - instance_id: instance::id::Type, -) -> Result<(), Error> { +async fn paginate_tags(db: &PrismaClient, sync: &crate::Manager) -> Result<(), Error> { use tag::{color, date_created, date_modified, id, name}; + let device_pub_id = &sync.device_pub_id; + paginate( |cursor| { db.tag() @@ -143,7 +139,7 @@ async fn paginate_tags( ), ) }) - .map(|o| crdt_op_unchecked_db(&o, instance_id)) + .map(|o| crdt_op_unchecked_db(&o, device_pub_id)) .collect::, _>>() .map(|creates| db.crdt_operation().create_many(creates).exec()) }, @@ -152,16 +148,14 @@ async fn paginate_tags( } #[instrument(skip(db, sync), err)] -async fn paginate_locations( - db: &PrismaClient, - sync: &crate::Manager, - instance_id: instance::id::Type, -) -> Result<(), Error> { +async fn paginate_locations(db: &PrismaClient, sync: &crate::Manager) -> Result<(), Error> { use location::{ available_capacity, date_created, generate_preview_media, hidden, id, include, instance, is_archived, name, path, size_in_bytes, sync_preview_media, total_capacity, }; + let device_pub_id = &sync.device_pub_id; + paginate( |cursor| { db.location() @@ -209,7 +203,7 @@ async fn paginate_locations( ), ) }) - .map(|o| crdt_op_unchecked_db(&o, instance_id)) + .map(|o| crdt_op_unchecked_db(&o, device_pub_id)) .collect::, _>>() .map(|creates| db.crdt_operation().create_many(creates).exec()) }, @@ -218,13 +212,11 @@ async fn paginate_locations( } #[instrument(skip(db, sync), err)] -async fn paginate_objects( - db: &PrismaClient, - sync: &crate::Manager, - instance_id: instance::id::Type, -) -> Result<(), Error> { +async fn paginate_objects(db: &PrismaClient, sync: &crate::Manager) -> Result<(), Error> { use object::{date_accessed, date_created, favorite, hidden, id, important, kind, note}; + let device_pub_id = &sync.device_pub_id; + paginate( |cursor| { db.object() @@ -254,7 +246,7 @@ async fn paginate_objects( ), ) }) - .map(|o| crdt_op_unchecked_db(&o, instance_id)) + .map(|o| crdt_op_unchecked_db(&o, device_pub_id)) .collect::, _>>() .map(|creates| db.crdt_operation().create_many(creates).exec()) }, @@ -263,16 +255,14 @@ async fn paginate_objects( } #[instrument(skip(db, sync), err)] -async fn paginate_exif_datas( - db: &PrismaClient, - sync: &crate::Manager, - instance_id: instance::id::Type, -) -> Result<(), Error> { +async fn paginate_exif_datas(db: &PrismaClient, sync: &crate::Manager) -> Result<(), Error> { use exif_data::{ artist, camera_data, copyright, description, epoch_time, exif_version, id, include, media_date, media_location, resolution, }; + let device_pub_id = &sync.device_pub_id; + paginate( |cursor| { db.exif_data() @@ -311,7 +301,7 @@ async fn paginate_exif_datas( ), ) }) - .map(|o| crdt_op_unchecked_db(&o, instance_id)) + .map(|o| crdt_op_unchecked_db(&o, device_pub_id)) .collect::, _>>() .map(|creates| db.crdt_operation().create_many(creates).exec()) }, @@ -320,16 +310,14 @@ async fn paginate_exif_datas( } #[instrument(skip(db, sync), err)] -async fn paginate_file_paths( - db: &PrismaClient, - sync: &crate::Manager, - instance_id: instance::id::Type, -) -> Result<(), Error> { +async fn paginate_file_paths(db: &PrismaClient, sync: &crate::Manager) -> Result<(), Error> { use file_path::{ cas_id, date_created, date_indexed, date_modified, extension, hidden, id, include, inode, integrity_checksum, is_dir, location, materialized_path, name, object, size_in_bytes_bytes, }; + let device_pub_id = &sync.device_pub_id; + paginate( |cursor| { db.file_path() @@ -379,7 +367,7 @@ async fn paginate_file_paths( ), ) }) - .map(|o| crdt_op_unchecked_db(&o, instance_id)) + .map(|o| crdt_op_unchecked_db(&o, device_pub_id)) .collect::, _>>() .map(|creates| db.crdt_operation().create_many(creates).exec()) }, @@ -388,13 +376,11 @@ async fn paginate_file_paths( } #[instrument(skip(db, sync), err)] -async fn paginate_tags_on_objects( - db: &PrismaClient, - sync: &crate::Manager, - instance_id: instance::id::Type, -) -> Result<(), Error> { +async fn paginate_tags_on_objects(db: &PrismaClient, sync: &crate::Manager) -> Result<(), Error> { use tag_on_object::{date_created, include, object_id, tag_id}; + let device_pub_id = &sync.device_pub_id; + paginate_relation( |group_id, item_id| { db.tag_on_object() @@ -427,7 +413,7 @@ async fn paginate_tags_on_objects( ), ) }) - .map(|o| crdt_op_unchecked_db(&o, instance_id)) + .map(|o| crdt_op_unchecked_db(&o, device_pub_id)) .collect::, _>>() .map(|creates| db.crdt_operation().create_many(creates).exec()) }, @@ -436,13 +422,11 @@ async fn paginate_tags_on_objects( } #[instrument(skip(db, sync), err)] -async fn paginate_labels( - db: &PrismaClient, - sync: &crate::Manager, - instance_id: instance::id::Type, -) -> Result<(), Error> { +async fn paginate_labels(db: &PrismaClient, sync: &crate::Manager) -> Result<(), Error> { use label::{date_created, date_modified, id}; + let device_pub_id = &sync.device_pub_id; + paginate( |cursor| { db.label() @@ -466,7 +450,7 @@ async fn paginate_labels( ), ) }) - .map(|o| crdt_op_unchecked_db(&o, instance_id)) + .map(|o| crdt_op_unchecked_db(&o, device_pub_id)) .collect::, _>>() .map(|creates| db.crdt_operation().create_many(creates).exec()) }, @@ -475,13 +459,11 @@ async fn paginate_labels( } #[instrument(skip(db, sync), err)] -async fn paginate_labels_on_objects( - db: &PrismaClient, - sync: &crate::Manager, - instance_id: instance::id::Type, -) -> Result<(), Error> { +async fn paginate_labels_on_objects(db: &PrismaClient, sync: &crate::Manager) -> Result<(), Error> { use label_on_object::{date_created, include, label_id, object_id}; + let device_pub_id = &sync.device_pub_id; + paginate_relation( |group_id, item_id| { db.label_on_object() @@ -511,7 +493,7 @@ async fn paginate_labels_on_objects( [sync_entry!(l_o.date_created, date_created)], ) }) - .map(|o| crdt_op_unchecked_db(&o, instance_id)) + .map(|o| crdt_op_unchecked_db(&o, device_pub_id)) .collect::, _>>() .map(|creates| db.crdt_operation().create_many(creates).exec()) }, diff --git a/core/crates/sync/src/db_operation.rs b/core/crates/sync/src/db_operation.rs index 858788f18..2e5b883b2 100644 --- a/core/crates/sync/src/db_operation.rs +++ b/core/crates/sync/src/db_operation.rs @@ -1,82 +1,13 @@ -use sd_prisma::prisma::{cloud_crdt_operation, crdt_operation, instance, PrismaClient}; +use sd_core_prisma_helpers::DevicePubId; + +use sd_prisma::prisma::{cloud_crdt_operation, crdt_operation, device, PrismaClient}; use sd_sync::CRDTOperation; -use sd_utils::from_bytes_to_uuid; use tracing::instrument; use uhlc::NTP64; -use uuid::Uuid; use super::Error; -crdt_operation::include!(crdt_with_instance { - instance: select { pub_id } -}); - -cloud_crdt_operation::include!(cloud_crdt_with_instance { - instance: select { pub_id } -}); - -impl crdt_with_instance::Data { - #[allow(clippy::cast_sign_loss)] // SAFETY: we had to store using i64 due to SQLite limitations - pub const fn timestamp(&self) -> NTP64 { - NTP64(self.timestamp as u64) - } - - pub fn instance(&self) -> Uuid { - from_bytes_to_uuid(&self.instance.pub_id) - } - - pub fn into_operation(self) -> Result { - Ok(CRDTOperation { - device_pub_id: self.instance(), - timestamp: self.timestamp(), - record_id: rmp_serde::from_slice(&self.record_id)?, - - model_id: { - #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)] - // SAFETY: we will not have more than 2^16 models and we had to store using signed - // integers due to SQLite limitations - { - self.model as u16 - } - }, - data: rmp_serde::from_slice(&self.data)?, - }) - } -} - -impl cloud_crdt_with_instance::Data { - #[allow(clippy::cast_sign_loss)] // SAFETY: we had to store using i64 due to SQLite limitations - pub const fn timestamp(&self) -> NTP64 { - NTP64(self.timestamp as u64) - } - - pub fn instance(&self) -> Uuid { - from_bytes_to_uuid(&self.instance.pub_id) - } - - #[instrument(skip(self), err)] - pub fn into_operation(self) -> Result<(i32, CRDTOperation), Error> { - Ok(( - self.id, - CRDTOperation { - device_pub_id: self.instance(), - timestamp: self.timestamp(), - record_id: rmp_serde::from_slice(&self.record_id)?, - model_id: { - #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)] - // SAFETY: we will not have more than 2^16 models and we had to store using signed - // integers due to SQLite limitations - { - self.model as u16 - } - }, - data: rmp_serde::from_slice(&self.data)?, - }, - )) - } -} - #[instrument(skip(op, db), err)] pub async fn write_crdt_op_to_db(op: &CRDTOperation, db: &PrismaClient) -> Result<(), Error> { crdt_operation::Create { @@ -87,7 +18,7 @@ pub async fn write_crdt_op_to_db(op: &CRDTOperation, db: &PrismaClient) -> Resul op.timestamp.0 as i64 } }, - instance: instance::pub_id::equals(op.device_pub_id.as_bytes().to_vec()), + device: device::pub_id::equals(op.device_pub_id.as_bytes().to_vec()), kind: op.kind().to_string(), data: rmp_serde::to_vec(&op.data)?, model: i32::from(op.model_id), @@ -100,3 +31,71 @@ pub async fn write_crdt_op_to_db(op: &CRDTOperation, db: &PrismaClient) -> Resul .await .map_or_else(|e| Err(e.into()), |_| Ok(())) } + +pub fn into_ops( + crdt_operation::Data { + timestamp, + model, + record_id, + data, + device_pub_id, + .. + }: crdt_operation::Data, +) -> Result { + Ok(CRDTOperation { + device_pub_id: DevicePubId::from(device_pub_id).into(), + timestamp: { + #[allow(clippy::cast_sign_loss)] + { + // SAFETY: we had to store using i64 due to SQLite limitations + NTP64(timestamp as u64) + } + }, + model_id: { + #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)] + { + // SAFETY: we will not have more than 2^16 models and we had to store using signed + // integers due to SQLite limitations + model as u16 + } + }, + record_id: rmp_serde::from_slice(&record_id)?, + data: rmp_serde::from_slice(&data)?, + }) +} + +pub fn into_cloud_ops( + cloud_crdt_operation::Data { + id, + timestamp, + model, + record_id, + data, + device_pub_id, + .. + }: cloud_crdt_operation::Data, +) -> Result<(cloud_crdt_operation::id::Type, CRDTOperation), Error> { + Ok(( + id, + CRDTOperation { + device_pub_id: DevicePubId::from(device_pub_id).into(), + timestamp: { + #[allow(clippy::cast_sign_loss)] + { + // SAFETY: we had to store using i64 due to SQLite limitations + NTP64(timestamp as u64) + } + }, + model_id: { + #[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)] + { + // SAFETY: we will not have more than 2^16 models and we had to store using signed + // integers due to SQLite limitations + model as u16 + } + }, + record_id: rmp_serde::from_slice(&record_id)?, + data: rmp_serde::from_slice(&data)?, + }, + )) +} diff --git a/core/crates/sync/src/ingest.rs b/core/crates/sync/src/ingest.rs index 08a06a75d..e5fa60050 100644 --- a/core/crates/sync/src/ingest.rs +++ b/core/crates/sync/src/ingest.rs @@ -1,10 +1,12 @@ +use sd_core_prisma_helpers::DevicePubId; + use sd_prisma::{ prisma::{crdt_operation, PrismaClient, SortOrder}, prisma_sync::ModelSyncData, }; use sd_sync::{ CRDTOperation, CRDTOperationData, CompressedCRDTOperation, - CompressedCRDTOperationsPerModelPerDevice, OperationKind, + CompressedCRDTOperationsPerModelPerDevice, ModelId, OperationKind, }; use std::{ @@ -40,7 +42,7 @@ use super::{ /// Stuff that can be handled outside the actor pub enum Request { Messages { - timestamps: Vec<(Uuid, NTP64)>, + timestamps: Vec<(DevicePubId, NTP64)>, tx: oneshot::Sender<()>, }, FinishedIngesting, @@ -137,7 +139,7 @@ impl Actor { .read() .await .iter() - .map(|(&uid, ×tamp)| (uid, timestamp)) + .map(|(uid, ×tamp)| (uid.clone(), timestamp)) .collect(); if self @@ -193,11 +195,11 @@ impl Actor { "Ingesting operations;", ); - for (instance, data) in event.messages.0 { + for (device_pub_id, data) in event.messages.0 { for (model, data) in data { for (record, ops) in data { if let Err(e) = self - .process_crdt_operations(instance, model, record, ops) + .process_crdt_operations(device_pub_id.into(), model, record, ops) .await { error!(?e, "Failed to ingest CRDT operations;"); @@ -268,7 +270,7 @@ impl Actor { #[instrument(skip(self, ops), fields(operations_count = %ops.len()), err)] async fn process_crdt_operations( &mut self, - instance: Uuid, + device_pub_id: DevicePubId, model: u16, record_id: rmpv::Value, mut ops: Vec, @@ -284,7 +286,9 @@ impl Actor { self.clock .update_with_timestamp(&Timestamp::new( new_timestamp, - uhlc::ID::from(NonZeroU128::new(instance.to_u128_le()).expect("Non zero id")), + uhlc::ID::from( + NonZeroU128::new(Uuid::from(&device_pub_id).to_u128_le()).expect("Non zero id"), + ), )) .expect("timestamp has too much drift!"); @@ -293,7 +297,7 @@ impl Actor { .timestamp_per_device .read() .await - .get(&instance) + .get(&device_pub_id) .copied(); // Delete - ignores all other messages @@ -303,7 +307,7 @@ impl Actor { .find(|op| matches!(op.data, CRDTOperationData::Delete)) { trace!("Deleting operation"); - handle_crdt_deletion(db, instance, model, record_id, delete_op).await?; + handle_crdt_deletion(db, &device_pub_id, model, record_id, delete_op).await?; } // Create + > 0 Update - overwrites the create's data with the updates else if let Some(timestamp) = ops @@ -330,7 +334,8 @@ impl Actor { return Ok(()); } - handle_crdt_create_and_updates(db, instance, model, record_id, ops, timestamp).await?; + handle_crdt_create_and_updates(db, &device_pub_id, model, record_id, ops, timestamp) + .await?; } // > 0 Update - batches updates with a fake Create op else { @@ -387,7 +392,7 @@ impl Actor { return Ok(()); } - handle_crdt_updates(db, instance, model, record_id, data, updates).await?; + handle_crdt_updates(db, &device_pub_id, model, record_id, data, updates).await?; } // update the stored timestamp for this instance - will be derived from the crdt operations table on restart @@ -396,7 +401,7 @@ impl Actor { self.timestamp_per_device .write() .await - .insert(instance, new_ts); + .insert(device_pub_id, new_ts); Ok(()) } @@ -404,13 +409,14 @@ impl Actor { async fn handle_crdt_updates( db: &PrismaClient, - instance: Uuid, + device_pub_id: &DevicePubId, model: u16, record_id: rmpv::Value, mut data: BTreeMap, updates: Vec>, ) -> Result<(), Error> { let keys = data.keys().cloned().collect::>(); + let device_pub_id = sd_sync::DevicePubId::from(device_pub_id); // does the same thing as processing ops one-by-one and returning early if a newer op was found for (update, key) in updates.into_iter().zip(keys) { @@ -424,7 +430,7 @@ async fn handle_crdt_updates( .run(|db| async move { // fake operation to batch them all at once ModelSyncData::from_op(CRDTOperation { - device_pub_id: instance, + device_pub_id, model_id: model, record_id: record_id.clone(), timestamp: NTP64(0), @@ -447,7 +453,7 @@ async fn handle_crdt_updates( async move { write_crdt_op_to_db( &CRDTOperation { - device_pub_id: instance, + device_pub_id, model_id: model, record_id, timestamp, @@ -468,23 +474,24 @@ async fn handle_crdt_updates( async fn handle_crdt_create_and_updates( db: &PrismaClient, - instance: Uuid, - model: u16, + device_pub_id: &DevicePubId, + model_id: ModelId, record_id: rmpv::Value, ops: Vec, timestamp: NTP64, ) -> Result<(), Error> { let mut data = BTreeMap::new(); + let device_pub_id = sd_sync::DevicePubId::from(device_pub_id); let mut applied_ops = vec![]; // search for all Updates until a Create is found - for op in ops.iter().rev() { + for op in ops.into_iter().rev() { match &op.data { CRDTOperationData::Delete => unreachable!("Delete can't exist here!"), CRDTOperationData::Create(create_data) => { for (k, v) in create_data { - data.entry(k).or_insert(v); + data.entry(k.clone()).or_insert_with(|| v.clone()); } applied_ops.push(op); @@ -492,8 +499,8 @@ async fn handle_crdt_create_and_updates( break; } CRDTOperationData::Update { field, value } => { + data.insert(field.clone(), value.clone()); applied_ops.push(op); - data.insert(field, value); } } } @@ -503,35 +510,33 @@ async fn handle_crdt_create_and_updates( .run(|db| async move { // fake a create with a bunch of data rather than individual insert ModelSyncData::from_op(CRDTOperation { - device_pub_id: instance, - model_id: model, + device_pub_id, + model_id, record_id: record_id.clone(), timestamp, - data: CRDTOperationData::Create( - data.into_iter() - .map(|(k, v)| (k.clone(), v.clone())) - .collect(), - ), + data: CRDTOperationData::Create(data), }) - .ok_or(Error::InvalidModelId(model))? + .ok_or(Error::InvalidModelId(model_id))? .exec(&db) .await?; applied_ops .into_iter() - .map(|op| { + .map(|CompressedCRDTOperation { timestamp, data }| { let record_id = record_id.clone(); let db = &db; async move { - let operation = CRDTOperation { - device_pub_id: instance, - model_id: model, - record_id, - timestamp: op.timestamp, - data: op.data.clone(), - }; - - write_crdt_op_to_db(&operation, db).await + write_crdt_op_to_db( + &CRDTOperation { + device_pub_id, + timestamp, + model_id, + record_id, + data, + }, + db, + ) + .await } }) .collect::>() @@ -544,14 +549,14 @@ async fn handle_crdt_create_and_updates( async fn handle_crdt_deletion( db: &PrismaClient, - instance: Uuid, + device_pub_id: &DevicePubId, model: u16, record_id: rmpv::Value, delete_op: &CompressedCRDTOperation, ) -> Result<(), Error> { // deletes are the be all and end all, no need to check anything let op = CRDTOperation { - device_pub_id: instance, + device_pub_id: device_pub_id.into(), model_id: model, record_id, timestamp: delete_op.timestamp, @@ -586,7 +591,7 @@ pub struct Handler { #[derive(Debug)] pub struct MessagesEvent { - pub instance_id: Uuid, + pub device_pub_id: DevicePubId, pub messages: CompressedCRDTOperationsPerModelPerDevice, pub has_more: bool, pub wait_tx: Option>, @@ -608,13 +613,13 @@ mod test { use super::*; async fn new_actor() -> (Handler, Arc) { - let instance = Uuid::new_v4(); + let device_pub_id = Uuid::now_v7(); let shared = Arc::new(SharedState { db: sd_prisma::test_db().await, - instance, + device_pub_id: device_pub_id.into(), clock: HLCBuilder::new() .with_id(uhlc::ID::from( - NonZeroU128::new(instance.to_u128_le()).expect("Non zero id"), + NonZeroU128::new(device_pub_id.to_u128_le()).expect("Non zero id"), )) .build(), timestamp_per_device: Arc::default(), diff --git a/core/crates/sync/src/lib.rs b/core/crates/sync/src/lib.rs index 118a343b6..1297e48cd 100644 --- a/core/crates/sync/src/lib.rs +++ b/core/crates/sync/src/lib.rs @@ -27,7 +27,7 @@ #![forbid(deprecated_in_future)] #![allow(clippy::missing_errors_doc, clippy::module_name_repetitions)] -use sd_prisma::prisma::{crdt_operation, instance, PrismaClient}; +use sd_prisma::prisma::{crdt_operation, device, PrismaClient}; use sd_sync::CRDTOperation; use std::{ @@ -36,7 +36,6 @@ use std::{ }; use tokio::sync::{Notify, RwLock}; -use uuid::Uuid; mod actor; pub mod backfill; @@ -54,13 +53,14 @@ pub enum SyncMessage { Created, } -pub type DevicePubId = Uuid; +pub use sd_core_prisma_helpers::DevicePubId; + pub type TimestampPerDevice = Arc>>; pub struct SharedState { pub db: Arc, pub emit_messages_flag: Arc, - pub instance: Uuid, + pub device_pub_id: DevicePubId, pub timestamp_per_device: TimestampPerDevice, pub clock: uhlc::HLC, pub active: AtomicBool, @@ -106,7 +106,7 @@ pub fn crdt_op_db(op: &CRDTOperation) -> Result { op.timestamp.as_u64() as i64 } }, - instance: instance::pub_id::equals(op.device_pub_id.as_bytes().to_vec()), + device: device::pub_id::equals(op.device_pub_id.as_bytes().to_vec()), kind: op.kind().to_string(), data: rmp_serde::to_vec(&op.data)?, model: i32::from(op.model_id), @@ -117,7 +117,7 @@ pub fn crdt_op_db(op: &CRDTOperation) -> Result { pub fn crdt_op_unchecked_db( op: &CRDTOperation, - instance_id: i32, + device_pub_id: &DevicePubId, ) -> Result { Ok(crdt_operation::CreateUnchecked { timestamp: { @@ -127,7 +127,7 @@ pub fn crdt_op_unchecked_db( op.timestamp.as_u64() as i64 } }, - instance_id, + device_pub_id: device_pub_id.to_db(), kind: op.kind().to_string(), data: rmp_serde::to_vec(&op.data)?, model: i32::from(op.model_id), diff --git a/core/crates/sync/src/manager.rs b/core/crates/sync/src/manager.rs index 0ab5b76ed..1419d810e 100644 --- a/core/crates/sync/src/manager.rs +++ b/core/crates/sync/src/manager.rs @@ -1,7 +1,8 @@ -use sd_prisma::prisma::{cloud_crdt_operation, crdt_operation, instance, PrismaClient, SortOrder}; +use sd_core_prisma_helpers::DevicePubId; + +use sd_prisma::prisma::{cloud_crdt_operation, crdt_operation, device, PrismaClient, SortOrder}; use sd_sync::{CRDTOperation, OperationFactory}; -use sd_utils::{from_bytes_to_uuid, uuid_to_bytes}; -use tracing::warn; +use sd_utils::from_bytes_to_uuid; use std::{ cmp, fmt, @@ -15,12 +16,13 @@ use std::{ use prisma_client_rust::{and, operator::or}; use tokio::sync::{broadcast, Mutex, Notify, RwLock}; +use tracing::warn; use uhlc::{HLCBuilder, HLC}; use uuid::Uuid; use super::{ crdt_op_db, - db_operation::{cloud_crdt_with_instance, crdt_with_instance}, + db_operation::{into_cloud_ops, into_ops}, ingest, Error, SharedState, SyncMessage, NTP64, }; @@ -40,7 +42,7 @@ impl fmt::Debug for Manager { #[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq, Eq)] pub struct GetOpsArgs { - pub timestamp_per_device: Vec<(Uuid, NTP64)>, + pub timestamp_per_device: Vec<(DevicePubId, NTP64)>, pub count: u32, } @@ -49,17 +51,17 @@ impl Manager { /// Sync messages are received on the returned [`broadcast::Receiver`]. pub async fn new( db: Arc, - current_instance_uuid: Uuid, + current_device_pub_id: &DevicePubId, emit_messages_flag: Arc, actors: Arc, ) -> Result<(Self, broadcast::Receiver), Error> { - let existing_instances = db.instance().find_many(vec![]).exec().await?; + let existing_devices = db.device().find_many(vec![]).exec().await?; - Self::with_existing_instances( + Self::with_existing_devices( db, - current_instance_uuid, + current_device_pub_id, emit_messages_flag, - &existing_instances, + &existing_devices, actors, ) .await @@ -69,33 +71,35 @@ impl Manager { /// Sync messages are received on the returned [`broadcast::Receiver`]. /// /// # Panics - /// Panics if the `current_instance_id` UUID is zeroed. - pub async fn with_existing_instances( + /// Panics if the `current_device_pub_id` UUID is zeroed, which will never happen as we use `UUIDv7` for the + /// device pub id. As this version have a timestamp part, instead of being totally random. So the only + /// possible way to get zero from a `UUIDv7` is to go back in time to 1970 + pub async fn with_existing_devices( db: Arc, - current_instance_uuid: Uuid, + current_device_pub_id: &DevicePubId, emit_messages_flag: Arc, - existing_instances: &[instance::Data], + existing_devices: &[device::Data], actors: Arc, ) -> Result<(Self, broadcast::Receiver), Error> { - let timestamps = db + let latest_timestamp_per_device = db ._batch( - existing_instances + existing_devices .iter() - .map(|i| { + .map(|device| { db.crdt_operation() - .find_first(vec![crdt_operation::instance::is(vec![ - instance::id::equals(i.id), - ])]) + .find_first(vec![crdt_operation::device_pub_id::equals( + device.pub_id.clone(), + )]) .order_by(crdt_operation::timestamp::order(SortOrder::Desc)) }) .collect::>(), ) .await? .into_iter() - .zip(existing_instances) - .map(|(op, i)| { + .zip(existing_devices) + .map(|(op, device)| { ( - from_bytes_to_uuid(&i.pub_id), + DevicePubId::from(&device.pub_id), #[allow(clippy::cast_sign_loss)] // SAFETY: we had to store using i64 due to SQLite limitations NTP64(op.map(|o| o.timestamp).unwrap_or_default() as u64), @@ -107,15 +111,16 @@ impl Manager { let clock = HLCBuilder::new() .with_id(uhlc::ID::from( - NonZeroU128::new(current_instance_uuid.to_u128_le()).expect("Non zero id"), + NonZeroU128::new(Uuid::from(current_device_pub_id).to_u128_le()) + .expect("Non zero id"), )) .build(); let shared = Arc::new(SharedState { db, - instance: current_instance_uuid, + device_pub_id: current_device_pub_id.clone(), clock, - timestamp_per_device: Arc::new(RwLock::new(timestamps)), + timestamp_per_device: Arc::new(RwLock::new(latest_timestamp_per_device)), emit_messages_flag, active: AtomicBool::default(), active_notify: Notify::default(), @@ -168,7 +173,7 @@ impl Manager { .timestamp_per_device .write() .await - .insert(self.instance, last.timestamp); + .insert(self.device_pub_id.clone(), last.timestamp); } if self.tx.send(SyncMessage::Created).is_err() { @@ -216,33 +221,30 @@ impl Manager { .timestamp_per_device .write() .await - .insert(self.instance, op.timestamp); + .insert(self.device_pub_id.clone(), op.timestamp); Ok(ret) } - pub async fn get_instance_ops( + pub async fn get_device_ops( &self, count: u32, - instance_uuid: Uuid, + device_pub_id: DevicePubId, timestamp: NTP64, ) -> Result, Error> { self.db .crdt_operation() .find_many(vec![ - crdt_operation::instance::is(vec![instance::pub_id::equals(uuid_to_bytes( - &instance_uuid, - ))]), + crdt_operation::device::is(vec![device::pub_id::equals(device_pub_id.into())]), #[allow(clippy::cast_possible_wrap)] crdt_operation::timestamp::gt(timestamp.as_u64() as i64), ]) .take(i64::from(count)) .order_by(crdt_operation::timestamp::order(SortOrder::Asc)) - .include(crdt_with_instance::include()) .exec() .await? .into_iter() - .map(crdt_with_instance::Data::into_operation) + .map(into_ops) .collect() } @@ -253,10 +255,10 @@ impl Manager { .find_many(vec![or(args .timestamp_per_device .iter() - .map(|(instance_id, timestamp)| { + .map(|(device_pub_id, timestamp)| { and![ - crdt_operation::instance::is(vec![instance::pub_id::equals( - uuid_to_bytes(instance_id) + crdt_operation::device::is(vec![device::pub_id::equals( + device_pub_id.to_db() )]), crdt_operation::timestamp::gt({ #[allow(clippy::cast_possible_wrap)] @@ -267,46 +269,47 @@ impl Manager { }) ] }) - .chain([crdt_operation::instance::is_not(vec![ - instance::pub_id::in_vec( + .chain([crdt_operation::device::is_not(vec![ + device::pub_id::in_vec( args.timestamp_per_device .iter() - .map(|(instance_id, _)| uuid_to_bytes(instance_id)) + .map(|(device_pub_id, _)| device_pub_id.to_db()) .collect(), ), ])]) .collect())]) .take(i64::from(args.count)) .order_by(crdt_operation::timestamp::order(SortOrder::Asc)) - .include(crdt_with_instance::include()) .exec() .await?; - ops.sort_by(|a, b| match a.timestamp().cmp(&b.timestamp()) { - cmp::Ordering::Equal => a.instance().cmp(&b.instance()), + ops.sort_by(|a, b| match a.timestamp.cmp(&b.timestamp) { + cmp::Ordering::Equal => { + from_bytes_to_uuid(&a.device_pub_id).cmp(&from_bytes_to_uuid(&b.device_pub_id)) + } o => o, }); ops.into_iter() .take(args.count as usize) - .map(crdt_with_instance::Data::into_operation) + .map(into_ops) .collect() } pub async fn get_cloud_ops( &self, args: GetOpsArgs, - ) -> Result, Error> { + ) -> Result, Error> { let mut ops = self .db .cloud_crdt_operation() .find_many(vec![or(args .timestamp_per_device .iter() - .map(|(instance_id, timestamp)| { + .map(|(device_pub_id, timestamp)| { and![ - cloud_crdt_operation::instance::is(vec![instance::pub_id::equals( - uuid_to_bytes(instance_id) + cloud_crdt_operation::device::is(vec![device::pub_id::equals( + device_pub_id.to_db() )]), cloud_crdt_operation::timestamp::gt({ #[allow(clippy::cast_possible_wrap)] @@ -317,29 +320,30 @@ impl Manager { }) ] }) - .chain([cloud_crdt_operation::instance::is_not(vec![ - instance::pub_id::in_vec( + .chain([cloud_crdt_operation::device::is_not(vec![ + device::pub_id::in_vec( args.timestamp_per_device .iter() - .map(|(instance_id, _)| uuid_to_bytes(instance_id)) + .map(|(device_pub_id, _)| device_pub_id.to_db()) .collect(), ), ])]) .collect())]) .take(i64::from(args.count)) .order_by(cloud_crdt_operation::timestamp::order(SortOrder::Asc)) - .include(cloud_crdt_with_instance::include()) .exec() .await?; - ops.sort_by(|a, b| match a.timestamp().cmp(&b.timestamp()) { - cmp::Ordering::Equal => a.instance().cmp(&b.instance()), + ops.sort_by(|a, b| match a.timestamp.cmp(&b.timestamp) { + cmp::Ordering::Equal => { + from_bytes_to_uuid(&a.device_pub_id).cmp(&from_bytes_to_uuid(&b.device_pub_id)) + } o => o, }); ops.into_iter() .take(args.count as usize) - .map(cloud_crdt_with_instance::Data::into_operation) + .map(into_cloud_ops) .collect() } } @@ -349,8 +353,8 @@ impl OperationFactory for Manager { &self.clock } - fn get_device_pub_id(&self) -> Uuid { - self.instance + fn get_device_pub_id(&self) -> sd_sync::DevicePubId { + sd_sync::DevicePubId::from(&self.device_pub_id) } } diff --git a/core/crates/sync/tests/lib.rs b/core/crates/sync/tests/lib.rs index 45c602c51..b2a27b516 100644 --- a/core/crates/sync/tests/lib.rs +++ b/core/crates/sync/tests/lib.rs @@ -6,7 +6,7 @@ use sd_prisma::{prisma::location, prisma_sync}; use sd_sync::*; use sd_utils::{msgpack, uuid_to_bytes}; -use mock_instance::Instance; +use mock_instance::Device; use tracing::info; use tracing_test::traced_test; use uuid::Uuid; @@ -14,7 +14,7 @@ use uuid::Uuid; const MOCK_LOCATION_NAME: &str = "Location 0"; const MOCK_LOCATION_PATH: &str = "/User/Anon/Documents"; -async fn write_test_location(instance: &Instance) -> location::Data { +async fn write_test_location(instance: &Device) -> location::Data { let location_pub_id = Uuid::new_v4(); let location = instance @@ -81,7 +81,7 @@ async fn write_test_location(instance: &Instance) -> location::Data { #[tokio::test] #[traced_test] async fn writes_operations_and_rows_together() -> Result<(), Box> { - let instance = Instance::new(Uuid::new_v4()).await; + let instance = Device::new(Uuid::new_v4()).await; write_test_location(&instance).await; @@ -119,14 +119,14 @@ async fn writes_operations_and_rows_together() -> Result<(), Box Result<(), Box> { - let instance1 = Instance::new(Uuid::new_v4()).await; - let instance2 = Instance::new(Uuid::new_v4()).await; + let instance1 = Device::new(Uuid::new_v4()).await; + let instance2 = Device::new(Uuid::new_v4()).await; let mut instance2_sync_rx = instance2.sync_rx.resubscribe(); info!("Created instances!"); - Instance::pair(&instance1, &instance2).await; + Device::pair(&instance1, &instance2).await; info!("Paired instances!"); @@ -162,12 +162,12 @@ async fn operations_send_and_ingest() -> Result<(), Box> #[tokio::test] async fn no_update_after_delete() -> Result<(), Box> { - let instance1 = Instance::new(Uuid::new_v4()).await; - let instance2 = Instance::new(Uuid::new_v4()).await; + let instance1 = Device::new(Uuid::new_v4()).await; + let instance2 = Device::new(Uuid::new_v4()).await; let mut instance2_sync_rx = instance2.sync_rx.resubscribe(); - Instance::pair(&instance1, &instance2).await; + Device::pair(&instance1, &instance2).await; let location = write_test_location(&instance1).await; diff --git a/core/crates/sync/tests/mock_instance.rs b/core/crates/sync/tests/mock_instance.rs index 591b52a8a..fd1c13700 100644 --- a/core/crates/sync/tests/mock_instance.rs +++ b/core/crates/sync/tests/mock_instance.rs @@ -2,11 +2,9 @@ use sd_core_sync::*; use sd_prisma::prisma; use sd_sync::CompressedCRDTOperationsPerModelPerDevice; -use sd_utils::uuid_to_bytes; use std::sync::{atomic::AtomicBool, Arc}; -use prisma_client_rust::chrono::Utc; use tokio::{fs, spawn, sync::broadcast}; use tracing::{info, instrument, warn, Instrument}; use uuid::Uuid; @@ -16,16 +14,17 @@ fn db_path(id: Uuid) -> String { } #[derive(Clone)] -pub struct Instance { - pub id: Uuid, +pub struct Device { + pub pub_id: DevicePubId, pub db: Arc, pub sync: Arc, pub sync_rx: Arc>, } -impl Instance { +impl Device { pub async fn new(id: Uuid) -> Arc { let url = format!("file:{}", db_path(id)); + let device_pub_id = DevicePubId::from(id); let db = Arc::new( prisma::PrismaClient::_builder() @@ -37,22 +36,15 @@ impl Instance { db._db_push().await.unwrap(); - db.instance() - .create( - uuid_to_bytes(&id), - vec![], - vec![], - Utc::now().into(), - Utc::now().into(), - vec![], - ) + db.device() + .create(device_pub_id.to_db(), vec![]) .exec() .await .unwrap(); let (sync, sync_rx) = sd_core_sync::Manager::new( Arc::clone(&db), - id, + &device_pub_id, Arc::new(AtomicBool::new(true)), Default::default(), ) @@ -60,7 +52,7 @@ impl Instance { .expect("failed to create sync manager"); Arc::new(Self { - id, + pub_id: device_pub_id, db, sync: Arc::new(sync), sync_rx: Arc::new(sync_rx), @@ -68,22 +60,17 @@ impl Instance { } pub async fn teardown(&self) { - fs::remove_file(db_path(self.id)).await.unwrap(); + fs::remove_file(db_path(Uuid::from(&self.pub_id))) + .await + .unwrap(); } pub async fn pair(instance1: &Arc, instance2: &Arc) { #[instrument(skip(left, right))] - async fn half(left: &Arc, right: &Arc, context: &'static str) { + async fn half(left: &Arc, right: &Arc, context: &'static str) { left.db - .instance() - .create( - uuid_to_bytes(&right.id), - vec![], - vec![], - Utc::now().into(), - Utc::now().into(), - vec![], - ) + .device() + .create(right.pub_id.to_db(), vec![]) .exec() .await .unwrap(); @@ -137,7 +124,7 @@ impl Instance { messages, ), has_more: false, - instance_id: left.id, + device_pub_id: left.pub_id.clone(), wait_tx: None, })) .await diff --git a/core/prisma/schema.prisma b/core/prisma/schema.prisma index d1a0d679d..41d698f63 100644 --- a/core/prisma/schema.prisma +++ b/core/prisma/schema.prisma @@ -57,20 +57,20 @@ model CloudCRDTOperation { /// Devices are the owner machines connected to this library /// @shared(id: pub_id, modelId: 12) model Device { - id Int @id @default(autoincrement()) + id Int @id @default(autoincrement()) // uuid v7 - pub_id Bytes @unique - name String + pub_id Bytes @unique + name String? // Not actually NULLABLE, but we have to comply with current sync implementation BS // Enum: sd_cloud_schema::device::DeviceOS - os Int + os Int? // Not actually NULLABLE, but we have to comply with current sync implementation BS // Enum: sd_cloud_schema::device::HardwareModel - hardware_model Int + hardware_model Int? // Not actually NULLABLE, but we have to comply with current sync implementation BS // clock timestamp for sync timestamp BigInt? - date_created DateTime + date_created DateTime? // Not actually NULLABLE, but we have to comply with current sync implementation BS date_deleted DateTime? CRDTOperation CRDTOperation[] @@ -103,6 +103,7 @@ model Instance { // clock timestamp for sync timestamp BigInt? + Location Location[] @@map("instance") } @@ -168,8 +169,12 @@ model Location { scan_state Int @default(0) // Enum: sd_core::location::ScanState - device_pub_id Bytes - device Device @relation(fields: [device_pub_id], references: [pub_id], onDelete: Cascade) + device_pub_id Bytes? + device Device? @relation(fields: [device_pub_id], references: [pub_id], onDelete: Cascade) + + // this should just be a local-only cache but it's too much effort to broadcast online locations rn (@brendan) + instance_id Int? + instance Instance? @relation(fields: [instance_id], references: [id], onDelete: SetNull) file_paths FilePath[] indexer_rules IndexerRulesInLocation[] @@ -585,8 +590,8 @@ model StorageStatistics { total_capacity BigInt @default(0) available_capacity BigInt @default(0) - device_pub_id Bytes @unique - device Device @relation(fields: [device_pub_id], references: [pub_id], onDelete: Cascade) + device_pub_id Bytes? @unique + device Device? @relation(fields: [device_pub_id], references: [pub_id], onDelete: Cascade) @@map("storage_statistics") } diff --git a/core/src/api/cloud/libraries.rs b/core/src/api/cloud/libraries.rs index bbc7d3027..9848de43c 100644 --- a/core/src/api/cloud/libraries.rs +++ b/core/src/api/cloud/libraries.rs @@ -2,7 +2,9 @@ use crate::api::{utils::library, Ctx, R}; use sd_cloud_schema::{auth::AccessToken, devices, libraries}; +use futures_concurrency::future::TryJoin; use rspc::alpha::AlphaRouter; +use serde::Deserialize; use tracing::debug; use super::try_get_cloud_services_client; @@ -42,25 +44,25 @@ pub fn mount() -> AlphaRouter { }) }) .procedure("create", { - #[derive(Debug, serde::Serialize, serde::Deserialize, specta::Type)] - struct LibrariesCreateArgs { - access_token: AccessToken, - device_pub_id: devices::PubId, - } - R.with2(library()) - .mutation(|(node, library), args: LibrariesCreateArgs| async move { - let req = libraries::create::Request { - name: library.config().await.name.to_string(), - access_token: args.access_token, - pub_id: libraries::PubId(library.id), - device_pub_id: args.device_pub_id, - }; + .mutation(|(node, library), access_token: AccessToken| async move { + let (client, name, device_pub_id) = ( + try_get_cloud_services_client(&node), + async { Ok(library.config().await.name.to_string()) }, + async { Ok(devices::PubId(node.config.get().await.id.into())) }, + ) + .try_join() + .await?; + super::handle_comm_error( - try_get_cloud_services_client(&node) - .await? + client .libraries() - .create(req) + .create(libraries::create::Request { + name, + access_token, + pub_id: libraries::PubId(library.id), + device_pub_id, + }) .await, "Failed to create library;", )??; @@ -69,35 +71,59 @@ pub fn mount() -> AlphaRouter { }) }) .procedure("delete", { - R.mutation(|node, req: libraries::delete::Request| async move { - super::handle_comm_error( - try_get_cloud_services_client(&node) - .await? - .libraries() - .delete(req) - .await, - "Failed to delete library;", - )??; + R.with2(library()) + .mutation(|(node, library), access_token: AccessToken| async move { + super::handle_comm_error( + try_get_cloud_services_client(&node) + .await? + .libraries() + .delete(libraries::delete::Request { + access_token, + pub_id: libraries::PubId(library.id), + }) + .await, + "Failed to delete library;", + )??; - debug!("Deleted library"); + debug!("Deleted library"); - Ok(()) - }) + Ok(()) + }) }) .procedure("update", { - R.mutation(|node, req: libraries::update::Request| async move { - super::handle_comm_error( - try_get_cloud_services_client(&node) - .await? - .libraries() - .update(req) - .await, - "Failed to update library;", - )??; + #[derive(Deserialize, specta::Type)] + struct LibrariesUpdateArgs { + access_token: AccessToken, + name: String, + } - debug!("Updated library"); + R.with2(library()).mutation( + |(node, library), + LibrariesUpdateArgs { access_token, name }: LibrariesUpdateArgs| async move { + super::handle_comm_error( + try_get_cloud_services_client(&node) + .await? + .libraries() + .update(libraries::update::Request { + access_token, + pub_id: libraries::PubId(library.id), + name, + }) + .await, + "Failed to update library;", + )??; - Ok(()) - }) + debug!("Updated library"); + + Ok(()) + }, + ) + }) + .procedure("sync", { + R.with2(library()) + .mutation(|(_, library), _: ()| async move { + library.do_cloud_sync(); + Ok(()) + }) }) } diff --git a/core/src/api/cloud/library.rs b/core/src/api/cloud/library.rs deleted file mode 100644 index ca110c397..000000000 --- a/core/src/api/cloud/library.rs +++ /dev/null @@ -1,134 +0,0 @@ -// This file is being deprecated in favor of libraries.rs -// This is due to the migration to the new API system, but the frontend is still using this file - -use crate::{api::utils::library, invalidate_query}; - -use super::*; - -pub fn mount() -> AlphaRouter { - R.router() - .procedure("get", { - R.with2(library()) - .query(|(node, library), _: ()| async move { - // Ok( - // sd_cloud_api::library::get(node.cloud_api_config().await, library.id) - // .await?, - // ) - - Ok(()) - }) - }) - .procedure("list", { - R.query(|node, _: ()| async move { - // Ok(sd_cloud_api::library::list(node.cloud_api_config().await).await?) - Ok(()) - }) - }) - .procedure("create", { - R.with2(library()) - .mutation(|(node, library), _: ()| async move { - // let node_config = node.config.get().await; - // let cloud_library = sd_cloud_api::library::create( - // node.cloud_api_config().await, - // library.id, - // &library.config().await.name, - // library.instance_uuid, - // library.identity.to_remote_identity(), - // node_config.id, - // node_config.identity.to_remote_identity(), - // &node.p2p.peer_metadata(), - // ) - // .await?; - // node.libraries - // .edit( - // library.id, - // None, - // MaybeUndefined::Undefined, - // MaybeUndefined::Value(cloud_library.id), - // None, - // ) - // .await?; - - invalidate_query!(library, "cloud.library.get"); - - Ok(()) - }) - }) - .procedure("join", { - R.mutation(|node, library_id: Uuid| async move { - // let Some(cloud_library) = - // sd_cloud_api::library::get(node.cloud_api_config().await, library_id).await? - // else { - // return Err(rspc::Error::new( - // rspc::ErrorCode::NotFound, - // "Library not found".to_string(), - // )); - // }; - - // let library = node - // .libraries - // .create_with_uuid( - // library_id, - // LibraryName::new(cloud_library.name).map_err(|e| { - // rspc::Error::new(rspc::ErrorCode::InternalServerError, e.to_string()) - // })?, - // None, - // false, - // None, - // &node, - // true, - // ) - // .await?; - // node.libraries - // .edit( - // library.id, - // None, - // MaybeUndefined::Undefined, - // MaybeUndefined::Value(cloud_library.id), - // None, - // ) - // .await?; - - // let node_config = node.config.get().await; - // let instances = sd_cloud_api::library::join( - // node.cloud_api_config().await, - // library_id, - // library.instance_uuid, - // library.identity.to_remote_identity(), - // node_config.id, - // node_config.identity.to_remote_identity(), - // node.p2p.peer_metadata(), - // ) - // .await?; - - // for instance in instances { - // crate::cloud::sync::receive::upsert_instance( - // library.id, - // &library.db, - // &library.sync, - // &node.libraries, - // &instance.uuid, - // instance.identity, - // &instance.node_id, - // RemoteIdentity::from_str(&instance.node_remote_identity) - // .expect("malformed remote identity in the DB"), - // instance.metadata, - // ) - // .await?; - // } - - // invalidate_query!(library, "cloud.library.get"); - // invalidate_query!(library, "cloud.library.list"); - - // Ok(LibraryConfigWrapped::from_library(&library).await) - Ok(()) - }) - }) - .procedure("sync", { - R.with2(library()) - .mutation(|(_, library), _: ()| async move { - library.do_cloud_sync(); - Ok(()) - }) - }) -} diff --git a/core/src/api/cloud/mod.rs b/core/src/api/cloud/mod.rs index bc39cae49..49c5cad36 100644 --- a/core/src/api/cloud/mod.rs +++ b/core/src/api/cloud/mod.rs @@ -1,11 +1,12 @@ use crate::{node::config::NodeConfig, volume::get_volumes, Node}; +use sd_core_cloud_services::{CloudP2P, IrohSecretKey, KeyManager, QuinnConnection, UserResponse}; + use sd_cloud_schema::{ auth, error::{ClientSideError, Error}, users, Client, Service, }; -use sd_core_cloud_services::{CloudP2P, IrohSecretKey, KeyManager, QuinnConnection, UserResponse}; use sd_crypto::{CryptoRng, SeedableRng}; use std::pin::pin; @@ -14,13 +15,11 @@ use async_stream::stream; use futures::StreamExt; use rspc::alpha::AlphaRouter; use tracing::error; -use uuid::Uuid; use super::{Ctx, R}; mod devices; mod libraries; -mod library; mod locations; async fn try_get_cloud_services_client( @@ -69,7 +68,7 @@ pub(crate) fn mount() -> AlphaRouter { let (device_pub_id, name, os) = { let NodeConfig { id, name, os, .. } = node.config.get().await; - (devices::PubId(id), name, os) + (devices::PubId(id.into()), name, os) }; let mut hasher = blake3::Hasher::new(); hasher.update(device_pub_id.0.as_bytes().as_slice()); diff --git a/core/src/api/mod.rs b/core/src/api/mod.rs index 30e55aee7..7a1dd1597 100644 --- a/core/src/api/mod.rs +++ b/core/src/api/mod.rs @@ -3,13 +3,16 @@ use crate::{ library::LibraryId, node::{ config::{is_in_docker, NodeConfig, NodeConfigP2P, NodePreferences}, - get_hardware_model_name, HardwareModel, + HardwareModel, }, old_job::JobProgressEvent, Node, }; use sd_core_heavy_lifting::media_processor::ThumbKey; +use sd_core_sync::DevicePubId; + +use sd_cloud_schema::devices::DeviceOS; use sd_p2p::RemoteIdentity; use sd_prisma::prisma::file_path; @@ -20,7 +23,6 @@ use rspc::{alpha::Rspc, Config, ErrorCode}; use serde::{Deserialize, Serialize}; use specta::Type; use tracing::warn; -use uuid::Uuid; mod backups; mod cloud; @@ -86,13 +88,15 @@ pub enum BackendFeature {} #[derive(Debug, Serialize, Deserialize, Clone, Type)] pub struct SanitizedNodeConfig { /// id is a unique identifier for the current node. Each node has a public identifier (this one) and is given a local id for each library (done within the library code). - pub id: Uuid, + pub id: DevicePubId, /// name is the display name of the current node. This is set by the user and is shown in the UI. // TODO: Length validation so it can fit in DNS record pub name: String, pub identity: RemoteIdentity, pub p2p: NodeConfigP2P, pub features: Vec, pub preferences: NodePreferences, + pub os: DeviceOS, + pub hardware_model: HardwareModel, } impl From for SanitizedNodeConfig { @@ -104,6 +108,8 @@ impl From for SanitizedNodeConfig { p2p: value.p2p, features: value.features, preferences: value.preferences, + os: value.os, + hardware_model: value.hardware_model, } } } @@ -136,12 +142,11 @@ pub(crate) fn mount() -> Arc { }) .procedure("nodeState", { R.query(|node, _: ()| async move { - let device_model = get_hardware_model_name() - .unwrap_or(HardwareModel::Other) - .to_string(); + let config = SanitizedNodeConfig::from(node.config.get().await); Ok(NodeState { - config: node.config.get().await.into(), + device_model: Some(config.hardware_model.to_string()), + config, // We are taking the assumption here that this value is only used on the frontend for display purposes data_path: node .config @@ -149,7 +154,6 @@ pub(crate) fn mount() -> Arc { .to_str() .expect("Found non-UTF-8 path") .to_string(), - device_model: Some(device_model), is_in_docker: is_in_docker(), }) }) diff --git a/core/src/api/nodes.rs b/core/src/api/nodes.rs index 09fb102fb..4ade1b8a8 100644 --- a/core/src/api/nodes.rs +++ b/core/src/api/nodes.rs @@ -5,9 +5,10 @@ use crate::{ node::config::{P2PDiscoveryState, Port}, }; -use sd_prisma::prisma::{instance, location}; +use sd_prisma::prisma::location; use rspc::{alpha::AlphaRouter, ErrorCode}; +use sd_utils::uuid_to_bytes; use serde::Deserialize; use specta::Type; use tracing::error; @@ -88,27 +89,16 @@ pub(crate) fn mount() -> AlphaRouter { .procedure("listLocations", { R.with2(library()) // TODO: I don't like this. `node_id` should probs be a machine hash or something cause `node_id` is dynamic in the context of P2P and what does it mean for removable media to be owned by a node? - .query(|(_, library), node_id: Option| async move { - // Be aware multiple instances can exist on a single node. This is generally an edge case but it's possible. - let instances = library - .db - .instance() - .find_many(vec![node_id - .map(|id| instance::node_id::equals(id.as_bytes().to_vec())) - .unwrap_or(instance::id::equals( - library.config().await.instance_id, - ))]) - .exec() - .await?; - + .query(|(_, library), device_pub_id: Option| async move { Ok(library .db .location() .find_many( - instances - .into_iter() - .map(|i| location::instance_id::equals(Some(i.id))) - .collect(), + device_pub_id + .map(|id| { + vec![location::device_pub_id::equals(Some(uuid_to_bytes(&id)))] + }) + .unwrap_or_default(), ) .exec() .await? diff --git a/core/src/api/search/saved.rs b/core/src/api/search/saved.rs index e2e797765..2cedd712a 100644 --- a/core/src/api/search/saved.rs +++ b/core/src/api/search/saved.rs @@ -66,7 +66,7 @@ pub(crate) fn mount() -> AlphaRouter { |(_, library), args: Args| async move { let Library { db, sync, .. } = library.as_ref(); - let pub_id = Uuid::new_v4().as_bytes().to_vec(); + let pub_id = Uuid::now_v7().as_bytes().to_vec(); let date_created: DateTime = Utc::now().into(); let (sync_params, db_params): (Vec<_>, Vec<_>) = chain_optional_iter( diff --git a/core/src/api/sync.rs b/core/src/api/sync.rs index 00db6277b..42725d0dd 100644 --- a/core/src/api/sync.rs +++ b/core/src/api/sync.rs @@ -46,12 +46,7 @@ pub(crate) fn mount() -> AlphaRouter { return Ok(()); } - sd_core_sync::backfill::backfill_operations( - &library.db, - &library.sync, - library.config().await.instance_id, - ) - .await?; + sd_core_sync::backfill::backfill_operations(&library.db, &library.sync).await?; node.libraries .edit( diff --git a/core/src/api/tags.rs b/core/src/api/tags.rs index b951368f2..98b06ef4c 100644 --- a/core/src/api/tags.rs +++ b/core/src/api/tags.rs @@ -221,7 +221,7 @@ pub(crate) fn mount() -> AlphaRouter { .iter() .filter(|fp| fp.is_dir.unwrap_or_default() && fp.object.is_none()) .map(|fp| { - let id = uuid_to_bytes(&Uuid::new_v4()); + let id = uuid_to_bytes(&Uuid::now_v7()); sync_params.extend(sync.shared_create( prisma_sync::object::SyncId { pub_id: id.clone() }, diff --git a/core/src/cloud/sync/ingest.rs b/core/src/cloud/sync/ingest.rs index dc44ee9e6..78e5c9ae2 100644 --- a/core/src/cloud/sync/ingest.rs +++ b/core/src/cloud/sync/ingest.rs @@ -90,7 +90,7 @@ pub async fn run_actor( sync.ingest .event_tx .send(sd_core_sync::Event::Messages(MessagesEvent { - instance_id: sync.instance, + device_pub_id: sync.device_pub_id.clone(), has_more: ops.len() == OPS_PER_REQUEST as usize, messages: CompressedCRDTOperationsPerModelPerDevice::new(ops), wait_tx: Some(wait_tx) diff --git a/core/src/cloud/sync/receive.rs b/core/src/cloud/sync/receive.rs index fd2bc7d8f..3a8925a37 100644 --- a/core/src/cloud/sync/receive.rs +++ b/core/src/cloud/sync/receive.rs @@ -2,8 +2,9 @@ use crate::{library::Libraries, Node}; use futures::FutureExt; use sd_actors::Stopper; +use sd_core_sync::DevicePubId; use sd_p2p::RemoteIdentity; -use sd_prisma::prisma::{cloud_crdt_operation, instance, PrismaClient}; +use sd_prisma::prisma::{cloud_crdt_operation, device, instance, PrismaClient}; use sd_sync::CRDTOperation; use sd_utils::uuid_to_bytes; @@ -239,7 +240,7 @@ async fn write_cloud_ops_to_db( fn crdt_op_db(op: &CRDTOperation) -> cloud_crdt_operation::Create { cloud_crdt_operation::Create { timestamp: op.timestamp.0 as i64, - instance: instance::pub_id::equals(op.device_pub_id.as_bytes().to_vec()), + device: device::pub_id::equals(uuid_to_bytes(&op.device_pub_id)), kind: op.data.as_kind().to_string(), data: to_vec(&op.data).expect("unable to serialize data"), model: op.model_id as i32, @@ -247,50 +248,3 @@ fn crdt_op_db(op: &CRDTOperation) -> cloud_crdt_operation::Create { _params: vec![], } } - -#[allow(clippy::too_many_arguments)] -pub async fn upsert_instance( - library_id: Uuid, - db: &PrismaClient, - sync: &sd_core_sync::Manager, - libraries: &Libraries, - uuid: &Uuid, - identity: RemoteIdentity, - node_id: &Uuid, - node_remote_identity: RemoteIdentity, - metadata: HashMap, -) -> prisma_client_rust::Result<()> { - db.instance() - .upsert( - instance::pub_id::equals(uuid_to_bytes(uuid)), - instance::create( - uuid_to_bytes(uuid), - identity.get_bytes().to_vec(), - node_id.as_bytes().to_vec(), - Utc::now().into(), - Utc::now().into(), - vec![ - instance::node_remote_identity::set(Some( - node_remote_identity.get_bytes().to_vec(), - )), - instance::metadata::set(Some( - serde_json::to_vec(&metadata).expect("unable to serialize metadata"), - )), - ], - ), - vec![], - ) - .exec() - .await?; - - sync.timestamp_per_device - .write() - .await - .entry(*uuid) - .or_default(); - - // Called again so the new instances are picked up - libraries.update_instances_by_id(library_id).await; - - Ok(()) -} diff --git a/core/src/library/config.rs b/core/src/library/config.rs index 863f744f9..c8cb9db7d 100644 --- a/core/src/library/config.rs +++ b/core/src/library/config.rs @@ -4,7 +4,7 @@ use crate::{ }; use sd_p2p::{Identity, RemoteIdentity}; -use sd_prisma::prisma::{file_path, indexer_rule, instance, location, node, PrismaClient}; +use sd_prisma::prisma::{file_path, indexer_rule, instance, location, PrismaClient}; use sd_utils::{db::maybe_missing, error::FileIOError}; use std::{ @@ -12,7 +12,6 @@ use std::{ sync::{atomic::AtomicBool, Arc}, }; -use chrono::Utc; use int_enum::IntEnum; use prisma_client_rust::not; use serde::{Deserialize, Serialize}; @@ -167,34 +166,8 @@ impl LibraryConfig { } (LibraryConfigVersion::V2, LibraryConfigVersion::V3) => { - // The fact I have to migrate this hurts my soul - if db.node().count(vec![]).exec().await? != 1 { - return Err(LibraryConfigError::TooManyNodes); - } - - db.node() - .update_many( - vec![], - vec![node::pub_id::set(node_config.id.as_bytes().to_vec())], - ) - .exec() - .await?; - - let mut config = serde_json::from_slice::>( - &fs::read(path).await.map_err(|e| { - VersionManagerError::FileIO(FileIOError::from((path, e))) - })?, - ) - .map_err(VersionManagerError::SerdeJson)?; - - config.insert(String::from("node_id"), json!(node_config.id.to_string())); - - fs::write( - path, - &serde_json::to_vec(&config).map_err(VersionManagerError::SerdeJson)?, - ) - .await - .map_err(|e| VersionManagerError::FileIO(FileIOError::from((path, e))))?; + // Removed, can't be automatically updated + return Err(LibraryConfigError::CriticalUpdateError); } (LibraryConfigVersion::V3, LibraryConfigVersion::V4) => { @@ -255,51 +228,8 @@ impl LibraryConfig { }, (LibraryConfigVersion::V5, LibraryConfigVersion::V6) => { - let nodes = db.node().find_many(vec![]).exec().await?; - if nodes.is_empty() { - error!("6 - No nodes found... How did you even get this far? but this is fine we can fix it."); - } else if nodes.len() > 1 { - error!("6 - More than one node found in the DB... This can't be automatically reconciled!"); - return Err(LibraryConfigError::TooManyNodes); - } - - let node = nodes.first(); - let now = Utc::now().fixed_offset(); - let instance_id = Uuid::new_v4(); - - instance::Create { - pub_id: instance_id.as_bytes().to_vec(), - // WARNING: At this stage in the migration this field *should* be an `Identity` not a `RemoteIdentityOrIdentity` (as that was introduced later on). - remote_identity: node - .and_then(|n| n.identity.clone()) - .unwrap_or_else(|| Identity::new().to_bytes()), - node_id: node_config.id.as_bytes().to_vec(), - last_seen: now, - date_created: node.map(|n| n.date_created).unwrap_or_else(|| now), - _params: vec![], - } - .to_query(db) - .exec() - .await?; - - let mut config = serde_json::from_slice::>( - &fs::read(path).await.map_err(|e| { - VersionManagerError::FileIO(FileIOError::from((path, e))) - })?, - ) - .map_err(VersionManagerError::SerdeJson)?; - - config.remove("node_id"); - config.remove("identity"); - - config.insert(String::from("instance_id"), json!(instance_id.to_string())); - - fs::write( - path, - &serde_json::to_vec(&config).map_err(VersionManagerError::SerdeJson)?, - ) - .await - .map_err(|e| VersionManagerError::FileIO(FileIOError::from((path, e))))?; + // Removed, can't be automatically updated + return Err(LibraryConfigError::CriticalUpdateError); } (LibraryConfigVersion::V6, LibraryConfigVersion::V7) => { @@ -344,7 +274,7 @@ impl LibraryConfig { } (LibraryConfigVersion::V7, LibraryConfigVersion::V8) => { - let instances = db.instance().find_many(vec![]).exec().await?; + let instances = db.device().find_many(vec![]).exec().await?; let Some(instance) = instances.first() else { error!("8 - No nodes found... How did you even get this far?!"); return Err(LibraryConfigError::MissingInstance); @@ -498,6 +428,8 @@ pub enum LibraryConfigError { TooManyInstances, #[error("missing instances")] MissingInstance, + #[error("your library version can't be automatically updated, please recreate your library")] + CriticalUpdateError, #[error(transparent)] SerdeJson(#[from] serde_json::Error), diff --git a/core/src/library/manager/error.rs b/core/src/library/manager/error.rs index 3541eabfd..5a12ff221 100644 --- a/core/src/library/manager/error.rs +++ b/core/src/library/manager/error.rs @@ -1,6 +1,7 @@ use crate::{library::LibraryConfigError, location::LocationManagerError}; use sd_core_indexer_rules::seed::SeederError; +use sd_core_sync::DevicePubId; use sd_p2p::IdentityErr; use sd_utils::{ @@ -8,10 +9,9 @@ use sd_utils::{ error::{FileIOError, NonUtf8PathError}, }; -use thiserror::Error; use tracing::error; -#[derive(Error, Debug)] +#[derive(thiserror::Error, Debug)] pub enum LibraryManagerError { #[error("error serializing or deserializing the JSON in the config file: {0}")] Json(#[from] serde_json::Error), @@ -23,8 +23,6 @@ pub enum LibraryManagerError { Uuid(#[from] uuid::Error), #[error("failed to run indexer rules seeder: {0}")] IndexerRulesSeeder(#[from] SeederError), - // #[error("failed to initialize the key manager: {0}")] - // KeyManager(#[from] sd_crypto::Error), #[error("error migrating the library: {0}")] MigrationError(#[from] db::MigrationError), #[error("invalid library configuration: {0}")] @@ -39,6 +37,8 @@ pub enum LibraryManagerError { InvalidIdentity, #[error("current instance with id '{0}' was not found in the database")] CurrentInstanceNotFound(String), + #[error("current device with pub id '{0}' was not found in the database")] + CurrentDeviceNotFound(DevicePubId), #[error("missing-field: {0}")] MissingField(#[from] MissingFieldError), diff --git a/core/src/library/manager/mod.rs b/core/src/library/manager/mod.rs index b4dddb3db..476926e78 100644 --- a/core/src/library/manager/mod.rs +++ b/core/src/library/manager/mod.rs @@ -156,7 +156,7 @@ impl Libraries { description: Option, node: &Arc, ) -> Result, LibraryManagerError> { - self.create_with_uuid(Uuid::new_v4(), name, description, true, None, node, false) + self.create_with_uuid(Uuid::now_v7(), name, description, true, None, node, false) .await } @@ -206,9 +206,9 @@ impl Libraries { Some({ let identity = Identity::new(); let mut create = instance.unwrap_or_else(|| instance::Create { - pub_id: Uuid::new_v4().as_bytes().to_vec(), + pub_id: Uuid::now_v7().as_bytes().to_vec(), remote_identity: identity.to_remote_identity().get_bytes().to_vec(), - node_id: node_cfg.id.as_bytes().to_vec(), + node_id: node_cfg.id.to_db(), last_seen: now, date_created: now, _params: vec![ @@ -458,6 +458,7 @@ impl Libraries { } let node_config = node.config.get().await; + let device_pub_id = node_config.id.clone(); let config = LibraryConfig::load(config_path, &node_config, &db).await?; let instances = db.instance().find_many(vec![]).exec().await?; @@ -470,6 +471,17 @@ impl Libraries { })? .clone(); + let devices = db.device().find_many(vec![]).exec().await?; + + let device_pub_id_to_db = device_pub_id.to_db(); + if devices + .iter() + .find(|device| device.pub_id == device_pub_id_to_db) + .is_none() + { + return Err(LibraryManagerError::CurrentDeviceNotFound(device_pub_id)); + } + let identity = match instance.identity.as_ref() { Some(b) => Arc::new(Identity::from_bytes(b)?), // We are not this instance, so we don't have the private key. @@ -486,7 +498,7 @@ impl Libraries { .node_remote_identity .as_ref() .and_then(|v| RemoteIdentity::from_bytes(v).ok()); - if instance_node_id != node_config.id + if instance_node_id != Uuid::from(&node_config.id) || instance_node_remote_identity != Some(node_config.identity.to_remote_identity()) || curr_metadata != Some(node.p2p.peer_metadata()) { @@ -502,7 +514,7 @@ impl Libraries { .update( instance::id::equals(instance.id), vec![ - instance::node_id::set(node_config.id.as_bytes().to_vec()), + instance::node_id::set(node_config.id.to_db()), instance::node_remote_identity::set(Some( node_config .identity @@ -522,16 +534,13 @@ impl Libraries { // TODO: Move this reconciliation into P2P and do reconciliation of both local and remote nodes. - // let key_manager = Arc::new(KeyManager::new(vec![]).await?); - // seed_keymanager(&db, &key_manager).await?; - let actors = Default::default(); - let (sync, sync_rx) = sync::Manager::with_existing_instances( + let (sync, sync_rx) = sync::Manager::with_existing_devices( Arc::clone(&db), - instance_id, + &device_pub_id, Arc::clone(&config.generate_sync_operations), - &instances, + &devices, Arc::clone(&actors), ) .await?; diff --git a/core/src/location/manager/runner.rs b/core/src/location/manager/runner.rs index 1daa383ce..735d4b6f2 100644 --- a/core/src/location/manager/runner.rs +++ b/core/src/location/manager/runner.rs @@ -3,7 +3,7 @@ use crate::{ Node, }; -use sd_core_prisma_helpers::location_ids_and_path; +use sd_core_prisma_helpers::{location_ids_and_path, DevicePubId}; use sd_prisma::prisma::location; use sd_utils::db::maybe_missing; @@ -38,14 +38,16 @@ type LocationIdAndLibraryId = (location::id::Type, LibraryId); struct Runner { node: Arc, + device_pub_id_to_db: Option>, locations_to_check: HashMap>, locations_watched: HashMap, locations_unwatched: HashMap, forced_unwatch: HashSet, } impl Runner { - fn new(node: Arc) -> Self { + async fn new(node: Arc) -> Self { Self { + device_pub_id_to_db: Some(DevicePubId::from(node.config.get().await.id).to_db()), node, locations_to_check: HashMap::new(), locations_watched: HashMap::new(), @@ -54,13 +56,17 @@ impl Runner { } } + fn check_same_device(&self, location: &location_ids_and_path::Data) -> bool { + location.device_pub_id == self.device_pub_id_to_db + } + async fn add_location( &mut self, location_id: i32, library: Arc, ) -> Result<(), LocationManagerError> { if let Some(location) = get_location(location_id, &library).await? { - check_online(&location, &self.node, &library) + check_online(&location, &self.node, &library, &self.device_pub_id_to_db) .await .and_then(|is_online| { LocationWatcher::new(location, Arc::clone(&library), Arc::clone(&self.node)) @@ -92,8 +98,7 @@ impl Runner { let key = (location_id, library.id); if let Some(location) = get_location(location_id, &library).await? { - // TODO(N): This isn't gonna work with removable media and this will likely permanently break if the DB is restored from a backup. - if location.instance_id == Some(library.config().await.instance_id) { + if self.check_same_device(&location) { self.unwatch_location(location, library.id); self.locations_unwatched.remove(&key); self.forced_unwatch.remove(&key); @@ -101,7 +106,7 @@ impl Runner { self.drop_location( location_id, library.id, - "Dropping location from location manager, because we don't have a `local_path` anymore", + "Dropping location from location manager, because it isn't from this device", ); } } else { @@ -298,9 +303,8 @@ impl Runner { let key = (location_id, library.id); if let Some(location) = get_location(location_id, &library).await? { - // TODO(N): This isn't gonna work with removable media and this will likely permanently break if the DB is restored from a backup. - if location.instance_id == Some(library.config().await.instance_id) { - if check_online(&location, &self.node, &library).await? + if self.check_same_device(&location) { + if check_online(&location, &self.node, &library, &self.device_pub_id_to_db).await? && !self.forced_unwatch.contains(&key) { self.watch_location(location, library.id); @@ -314,7 +318,7 @@ impl Runner { location_id, library.id, "Dropping location from location manager, because \ - it isn't a location in the current node", + it isn't a location in the current device", ); self.forced_unwatch.remove(&key); } @@ -344,7 +348,7 @@ pub(super) async fn run( let mut check_locations_interval = interval(Duration::from_secs(2)); check_locations_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); - let mut runner = Runner::new(node); + let mut runner = Runner::new(node).await; let mut msg_stream = pin!(( location_management_rx.map(StreamMessage::LocationManagementMessage), @@ -410,20 +414,20 @@ async fn get_location( fields(%location_id, library_id = %library.id), err, )] -pub(super) async fn check_online( +async fn check_online( location_ids_and_path::Data { id: location_id, pub_id, - instance_id, + device_pub_id, path, }: &location_ids_and_path::Data, node: &Node, library: &Library, + device_pub_id_to_db: &Option>, ) -> Result { let pub_id = Uuid::from_slice(pub_id)?; - // TODO(N): This isn't gonna work with removable media and this will likely permanently break if the DB is restored from a backup. - if *instance_id == Some(library.config().await.instance_id) { + if *device_pub_id == *device_pub_id_to_db { match fs::metadata(maybe_missing(path, "location.path")?).await { Ok(_) => { node.locations.add_online(pub_id).await; diff --git a/core/src/location/mod.rs b/core/src/location/mod.rs index d613aca4e..0e378dc7c 100644 --- a/core/src/location/mod.rs +++ b/core/src/location/mod.rs @@ -163,7 +163,7 @@ impl LocationCreateArgs { } ); - let uuid = Uuid::new_v4(); + let uuid = Uuid::now_v7(); let location = create_location( library, @@ -246,7 +246,7 @@ impl LocationCreateArgs { }, ); - let uuid = Uuid::new_v4(); + let uuid = Uuid::now_v7(); let location = create_location( library, @@ -1160,7 +1160,7 @@ pub async fn create_file_path( .unzip() }; - let pub_id = sd_utils::uuid_to_bytes(&Uuid::new_v4()); + let pub_id = sd_utils::uuid_to_bytes(&Uuid::now_v7()); let created_path = sync .write_ops( diff --git a/core/src/node/config.rs b/core/src/node/config.rs index 528938770..6f25a1f62 100644 --- a/core/src/node/config.rs +++ b/core/src/node/config.rs @@ -5,6 +5,7 @@ use crate::{ }; use sd_cloud_schema::devices::DeviceOS; +use sd_core_sync::DevicePubId; use sd_p2p::Identity; use sd_utils::error::FileIOError; @@ -27,6 +28,8 @@ use tokio::{ use tracing::error; use uuid::Uuid; +use super::HardwareModel; + /// NODE_STATE_CONFIG_NAME is the name of the file which stores the NodeState pub const NODE_STATE_CONFIG_NAME: &str = "node_state.sdconfig"; @@ -117,7 +120,7 @@ impl Default for NodeConfigP2P { #[derive(Debug, Clone, Serialize, Deserialize)] // If you are adding `specta::Type` on this your probably about to leak the P2P private key pub struct NodeConfig { /// id is a unique identifier for the current node. Each node has a public identifier (this one) and is given a local id for each library (done within the library code). - pub id: Uuid, + pub id: DevicePubId, /// name is the display name of the current node. This is set by the user and is shown in the UI. // TODO: Length validation so it can fit in DNS record pub name: String, /// core level notifications @@ -137,8 +140,10 @@ pub struct NodeConfig { pub features: Vec, /// The aggregation of many different preferences for the node pub preferences: NodePreferences, - // Operating System of the node + /// Operating System of the node pub os: DeviceOS, + /// Hardware model of the node + pub hardware_model: HardwareModel, version: NodeConfigVersion, } @@ -196,9 +201,13 @@ impl ManagedVersion for NodeConfig { name.truncate(255); let os = DeviceOS::from_env(); + let hardware_model = HardwareModel::try_get().unwrap_or_else(|e| { + error!(?e, "Failed to get hardware model"); + HardwareModel::Other + }); Some(Self { - id: Uuid::now_v7(), + id: Uuid::now_v7().into(), name, identity: Identity::default(), p2p: NodeConfigP2P::default(), @@ -207,6 +216,7 @@ impl ManagedVersion for NodeConfig { notifications: vec![], preferences: NodePreferences::default(), os, + hardware_model, }) } } @@ -342,6 +352,13 @@ impl NodeConfig { serde_json::to_value(DeviceOS::from_env()) .map_err(VersionManagerError::SerdeJson)?, ); + config.insert( + String::from("hardware_model"), + serde_json::to_value( + HardwareModel::try_get().unwrap_or(HardwareModel::Other), + ) + .map_err(VersionManagerError::SerdeJson)?, + ); config.remove("features"); config.remove("auth_token"); diff --git a/core/src/node/hardware.rs b/core/src/node/hardware.rs index 1af50b530..04873e51c 100644 --- a/core/src/node/hardware.rs +++ b/core/src/node/hardware.rs @@ -1,151 +1,209 @@ -use std::io::Error; -use std::str; +use std::io; use serde::{Deserialize, Serialize}; use specta::Type; +use strum::IntoEnumIterator; use strum_macros::{Display, EnumIter}; #[repr(i32)] #[derive(Debug, Clone, Display, Copy, EnumIter, Type, Serialize, Deserialize, Eq, PartialEq)] -#[specta(rename = "core_HardwareModel")] +#[specta(rename = "CoreHardwareModel")] pub enum HardwareModel { - Other, - MacStudio, - MacBookAir, - MacBookPro, - MacBook, - MacMini, - MacPro, - IMac, - IMacPro, - IPad, - IPhone, - Simulator, - Android, + Other = 0, + MacStudio = 1, + MacBookAir = 2, + MacBookPro = 3, + MacBook = 4, + MacMini = 5, + MacPro = 6, + IMac = 7, + IMacPro = 8, + IPad = 9, + IPhone = 10, + Simulator = 11, + Android = 12, } -impl HardwareModel { - pub fn from_display_name(name: &str) -> Self { - use strum::IntoEnumIterator; - HardwareModel::iter() +impl From for HardwareModel { + fn from(value: i32) -> Self { + match value { + 1 => Self::MacStudio, + 2 => Self::MacBookAir, + 3 => Self::MacBookPro, + 4 => Self::MacBook, + 5 => Self::MacMini, + 6 => Self::MacPro, + 7 => Self::IMac, + 8 => Self::IMacPro, + 9 => Self::IPad, + 10 => Self::IPhone, + 11 => Self::Simulator, + 12 => Self::Android, + _ => Self::Other, + } + } +} + +impl From for sd_cloud_schema::devices::HardwareModel { + fn from(model: HardwareModel) -> Self { + match model { + HardwareModel::MacStudio => Self::MacStudio, + HardwareModel::MacBookAir => Self::MacBookAir, + HardwareModel::MacBookPro => Self::MacBookPro, + HardwareModel::MacBook => Self::MacBook, + HardwareModel::MacMini => Self::MacMini, + HardwareModel::MacPro => Self::MacPro, + HardwareModel::IMac => Self::IMac, + HardwareModel::IMacPro => Self::IMacPro, + HardwareModel::IPad => Self::IPad, + HardwareModel::IPhone => Self::IPhone, + HardwareModel::Simulator => Self::Simulator, + HardwareModel::Android => Self::Android, + HardwareModel::Other => Self::Other, + } + } +} + +impl From for HardwareModel { + fn from(model: sd_cloud_schema::devices::HardwareModel) -> Self { + match model { + sd_cloud_schema::devices::HardwareModel::MacStudio => Self::MacStudio, + sd_cloud_schema::devices::HardwareModel::MacBookAir => Self::MacBookAir, + sd_cloud_schema::devices::HardwareModel::MacBookPro => Self::MacBookPro, + sd_cloud_schema::devices::HardwareModel::MacBook => Self::MacBook, + sd_cloud_schema::devices::HardwareModel::MacMini => Self::MacMini, + sd_cloud_schema::devices::HardwareModel::MacPro => Self::MacPro, + sd_cloud_schema::devices::HardwareModel::IMac => Self::IMac, + sd_cloud_schema::devices::HardwareModel::IMacPro => Self::IMacPro, + sd_cloud_schema::devices::HardwareModel::IPad => Self::IPad, + sd_cloud_schema::devices::HardwareModel::IPhone => Self::IPhone, + sd_cloud_schema::devices::HardwareModel::Simulator => Self::Simulator, + sd_cloud_schema::devices::HardwareModel::Android => Self::Android, + sd_cloud_schema::devices::HardwareModel::Other => Self::Other, + } + } +} + +impl From<&str> for HardwareModel { + fn from(name: &str) -> Self { + Self::iter() .find(|&model| { model.to_string().to_lowercase().replace(' ', "") == name.to_lowercase().replace(' ', "") }) - .unwrap_or(HardwareModel::Other) + .unwrap_or(Self::Other) } } -pub fn get_hardware_model_name() -> Result { - #[cfg(target_os = "macos")] - { - use std::process::Command; +impl HardwareModel { + pub fn try_get() -> Result { + #[cfg(target_os = "macos")] + { + use std::process::Command; - let output = Command::new("system_profiler") - .arg("SPHardwareDataType") - .output()?; + let output = Command::new("system_profiler") + .arg("SPHardwareDataType") + .output()?; - if output.status.success() { - let output_str = std::str::from_utf8(&output.stdout).unwrap_or_default(); - let hardware_model = output_str - .lines() - .find(|line| line.to_lowercase().contains("model name")) - .and_then(|line| line.split_once(':')) - .map(|(_, model_name)| HardwareModel::from_display_name(model_name.trim())) - .unwrap_or(HardwareModel::Other); + if output.status.success() { + let output_str = std::str::from_utf8(&output.stdout).unwrap_or_default(); + let hardware_model = output_str + .lines() + .find(|line| line.to_lowercase().contains("model name")) + .and_then(|line| line.split_once(':')) + .map(|(_, model_name)| model_name.trim().into()) + .unwrap_or(Self::Other); - Ok(hardware_model) - } else { - Err(Error::new( - std::io::ErrorKind::Other, - format!( - "Failed to get hardware model name: {}", - String::from_utf8_lossy(&output.stderr) - ), - )) - } - } - #[cfg(target_os = "ios")] - { - use std::ffi::CString; - use std::ptr; - - extern "C" { - fn sysctlbyname( - name: *const libc::c_char, - oldp: *mut libc::c_void, - oldlenp: *mut usize, - newp: *mut libc::c_void, - newlen: usize, - ) -> libc::c_int; - } - - fn get_device_type() -> Option { - let mut size: usize = 0; - let name = CString::new("hw.machine").expect("CString::new failed"); - - // First, get the size of the buffer needed - unsafe { - sysctlbyname( - name.as_ptr(), - ptr::null_mut(), - &mut size, - ptr::null_mut(), - 0, - ); - } - - // Allocate a buffer with the correct size - let mut buffer: Vec = vec![0; size]; - - // Get the actual machine type - unsafe { - sysctlbyname( - name.as_ptr(), - buffer.as_mut_ptr() as *mut libc::c_void, - &mut size, - ptr::null_mut(), - 0, - ); - } - - // Convert the buffer to a String - let machine_type = String::from_utf8_lossy(&buffer).trim().to_string(); - - // Check if the device is an iPad or iPhone - if machine_type.starts_with("iPad") { - Some("iPad".to_string()) - } else if machine_type.starts_with("iPhone") { - Some("iPhone".to_string()) - } else if machine_type.starts_with("arm") { - Some("Simulator".to_string()) + Ok(hardware_model) } else { - None + Err(io::Error::new( + io::ErrorKind::Other, + format!( + "Failed to get hardware model name: {}", + String::from_utf8_lossy(&output.stderr) + ), + )) + } + } + #[cfg(target_os = "ios")] + { + use std::ffi::CString; + use std::ptr; + + extern "C" { + fn sysctlbyname( + name: *const libc::c_char, + oldp: *mut libc::c_void, + oldlenp: *mut usize, + newp: *mut libc::c_void, + newlen: usize, + ) -> libc::c_int; + } + + fn get_device_type() -> Option { + let mut size: usize = 0; + let name = CString::new("hw.machine").expect("CString::new failed"); + + // First, get the size of the buffer needed + unsafe { + sysctlbyname( + name.as_ptr(), + ptr::null_mut(), + &mut size, + ptr::null_mut(), + 0, + ); + } + + // Allocate a buffer with the correct size + let mut buffer: Vec = vec![0; size]; + + // Get the actual machine type + unsafe { + sysctlbyname( + name.as_ptr(), + buffer.as_mut_ptr() as *mut libc::c_void, + &mut size, + ptr::null_mut(), + 0, + ); + } + + // Convert the buffer to a String + let machine_type = String::from_utf8_lossy(&buffer).trim().to_string(); + + // Check if the device is an iPad or iPhone + if machine_type.starts_with("iPad") { + Some("iPad".to_string()) + } else if machine_type.starts_with("iPhone") { + Some("iPhone".to_string()) + } else if machine_type.starts_with("arm") { + Some("Simulator".to_string()) + } else { + None + } + } + + if let Some(device_type) = get_device_type() { + let hardware_model = Self::from_display_name(&device_type.as_str()); + + Ok(hardware_model) + } else { + Err(Error::new( + std::io::ErrorKind::Other, + "Failed to get hardware model name", + )) } } - if let Some(device_type) = get_device_type() { - let hardware_model = HardwareModel::from_display_name(&device_type.as_str()); + #[cfg(target_os = "android")] + { + Ok(Self::Android) + } - Ok(hardware_model) - } else { - Err(Error::new( - std::io::ErrorKind::Other, - "Failed to get hardware model name", - )) + #[cfg(not(any(target_os = "macos", target_os = "ios", target_os = "android")))] + { + Ok(Self::Other) } } - - #[cfg(target_os = "android")] - { - Ok(HardwareModel::Android) - } - - #[cfg(not(any(target_os = "macos", target_os = "ios", target_os = "android")))] - { - Err(Error::new( - std::io::ErrorKind::Unsupported, - "Unsupported operating system", - )) - } } diff --git a/core/src/object/tag/mod.rs b/core/src/object/tag/mod.rs index 41b4e88bd..a8d232cf6 100644 --- a/core/src/object/tag/mod.rs +++ b/core/src/object/tag/mod.rs @@ -21,7 +21,7 @@ impl TagCreateArgs { self, Library { db, sync, .. }: &Library, ) -> Result { - let pub_id = Uuid::new_v4().as_bytes().to_vec(); + let pub_id = Uuid::now_v7().as_bytes().to_vec(); let (sync_params, db_params): (Vec<_>, Vec<_>) = [ sync_db_entry!(self.name, tag::name), diff --git a/core/src/p2p/manager.rs b/core/src/p2p/manager.rs index b986e5f3b..96ee7a264 100644 --- a/core/src/p2p/manager.rs +++ b/core/src/p2p/manager.rs @@ -1,7 +1,7 @@ use crate::{ node::{ config::{self, P2PDiscoveryState}, - get_hardware_model_name, HardwareModel, + HardwareModel, }, p2p::{ libraries::libraries_hook, operations, sync::SyncMessage, Header, OperatingSystem, @@ -208,7 +208,7 @@ impl P2PManager { PeerMetadata { name: config.name.clone(), operating_system: Some(OperatingSystem::get_os()), - device_model: Some(get_hardware_model_name().unwrap_or(HardwareModel::Other)), + device_model: Some(HardwareModel::try_get().unwrap_or(HardwareModel::Other)), version: Some(env!("CARGO_PKG_VERSION").to_string()), } .update(&mut self.p2p.metadata_mut()); diff --git a/core/src/p2p/metadata.rs b/core/src/p2p/metadata.rs index 5e03e9c7d..054eea0ca 100644 --- a/core/src/p2p/metadata.rs +++ b/core/src/p2p/metadata.rs @@ -47,7 +47,7 @@ impl PeerMetadata { .get("os") .map(|os| os.parse().map_err(|_| "Unable to parse 'OperationSystem'!")) .transpose()?, - device_model: Some(HardwareModel::from_display_name( + device_model: Some(HardwareModel::from( data.get("device_model") .map(|s| s.as_str()) .unwrap_or("Other"), diff --git a/core/src/p2p/sync/mod.rs b/core/src/p2p/sync/mod.rs index 8233a01c5..ca2ea73a5 100644 --- a/core/src/p2p/sync/mod.rs +++ b/core/src/p2p/sync/mod.rs @@ -245,7 +245,7 @@ mod responder { ingest .event_tx .send(Event::Messages(MessagesEvent { - instance_id: library.sync.instance, + device_pub_id: library.sync.device_pub_id.clone(), has_more: ops.len() == OPS_PER_REQUEST as usize, messages: ops, wait_tx: Some(wait_tx), diff --git a/core/src/volume/mod.rs b/core/src/volume/mod.rs index ada4d4ae3..d746d4d1e 100644 --- a/core/src/volume/mod.rs +++ b/core/src/volume/mod.rs @@ -515,16 +515,15 @@ fn compute_stats<'v>(volumes: impl IntoIterator) -> (u64, u64 async fn update_storage_statistics( db: &PrismaClient, sync: &SyncManager, - instance_pub_id: &Uuid, total_capacity: u64, available_capacity: u64, ) -> Result<(), VolumeError> { - let instance_pub_id = uuid_to_bytes(instance_pub_id); + let device_pub_id = sync.device_pub_id.to_db(); let storage_statistics_pub_id = db .storage_statistics() - .find_unique(storage_statistics::instance_pub_id::equals( - instance_pub_id.clone(), + .find_unique(storage_statistics::device_pub_id::equals( + device_pub_id.clone(), )) .select(storage_statistics::select!({ pub_id })) .exec() @@ -571,7 +570,7 @@ async fn update_storage_statistics( ) .await?; } else { - let new_storage_statistics_id = uuid_to_bytes(&Uuid::new_v4()); + let new_storage_statistics_id = uuid_to_bytes(&Uuid::now_v7()); sync.write_ops( db, @@ -590,8 +589,8 @@ async fn update_storage_statistics( msgpack!(available_capacity), ), ( - storage_statistics::instance_pub_id::NAME, - msgpack!(instance_pub_id), + storage_statistics::device_pub_id::NAME, + msgpack!(device_pub_id), ), ], ), @@ -601,7 +600,7 @@ async fn update_storage_statistics( vec![ storage_statistics::total_capacity::set(total_capacity as i64), storage_statistics::available_capacity::set(available_capacity as i64), - storage_statistics::instance_pub_id::set(Some(instance_pub_id.clone())), + storage_statistics::device_pub_id::set(Some(device_pub_id.clone())), ], ) // We don't need any data here, just the id avoids receiving the entire object @@ -633,14 +632,7 @@ pub fn save_storage_statistics(node: &Node) { .. } = &*library; - update_storage_statistics( - db, - sync, - instance_uuid, - total_capacity, - available_capacity, - ) - .await + update_storage_statistics(db, sync, total_capacity, available_capacity).await }) .collect::>() .join() diff --git a/core/src/volume/watcher.rs b/core/src/volume/watcher.rs index 4c71bade3..efc0556a1 100644 --- a/core/src/volume/watcher.rs +++ b/core/src/volume/watcher.rs @@ -29,7 +29,6 @@ pub fn spawn_volume_watcher(library: Arc) { if let Err(e) = super::update_storage_statistics( &library.db, &library.sync, - &library.instance_uuid, total_capacity, available_capacity, ) diff --git a/crates/sync-generator/src/sync_data.rs b/crates/sync-generator/src/sync_data.rs index 66556e752..9e9fdd937 100644 --- a/crates/sync-generator/src/sync_data.rs +++ b/crates/sync-generator/src/sync_data.rs @@ -61,7 +61,7 @@ pub fn enumerate(models: &[ModelWithSyncType<'_>]) -> TokenStream { impl ModelSyncData { pub fn from_op(op: sd_sync::CRDTOperation) -> Option { - Some(match op.model { + Some(match op.model_id { #(#matches),*, _ => return None }) diff --git a/crates/sync/src/factory.rs b/crates/sync/src/factory.rs index 8973ff14d..b029dd8aa 100644 --- a/crates/sync/src/factory.rs +++ b/crates/sync/src/factory.rs @@ -26,11 +26,9 @@ pub trait OperationFactory { id: &SId, data: CRDTOperationData, ) -> CRDTOperation { - let timestamp = self.get_clock().new_timestamp(); - CRDTOperation { device_pub_id: self.get_device_pub_id(), - timestamp: *timestamp.get_time(), + timestamp: *self.get_clock().new_timestamp().get_time(), model_id: ::MODEL_ID, record_id: msgpack!(id), data, diff --git a/packages/client/src/core.ts b/packages/client/src/core.ts index 00b339a1a..a3aa89a56 100644 --- a/packages/client/src/core.ts +++ b/packages/client/src/core.ts @@ -52,7 +52,6 @@ export type Procedures = { { key: "search.saved.get", input: LibraryArgs, result: SavedSearch | null } | { key: "search.saved.list", input: LibraryArgs, result: SavedSearch[] } | { key: "sync.enabled", input: LibraryArgs, result: boolean } | - { key: "sync.messages", input: LibraryArgs, result: CRDTOperation[] } | { key: "tags.get", input: LibraryArgs, result: Tag | null } | { key: "tags.getForObject", input: LibraryArgs, result: Tag[] } | { key: "tags.getWithObjects", input: LibraryArgs, result: { [key in number]: ({ object: { id: number }; date_created: string | null })[] } } | @@ -66,14 +65,15 @@ export type Procedures = { { key: "cloud.bootstrap", input: [AccessToken, RefreshToken], result: null } | { key: "cloud.devices.delete", input: DeviceDeleteRequest, result: null } | { key: "cloud.devices.update", input: DeviceUpdateRequest, result: null } | - { key: "cloud.libraries.create", input: LibraryArgs, result: null } | - { key: "cloud.libraries.delete", input: LibraryDeleteRequest, result: null } | - { key: "cloud.libraries.update", input: LibraryUpdateRequest, result: null } | + { key: "cloud.libraries.create", input: LibraryArgs, result: null } | + { key: "cloud.libraries.delete", input: LibraryArgs, result: null } | + { key: "cloud.libraries.update", input: LibraryArgs, result: null } | { key: "cloud.library.create", input: LibraryArgs, result: null } | { key: "cloud.library.join", input: string, result: null } | { key: "cloud.library.sync", input: LibraryArgs, result: null } | { key: "cloud.locations.create", input: LocationCreateRequest, result: null } | { key: "cloud.locations.delete", input: LocationDeleteRequest, result: null } | + { key: "cloud.userResponse", input: UserResponse, result: null } | { key: "ephemeralFiles.copyFiles", input: LibraryArgs, result: null } | { key: "ephemeralFiles.createFile", input: LibraryArgs, result: string } | { key: "ephemeralFiles.createFolder", input: LibraryArgs, result: string } | @@ -136,6 +136,7 @@ export type Procedures = { { key: "tags.update", input: LibraryArgs, result: null } | { key: "toggleFeatureFlag", input: BackendFeature, result: null }, subscriptions: + { key: "cloud.listenCloudServicesNotifications", input: never, result: NotifyUser } | { key: "invalidation.listen", input: never, result: InvalidateOperationEvent[] } | { key: "jobs.newFilePathIdentified", input: LibraryArgs, result: number[] } | { key: "jobs.newThumbnail", input: LibraryArgs, result: ThumbKey } | @@ -171,10 +172,6 @@ export type Backup = ({ id: string; timestamp: string; library_id: string; libra export type BuildInfo = { version: string; commit: string } -export type CRDTOperation = { instance: string; timestamp: number; model: number; record_id: JsonValue; data: CRDTOperationData } - -export type CRDTOperationData = { c: { [key in string]: JsonValue } } | { u: { field: string; value: JsonValue } } | "d" - export type CameraData = { device_make: string | null; device_model: string | null; color_space: string | null; color_profile: ColorProfile | null; focal_length: number | null; shutter_speed: number | null; flash: Flash | null; orientation: Orientation; lens_make: string | null; lens_model: string | null; bit_depth: number | null; zoom: number | null; iso: number | null; software: string | null; serial_number: string | null; lens_serial_number: string | null; contrast: number | null; saturation: number | null; sharpness: number | null; composite: Composite | null } export type CasId = string @@ -185,6 +182,10 @@ export type Chapter = { id: number; start: [number, number]; end: [number, numbe export type CloudLocation = { pub_id: LocationPubId; name: string; device: Device | null; library: Library | null; created_at: string; updated_at: string } +export type CloudP2PError = "Rejected" | "UnableToConnect" | "TimedOut" + +export type CloudP2PTicket = bigint + export type Codec = { kind: string | null; sub_kind: string | null; tag: string | null; name: string | null; profile: string | null; bit_rate: number; props: Props | null } export type ColorProfile = "Normal" | "Custom" | "HDRNoOriginal" | "HDRWithOriginal" | "OriginalForHDR" | "Panorama" | "PortraitHDR" | "Portrait" @@ -217,6 +218,12 @@ export type ConvertImageArgs = { location_id: number; file_path_id: number; dele export type ConvertibleExtension = "bmp" | "dib" | "ff" | "gif" | "ico" | "jpg" | "jpeg" | "png" | "pnm" | "qoi" | "tga" | "icb" | "vda" | "vst" | "tiff" | "tif" | "hif" | "heif" | "heifs" | "heic" | "heics" | "avif" | "avci" | "avcs" | "svg" | "svgz" | "pdf" | "webp" +export type CoreDevicePubId = CorePubId + +export type CoreHardwareModel = "Other" | "MacStudio" | "MacBookAir" | "MacBookPro" | "MacBook" | "MacMini" | "MacPro" | "IMac" | "IMacPro" | "IPad" | "IPhone" | "Simulator" | "Android" + +export type CorePubId = { Uuid: string } | { Vec: number[] } + export type CreateEphemeralFileArgs = { path: string; context: EphemeralFileCreateContextTypes; name: string | null } export type CreateEphemeralFolderArgs = { path: string; name: string | null } @@ -395,8 +402,14 @@ export type JobName = "Indexer" | "FileIdentifier" | "MediaProcessor" | "Copy" | export type JobProgressEvent = { id: string; library_id: string; task_count: number; completed_task_count: number; phase: string; message: string; info: string; estimated_completion: string } +export type JoinSyncGroupError = "Communication" | "InternalServer" | "Auth" + +export type JoinSyncGroupResponse = { Accepted: { authorizor_device: Device } } | { Failed: CloudP2PError } | "CriticalError" + export type JsonValue = null | boolean | number | string | JsonValue[] | { [key in string]: JsonValue } +export type KeyHash = string + export type KindStatistic = { kind: number; name: string; count: [number, number]; total_bytes: [number, number] } export type KindStatistics = { statistics: { [key in number]: KindStatistic }; total_identified_files: number; total_unidentified_files: number } @@ -405,7 +418,7 @@ export type Label = { id: number; name: string; date_created: string | null; dat export type LabelWithObjects = { id: number; name: string; date_created: string | null; date_modified: string | null; label_objects: { object: { id: number; file_paths: FilePath[] } }[] } -export type LibrariesCreateArgs = { access_token: AccessToken; device_pub_id: DevicePubId } +export type LibrariesUpdateArgs = { access_token: AccessToken; name: string } export type Library = { pub_id: LibraryPubId; name: string; original_device: Device | null; created_at: string; updated_at: string } @@ -440,8 +453,6 @@ export type LibraryConfigVersion = "V0" | "V1" | "V2" | "V3" | "V4" | "V5" | "V6 export type LibraryConfigWrapped = { uuid: string; instance_id: string; instance_public_key: RemoteIdentity; config: LibraryConfig } -export type LibraryDeleteRequest = { access_token: AccessToken; pub_id: LibraryPubId } - export type LibraryGetRequest = { access_token: AccessToken; pub_id: LibraryPubId; with_device: boolean } export type LibraryListRequest = { access_token: AccessToken; with_device: boolean } @@ -452,15 +463,13 @@ export type LibraryPreferences = { location?: { [key in string]: LocationSetting export type LibraryPubId = string -export type LibraryUpdateRequest = { access_token: AccessToken; pub_id: LibraryPubId; name: string } - export type LightScanArgs = { location_id: number; sub_path: string } export type ListenerState = { type: "Listening" } | { type: "Error"; error: string } | { type: "NotListening" } export type Listeners = { ipv4: ListenerState; ipv6: ListenerState; relay: ListenerState } -export type Location = { id: number; pub_id: number[]; name: string | null; path: string | null; total_capacity: number | null; available_capacity: number | null; size_in_bytes: number[] | null; is_archived: boolean | null; generate_preview_media: boolean | null; sync_preview_media: boolean | null; hidden: boolean | null; date_created: string | null; scan_state: number; instance_id: number | null } +export type Location = { id: number; pub_id: number[]; name: string | null; path: string | null; total_capacity: number | null; available_capacity: number | null; size_in_bytes: number[] | null; is_archived: boolean | null; generate_preview_media: boolean | null; sync_preview_media: boolean | null; hidden: boolean | null; date_created: string | null; scan_state: number; device_pub_id: number[] | null; instance_id: number | null } /** * `LocationCreateArgs` is the argument received from the client using `rspc` to create a new location. @@ -505,7 +514,7 @@ export type MediaLocation = { latitude: number; longitude: number; pluscode: Plu export type Metadata = { album: string | null; album_artist: string | null; artist: string | null; comment: string | null; composer: string | null; copyright: string | null; creation_time: string | null; date: string | null; disc: number | null; encoder: string | null; encoded_by: string | null; filename: string | null; genre: string | null; language: string | null; performer: string | null; publisher: string | null; service_name: string | null; service_provider: string | null; title: string | null; track: number | null; variant_bit_rate: number | null; custom: { [key in string]: string } } -export type MockDevice = { pub_id: DevicePubId; name: string; os: DeviceOS; used_storage: bigint; storage_size: bigint; created_at: string; updated_at: string; device_model: core_HardwareModel } +export type MockDevice = { pub_id: DevicePubId; name: string; os: DeviceOS; used_storage: bigint; storage_size: bigint; created_at: string; updated_at: string; device_model: CoreHardwareModel } export type NodeConfigP2P = { discovery?: P2PDiscoveryState; port: Port; disabled: boolean; disable_ipv6: boolean; disable_relay: boolean; enable_remote_access: boolean; /** @@ -527,11 +536,11 @@ export type NodeState = ({ /** * id is a unique identifier for the current node. Each node has a public identifier (this one) and is given a local id for each library (done within the library code). */ -id: string; +id: CoreDevicePubId; /** * name is the display name of the current node. This is set by the user and is shown in the UI. // TODO: Length validation so it can fit in DNS record */ -name: string; identity: RemoteIdentity; p2p: NodeConfigP2P; features: BackendFeature[]; preferences: NodePreferences }) & { data_path: string; device_model: string | null; is_in_docker: boolean } +name: string; identity: RemoteIdentity; p2p: NodeConfigP2P; features: BackendFeature[]; preferences: NodePreferences; os: DeviceOS; hardware_model: CoreHardwareModel }) & { data_path: string; device_model: string | null; is_in_docker: boolean } export type NonCriticalError = { indexer: NonCriticalIndexerError } | { file_identifier: NonCriticalFileIdentifierError } | { media_processor: NonCriticalMediaProcessorError } @@ -562,6 +571,8 @@ export type NotificationId = { type: "library"; id: [string, number] } | { type: export type NotificationKind = "info" | "success" | "error" | "warning" +export type NotifyUser = { kind: "ReceivedJoinSyncGroupRequest"; data: { ticket: CloudP2PTicket; asking_device: Device; sync_group: SyncGroup } } | { kind: "ReceivedJoinSyncGroupResponse"; data: { response: JoinSyncGroupResponse; sync_group: SyncGroup } } | { kind: "SendingJoinSyncGroupResponseError"; data: { error: JoinSyncGroupError; sync_group: SyncGroup } } | { kind: "TimedOutJoinRequest"; data: { device: Device; succeeded: boolean } } + export type Object = { id: number; pub_id: number[]; kind: number | null; key_id: number | null; hidden: boolean | null; favorite: boolean | null; important: boolean | null; note: string | null; date_created: string | null; date_accessed: string | null } export type ObjectCursor = "none" | { dateAccessed: CursorOrderItem } | { kind: CursorOrderItem } @@ -602,7 +613,7 @@ export type P2PDiscoveryState = "Everyone" | "ContactsOnly" | "Disabled" export type P2PEvent = { type: "PeerChange"; identity: RemoteIdentity; connection: ConnectionMethod; discovery: DiscoveryMethod; metadata: PeerMetadata; addrs: string[] } | { type: "PeerDelete"; identity: RemoteIdentity } | { type: "SpacedropRequest"; id: string; identity: RemoteIdentity; peer_name: string; files: string[] } | { type: "SpacedropProgress"; id: string; percent: number } | { type: "SpacedropTimedOut"; id: string } | { type: "SpacedropRejected"; id: string } -export type PeerMetadata = { name: string; operating_system: OperatingSystem | null; device_model: core_HardwareModel | null; version: string | null } +export type PeerMetadata = { name: string; operating_system: OperatingSystem | null; device_model: CoreHardwareModel | null; version: string | null } export type PlusCode = string @@ -675,6 +686,10 @@ export type Stream = { id: number; name: string | null; codec: Codec | null; asp export type SubtitleProps = { width: number; height: number } +export type SyncGroup = { pub_id: SyncGroupPubId; name: string; latest_key_hash: KeyHash; library: Library; devices: Device[]; created_at: string; updated_at: string } + +export type SyncGroupPubId = string + export type SyncStatus = { ingest: boolean; cloud_send: boolean; cloud_receive: boolean; cloud_ingest: boolean } export type SystemLocations = { desktop: string | null; documents: string | null; downloads: string | null; pictures: string | null; music: string | null; videos: string | null } @@ -699,8 +714,8 @@ export type ThumbKey = { shard_hex: string; cas_id: CasId; base_directory_str: s export type UpdateThumbnailerPreferences = Record +export type UserResponse = { kind: "AcceptDeviceInSyncGroup"; data: { ticket: CloudP2PTicket; accepted: boolean } } + export type VideoProps = { pixel_format: string | null; color_range: string | null; bits_per_channel: number | null; color_space: string | null; color_primaries: string | null; color_transfer: string | null; field_order: string | null; chroma_location: string | null; width: number; height: number; aspect_ratio_num: number | null; aspect_ratio_den: number | null; properties: string[] } export type Volume = { name: string; mount_points: string[]; total_capacity: string; available_capacity: string; disk_type: DiskType; file_system: string | null; is_root_filesystem: boolean } - -export type core_HardwareModel = "Other" | "MacStudio" | "MacBookAir" | "MacBookPro" | "MacBook" | "MacMini" | "MacPro" | "IMac" | "IMacPro" | "IPad" | "IPhone" | "Simulator" | "Android"