mirror of
https://github.com/spacedriveapp/spacedrive.git
synced 2026-05-24 08:22:10 -04:00
Make core compile again
This commit is contained in:
BIN
Cargo.lock
generated
BIN
Cargo.lock
generated
Binary file not shown.
@@ -158,7 +158,7 @@ where
|
||||
JobCtx: JobContext<OuterCtx>,
|
||||
{
|
||||
fn into_job(self) -> Box<dyn DynJob<OuterCtx, JobCtx>> {
|
||||
let id = JobId::new_v4();
|
||||
let id = JobId::now_v7();
|
||||
|
||||
Box::new(JobHolder {
|
||||
id,
|
||||
@@ -333,7 +333,7 @@ where
|
||||
}
|
||||
|
||||
pub fn new(job: J) -> Self {
|
||||
let id = JobId::new_v4();
|
||||
let id = JobId::now_v7();
|
||||
Self {
|
||||
id,
|
||||
job,
|
||||
|
||||
@@ -34,7 +34,6 @@ use sd_utils::{from_bytes_to_uuid, uuid_to_bytes};
|
||||
use std::{borrow::Cow, fmt};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use specta::Type;
|
||||
use uuid::Uuid;
|
||||
|
||||
// File Path selectables!
|
||||
@@ -244,7 +243,7 @@ job::select!(job_without_data {
|
||||
location::select!(location_ids_and_path {
|
||||
id
|
||||
pub_id
|
||||
instance_id
|
||||
device_pub_id
|
||||
path
|
||||
});
|
||||
|
||||
@@ -259,6 +258,7 @@ impl From<location_with_indexer_rules::Data> for location::Data {
|
||||
id: data.id,
|
||||
pub_id: data.pub_id,
|
||||
path: data.path,
|
||||
device_pub_id: data.device_pub_id,
|
||||
instance_id: data.instance_id,
|
||||
name: data.name,
|
||||
total_capacity: data.total_capacity,
|
||||
@@ -272,6 +272,7 @@ impl From<location_with_indexer_rules::Data> for location::Data {
|
||||
scan_state: data.scan_state,
|
||||
file_paths: None,
|
||||
indexer_rules: None,
|
||||
device: None,
|
||||
instance: None,
|
||||
}
|
||||
}
|
||||
@@ -283,6 +284,7 @@ impl From<&location_with_indexer_rules::Data> for location::Data {
|
||||
id: data.id,
|
||||
pub_id: data.pub_id.clone(),
|
||||
path: data.path.clone(),
|
||||
device_pub_id: data.device_pub_id.clone(),
|
||||
instance_id: data.instance_id,
|
||||
name: data.name.clone(),
|
||||
total_capacity: data.total_capacity,
|
||||
@@ -296,6 +298,7 @@ impl From<&location_with_indexer_rules::Data> for location::Data {
|
||||
scan_state: data.scan_state,
|
||||
file_paths: None,
|
||||
indexer_rules: None,
|
||||
device: None,
|
||||
instance: None,
|
||||
}
|
||||
}
|
||||
@@ -311,7 +314,7 @@ label::include!((take: i64) => label_with_objects {
|
||||
}
|
||||
});
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Hash, PartialEq, Eq, Type)]
|
||||
#[derive(Debug, Serialize, Deserialize, Hash, PartialEq, Eq, specta::Type)]
|
||||
#[serde(transparent)]
|
||||
pub struct CasId<'cas_id>(Cow<'cas_id, str>);
|
||||
|
||||
@@ -374,17 +377,26 @@ impl From<&CasId<'_>> for String {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Hash, PartialEq, Eq, Clone)]
|
||||
#[derive(Debug, Serialize, Deserialize, Hash, PartialEq, Eq, Clone, specta::Type)]
|
||||
#[serde(transparent)]
|
||||
#[repr(transparent)]
|
||||
#[specta(rename = "CoreDevicePubId")]
|
||||
pub struct DevicePubId(PubId);
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Hash, PartialEq, Eq, Clone, specta::Type)]
|
||||
#[serde(transparent)]
|
||||
#[repr(transparent)]
|
||||
#[specta(rename = "CoreFilePathPubId")]
|
||||
pub struct FilePathPubId(PubId);
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Hash, PartialEq, Eq, Clone)]
|
||||
#[derive(Debug, Serialize, Deserialize, Hash, PartialEq, Eq, Clone, specta::Type)]
|
||||
#[serde(transparent)]
|
||||
#[repr(transparent)]
|
||||
#[specta(rename = "CoreObjectPubId")]
|
||||
pub struct ObjectPubId(PubId);
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Hash, PartialEq, Eq, Clone)]
|
||||
#[derive(Debug, Serialize, Deserialize, Hash, PartialEq, Eq, Clone, specta::Type)]
|
||||
#[specta(rename = "CorePubId")]
|
||||
enum PubId {
|
||||
Uuid(Uuid),
|
||||
Vec(Vec<u8>),
|
||||
@@ -392,7 +404,7 @@ enum PubId {
|
||||
|
||||
impl PubId {
|
||||
fn new() -> Self {
|
||||
Self::Uuid(Uuid::new_v4())
|
||||
Self::Uuid(Uuid::now_v7())
|
||||
}
|
||||
|
||||
fn to_db(&self) -> Vec<u8> {
|
||||
@@ -451,6 +463,15 @@ impl From<PubId> for Uuid {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&PubId> for Uuid {
|
||||
fn from(pub_id: &PubId) -> Self {
|
||||
match pub_id {
|
||||
PubId::Uuid(uuid) => *uuid,
|
||||
PubId::Vec(bytes) => from_bytes_to_uuid(bytes),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for PubId {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
@@ -499,6 +520,12 @@ macro_rules! delegate_pub_id {
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&$type_name> for ::uuid::Uuid {
|
||||
fn from(pub_id: &$type_name) -> Self {
|
||||
(&pub_id.0).into()
|
||||
}
|
||||
}
|
||||
|
||||
impl ::std::fmt::Display for $type_name {
|
||||
fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
|
||||
write!(f, "{}", self.0)
|
||||
@@ -526,4 +553,4 @@ macro_rules! delegate_pub_id {
|
||||
};
|
||||
}
|
||||
|
||||
delegate_pub_id!(FilePathPubId, ObjectPubId);
|
||||
delegate_pub_id!(FilePathPubId, ObjectPubId, DevicePubId);
|
||||
|
||||
@@ -9,6 +9,8 @@ default = []
|
||||
|
||||
[dependencies]
|
||||
# Spacedrive Sub-crates
|
||||
sd-core-prisma-helpers = { path = "../prisma-helpers" }
|
||||
|
||||
sd-actors = { path = "../../../crates/actors" }
|
||||
sd-prisma = { path = "../../../crates/prisma" }
|
||||
sd-sync = { path = "../../../crates/sync" }
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use sd_prisma::{
|
||||
prisma::{
|
||||
crdt_operation, exif_data, file_path, instance, label, label_on_object, location, object,
|
||||
tag, tag_on_object, PrismaClient, SortOrder,
|
||||
crdt_operation, exif_data, file_path, label, label_on_object, location, object, tag,
|
||||
tag_on_object, PrismaClient, SortOrder,
|
||||
},
|
||||
prisma_sync,
|
||||
};
|
||||
@@ -17,11 +17,7 @@ use super::{crdt_op_unchecked_db, Error};
|
||||
|
||||
/// Takes all the syncable data in the database and generates [`CRDTOperations`] for it.
|
||||
/// This is a requirement before the library can sync.
|
||||
pub async fn backfill_operations(
|
||||
db: &PrismaClient,
|
||||
sync: &crate::Manager,
|
||||
instance_id: instance::id::Type,
|
||||
) -> Result<(), Error> {
|
||||
pub async fn backfill_operations(db: &PrismaClient, sync: &crate::Manager) -> Result<(), Error> {
|
||||
let lock = sync.timestamp_lock.lock().await;
|
||||
|
||||
let res = db
|
||||
@@ -31,18 +27,20 @@ pub async fn backfill_operations(
|
||||
debug!("backfill started");
|
||||
let start = Instant::now();
|
||||
db.crdt_operation()
|
||||
.delete_many(vec![crdt_operation::instance_id::equals(instance_id)])
|
||||
.delete_many(vec![crdt_operation::device_pub_id::equals(
|
||||
sync.device_pub_id.to_db(),
|
||||
)])
|
||||
.exec()
|
||||
.await?;
|
||||
|
||||
paginate_tags(&db, sync, instance_id).await?;
|
||||
paginate_locations(&db, sync, instance_id).await?;
|
||||
paginate_objects(&db, sync, instance_id).await?;
|
||||
paginate_exif_datas(&db, sync, instance_id).await?;
|
||||
paginate_file_paths(&db, sync, instance_id).await?;
|
||||
paginate_tags_on_objects(&db, sync, instance_id).await?;
|
||||
paginate_labels(&db, sync, instance_id).await?;
|
||||
paginate_labels_on_objects(&db, sync, instance_id).await?;
|
||||
paginate_tags(&db, sync).await?;
|
||||
paginate_locations(&db, sync).await?;
|
||||
paginate_objects(&db, sync).await?;
|
||||
paginate_exif_datas(&db, sync).await?;
|
||||
paginate_file_paths(&db, sync).await?;
|
||||
paginate_tags_on_objects(&db, sync).await?;
|
||||
paginate_labels(&db, sync).await?;
|
||||
paginate_labels_on_objects(&db, sync).await?;
|
||||
|
||||
debug!(elapsed = ?start.elapsed(), "backfill ended");
|
||||
|
||||
@@ -112,13 +110,11 @@ where
|
||||
}
|
||||
|
||||
#[instrument(skip(db, sync), err)]
|
||||
async fn paginate_tags(
|
||||
db: &PrismaClient,
|
||||
sync: &crate::Manager,
|
||||
instance_id: instance::id::Type,
|
||||
) -> Result<(), Error> {
|
||||
async fn paginate_tags(db: &PrismaClient, sync: &crate::Manager) -> Result<(), Error> {
|
||||
use tag::{color, date_created, date_modified, id, name};
|
||||
|
||||
let device_pub_id = &sync.device_pub_id;
|
||||
|
||||
paginate(
|
||||
|cursor| {
|
||||
db.tag()
|
||||
@@ -143,7 +139,7 @@ async fn paginate_tags(
|
||||
),
|
||||
)
|
||||
})
|
||||
.map(|o| crdt_op_unchecked_db(&o, instance_id))
|
||||
.map(|o| crdt_op_unchecked_db(&o, device_pub_id))
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
.map(|creates| db.crdt_operation().create_many(creates).exec())
|
||||
},
|
||||
@@ -152,16 +148,14 @@ async fn paginate_tags(
|
||||
}
|
||||
|
||||
#[instrument(skip(db, sync), err)]
|
||||
async fn paginate_locations(
|
||||
db: &PrismaClient,
|
||||
sync: &crate::Manager,
|
||||
instance_id: instance::id::Type,
|
||||
) -> Result<(), Error> {
|
||||
async fn paginate_locations(db: &PrismaClient, sync: &crate::Manager) -> Result<(), Error> {
|
||||
use location::{
|
||||
available_capacity, date_created, generate_preview_media, hidden, id, include, instance,
|
||||
is_archived, name, path, size_in_bytes, sync_preview_media, total_capacity,
|
||||
};
|
||||
|
||||
let device_pub_id = &sync.device_pub_id;
|
||||
|
||||
paginate(
|
||||
|cursor| {
|
||||
db.location()
|
||||
@@ -209,7 +203,7 @@ async fn paginate_locations(
|
||||
),
|
||||
)
|
||||
})
|
||||
.map(|o| crdt_op_unchecked_db(&o, instance_id))
|
||||
.map(|o| crdt_op_unchecked_db(&o, device_pub_id))
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
.map(|creates| db.crdt_operation().create_many(creates).exec())
|
||||
},
|
||||
@@ -218,13 +212,11 @@ async fn paginate_locations(
|
||||
}
|
||||
|
||||
#[instrument(skip(db, sync), err)]
|
||||
async fn paginate_objects(
|
||||
db: &PrismaClient,
|
||||
sync: &crate::Manager,
|
||||
instance_id: instance::id::Type,
|
||||
) -> Result<(), Error> {
|
||||
async fn paginate_objects(db: &PrismaClient, sync: &crate::Manager) -> Result<(), Error> {
|
||||
use object::{date_accessed, date_created, favorite, hidden, id, important, kind, note};
|
||||
|
||||
let device_pub_id = &sync.device_pub_id;
|
||||
|
||||
paginate(
|
||||
|cursor| {
|
||||
db.object()
|
||||
@@ -254,7 +246,7 @@ async fn paginate_objects(
|
||||
),
|
||||
)
|
||||
})
|
||||
.map(|o| crdt_op_unchecked_db(&o, instance_id))
|
||||
.map(|o| crdt_op_unchecked_db(&o, device_pub_id))
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
.map(|creates| db.crdt_operation().create_many(creates).exec())
|
||||
},
|
||||
@@ -263,16 +255,14 @@ async fn paginate_objects(
|
||||
}
|
||||
|
||||
#[instrument(skip(db, sync), err)]
|
||||
async fn paginate_exif_datas(
|
||||
db: &PrismaClient,
|
||||
sync: &crate::Manager,
|
||||
instance_id: instance::id::Type,
|
||||
) -> Result<(), Error> {
|
||||
async fn paginate_exif_datas(db: &PrismaClient, sync: &crate::Manager) -> Result<(), Error> {
|
||||
use exif_data::{
|
||||
artist, camera_data, copyright, description, epoch_time, exif_version, id, include,
|
||||
media_date, media_location, resolution,
|
||||
};
|
||||
|
||||
let device_pub_id = &sync.device_pub_id;
|
||||
|
||||
paginate(
|
||||
|cursor| {
|
||||
db.exif_data()
|
||||
@@ -311,7 +301,7 @@ async fn paginate_exif_datas(
|
||||
),
|
||||
)
|
||||
})
|
||||
.map(|o| crdt_op_unchecked_db(&o, instance_id))
|
||||
.map(|o| crdt_op_unchecked_db(&o, device_pub_id))
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
.map(|creates| db.crdt_operation().create_many(creates).exec())
|
||||
},
|
||||
@@ -320,16 +310,14 @@ async fn paginate_exif_datas(
|
||||
}
|
||||
|
||||
#[instrument(skip(db, sync), err)]
|
||||
async fn paginate_file_paths(
|
||||
db: &PrismaClient,
|
||||
sync: &crate::Manager,
|
||||
instance_id: instance::id::Type,
|
||||
) -> Result<(), Error> {
|
||||
async fn paginate_file_paths(db: &PrismaClient, sync: &crate::Manager) -> Result<(), Error> {
|
||||
use file_path::{
|
||||
cas_id, date_created, date_indexed, date_modified, extension, hidden, id, include, inode,
|
||||
integrity_checksum, is_dir, location, materialized_path, name, object, size_in_bytes_bytes,
|
||||
};
|
||||
|
||||
let device_pub_id = &sync.device_pub_id;
|
||||
|
||||
paginate(
|
||||
|cursor| {
|
||||
db.file_path()
|
||||
@@ -379,7 +367,7 @@ async fn paginate_file_paths(
|
||||
),
|
||||
)
|
||||
})
|
||||
.map(|o| crdt_op_unchecked_db(&o, instance_id))
|
||||
.map(|o| crdt_op_unchecked_db(&o, device_pub_id))
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
.map(|creates| db.crdt_operation().create_many(creates).exec())
|
||||
},
|
||||
@@ -388,13 +376,11 @@ async fn paginate_file_paths(
|
||||
}
|
||||
|
||||
#[instrument(skip(db, sync), err)]
|
||||
async fn paginate_tags_on_objects(
|
||||
db: &PrismaClient,
|
||||
sync: &crate::Manager,
|
||||
instance_id: instance::id::Type,
|
||||
) -> Result<(), Error> {
|
||||
async fn paginate_tags_on_objects(db: &PrismaClient, sync: &crate::Manager) -> Result<(), Error> {
|
||||
use tag_on_object::{date_created, include, object_id, tag_id};
|
||||
|
||||
let device_pub_id = &sync.device_pub_id;
|
||||
|
||||
paginate_relation(
|
||||
|group_id, item_id| {
|
||||
db.tag_on_object()
|
||||
@@ -427,7 +413,7 @@ async fn paginate_tags_on_objects(
|
||||
),
|
||||
)
|
||||
})
|
||||
.map(|o| crdt_op_unchecked_db(&o, instance_id))
|
||||
.map(|o| crdt_op_unchecked_db(&o, device_pub_id))
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
.map(|creates| db.crdt_operation().create_many(creates).exec())
|
||||
},
|
||||
@@ -436,13 +422,11 @@ async fn paginate_tags_on_objects(
|
||||
}
|
||||
|
||||
#[instrument(skip(db, sync), err)]
|
||||
async fn paginate_labels(
|
||||
db: &PrismaClient,
|
||||
sync: &crate::Manager,
|
||||
instance_id: instance::id::Type,
|
||||
) -> Result<(), Error> {
|
||||
async fn paginate_labels(db: &PrismaClient, sync: &crate::Manager) -> Result<(), Error> {
|
||||
use label::{date_created, date_modified, id};
|
||||
|
||||
let device_pub_id = &sync.device_pub_id;
|
||||
|
||||
paginate(
|
||||
|cursor| {
|
||||
db.label()
|
||||
@@ -466,7 +450,7 @@ async fn paginate_labels(
|
||||
),
|
||||
)
|
||||
})
|
||||
.map(|o| crdt_op_unchecked_db(&o, instance_id))
|
||||
.map(|o| crdt_op_unchecked_db(&o, device_pub_id))
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
.map(|creates| db.crdt_operation().create_many(creates).exec())
|
||||
},
|
||||
@@ -475,13 +459,11 @@ async fn paginate_labels(
|
||||
}
|
||||
|
||||
#[instrument(skip(db, sync), err)]
|
||||
async fn paginate_labels_on_objects(
|
||||
db: &PrismaClient,
|
||||
sync: &crate::Manager,
|
||||
instance_id: instance::id::Type,
|
||||
) -> Result<(), Error> {
|
||||
async fn paginate_labels_on_objects(db: &PrismaClient, sync: &crate::Manager) -> Result<(), Error> {
|
||||
use label_on_object::{date_created, include, label_id, object_id};
|
||||
|
||||
let device_pub_id = &sync.device_pub_id;
|
||||
|
||||
paginate_relation(
|
||||
|group_id, item_id| {
|
||||
db.label_on_object()
|
||||
@@ -511,7 +493,7 @@ async fn paginate_labels_on_objects(
|
||||
[sync_entry!(l_o.date_created, date_created)],
|
||||
)
|
||||
})
|
||||
.map(|o| crdt_op_unchecked_db(&o, instance_id))
|
||||
.map(|o| crdt_op_unchecked_db(&o, device_pub_id))
|
||||
.collect::<Result<Vec<_>, _>>()
|
||||
.map(|creates| db.crdt_operation().create_many(creates).exec())
|
||||
},
|
||||
|
||||
@@ -1,82 +1,13 @@
|
||||
use sd_prisma::prisma::{cloud_crdt_operation, crdt_operation, instance, PrismaClient};
|
||||
use sd_core_prisma_helpers::DevicePubId;
|
||||
|
||||
use sd_prisma::prisma::{cloud_crdt_operation, crdt_operation, device, PrismaClient};
|
||||
use sd_sync::CRDTOperation;
|
||||
use sd_utils::from_bytes_to_uuid;
|
||||
|
||||
use tracing::instrument;
|
||||
use uhlc::NTP64;
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::Error;
|
||||
|
||||
crdt_operation::include!(crdt_with_instance {
|
||||
instance: select { pub_id }
|
||||
});
|
||||
|
||||
cloud_crdt_operation::include!(cloud_crdt_with_instance {
|
||||
instance: select { pub_id }
|
||||
});
|
||||
|
||||
impl crdt_with_instance::Data {
|
||||
#[allow(clippy::cast_sign_loss)] // SAFETY: we had to store using i64 due to SQLite limitations
|
||||
pub const fn timestamp(&self) -> NTP64 {
|
||||
NTP64(self.timestamp as u64)
|
||||
}
|
||||
|
||||
pub fn instance(&self) -> Uuid {
|
||||
from_bytes_to_uuid(&self.instance.pub_id)
|
||||
}
|
||||
|
||||
pub fn into_operation(self) -> Result<CRDTOperation, Error> {
|
||||
Ok(CRDTOperation {
|
||||
device_pub_id: self.instance(),
|
||||
timestamp: self.timestamp(),
|
||||
record_id: rmp_serde::from_slice(&self.record_id)?,
|
||||
|
||||
model_id: {
|
||||
#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
|
||||
// SAFETY: we will not have more than 2^16 models and we had to store using signed
|
||||
// integers due to SQLite limitations
|
||||
{
|
||||
self.model as u16
|
||||
}
|
||||
},
|
||||
data: rmp_serde::from_slice(&self.data)?,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl cloud_crdt_with_instance::Data {
|
||||
#[allow(clippy::cast_sign_loss)] // SAFETY: we had to store using i64 due to SQLite limitations
|
||||
pub const fn timestamp(&self) -> NTP64 {
|
||||
NTP64(self.timestamp as u64)
|
||||
}
|
||||
|
||||
pub fn instance(&self) -> Uuid {
|
||||
from_bytes_to_uuid(&self.instance.pub_id)
|
||||
}
|
||||
|
||||
#[instrument(skip(self), err)]
|
||||
pub fn into_operation(self) -> Result<(i32, CRDTOperation), Error> {
|
||||
Ok((
|
||||
self.id,
|
||||
CRDTOperation {
|
||||
device_pub_id: self.instance(),
|
||||
timestamp: self.timestamp(),
|
||||
record_id: rmp_serde::from_slice(&self.record_id)?,
|
||||
model_id: {
|
||||
#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
|
||||
// SAFETY: we will not have more than 2^16 models and we had to store using signed
|
||||
// integers due to SQLite limitations
|
||||
{
|
||||
self.model as u16
|
||||
}
|
||||
},
|
||||
data: rmp_serde::from_slice(&self.data)?,
|
||||
},
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip(op, db), err)]
|
||||
pub async fn write_crdt_op_to_db(op: &CRDTOperation, db: &PrismaClient) -> Result<(), Error> {
|
||||
crdt_operation::Create {
|
||||
@@ -87,7 +18,7 @@ pub async fn write_crdt_op_to_db(op: &CRDTOperation, db: &PrismaClient) -> Resul
|
||||
op.timestamp.0 as i64
|
||||
}
|
||||
},
|
||||
instance: instance::pub_id::equals(op.device_pub_id.as_bytes().to_vec()),
|
||||
device: device::pub_id::equals(op.device_pub_id.as_bytes().to_vec()),
|
||||
kind: op.kind().to_string(),
|
||||
data: rmp_serde::to_vec(&op.data)?,
|
||||
model: i32::from(op.model_id),
|
||||
@@ -100,3 +31,71 @@ pub async fn write_crdt_op_to_db(op: &CRDTOperation, db: &PrismaClient) -> Resul
|
||||
.await
|
||||
.map_or_else(|e| Err(e.into()), |_| Ok(()))
|
||||
}
|
||||
|
||||
pub fn into_ops(
|
||||
crdt_operation::Data {
|
||||
timestamp,
|
||||
model,
|
||||
record_id,
|
||||
data,
|
||||
device_pub_id,
|
||||
..
|
||||
}: crdt_operation::Data,
|
||||
) -> Result<CRDTOperation, Error> {
|
||||
Ok(CRDTOperation {
|
||||
device_pub_id: DevicePubId::from(device_pub_id).into(),
|
||||
timestamp: {
|
||||
#[allow(clippy::cast_sign_loss)]
|
||||
{
|
||||
// SAFETY: we had to store using i64 due to SQLite limitations
|
||||
NTP64(timestamp as u64)
|
||||
}
|
||||
},
|
||||
model_id: {
|
||||
#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
|
||||
{
|
||||
// SAFETY: we will not have more than 2^16 models and we had to store using signed
|
||||
// integers due to SQLite limitations
|
||||
model as u16
|
||||
}
|
||||
},
|
||||
record_id: rmp_serde::from_slice(&record_id)?,
|
||||
data: rmp_serde::from_slice(&data)?,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn into_cloud_ops(
|
||||
cloud_crdt_operation::Data {
|
||||
id,
|
||||
timestamp,
|
||||
model,
|
||||
record_id,
|
||||
data,
|
||||
device_pub_id,
|
||||
..
|
||||
}: cloud_crdt_operation::Data,
|
||||
) -> Result<(cloud_crdt_operation::id::Type, CRDTOperation), Error> {
|
||||
Ok((
|
||||
id,
|
||||
CRDTOperation {
|
||||
device_pub_id: DevicePubId::from(device_pub_id).into(),
|
||||
timestamp: {
|
||||
#[allow(clippy::cast_sign_loss)]
|
||||
{
|
||||
// SAFETY: we had to store using i64 due to SQLite limitations
|
||||
NTP64(timestamp as u64)
|
||||
}
|
||||
},
|
||||
model_id: {
|
||||
#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
|
||||
{
|
||||
// SAFETY: we will not have more than 2^16 models and we had to store using signed
|
||||
// integers due to SQLite limitations
|
||||
model as u16
|
||||
}
|
||||
},
|
||||
record_id: rmp_serde::from_slice(&record_id)?,
|
||||
data: rmp_serde::from_slice(&data)?,
|
||||
},
|
||||
))
|
||||
}
|
||||
|
||||
@@ -1,10 +1,12 @@
|
||||
use sd_core_prisma_helpers::DevicePubId;
|
||||
|
||||
use sd_prisma::{
|
||||
prisma::{crdt_operation, PrismaClient, SortOrder},
|
||||
prisma_sync::ModelSyncData,
|
||||
};
|
||||
use sd_sync::{
|
||||
CRDTOperation, CRDTOperationData, CompressedCRDTOperation,
|
||||
CompressedCRDTOperationsPerModelPerDevice, OperationKind,
|
||||
CompressedCRDTOperationsPerModelPerDevice, ModelId, OperationKind,
|
||||
};
|
||||
|
||||
use std::{
|
||||
@@ -40,7 +42,7 @@ use super::{
|
||||
/// Stuff that can be handled outside the actor
|
||||
pub enum Request {
|
||||
Messages {
|
||||
timestamps: Vec<(Uuid, NTP64)>,
|
||||
timestamps: Vec<(DevicePubId, NTP64)>,
|
||||
tx: oneshot::Sender<()>,
|
||||
},
|
||||
FinishedIngesting,
|
||||
@@ -137,7 +139,7 @@ impl Actor {
|
||||
.read()
|
||||
.await
|
||||
.iter()
|
||||
.map(|(&uid, ×tamp)| (uid, timestamp))
|
||||
.map(|(uid, ×tamp)| (uid.clone(), timestamp))
|
||||
.collect();
|
||||
|
||||
if self
|
||||
@@ -193,11 +195,11 @@ impl Actor {
|
||||
"Ingesting operations;",
|
||||
);
|
||||
|
||||
for (instance, data) in event.messages.0 {
|
||||
for (device_pub_id, data) in event.messages.0 {
|
||||
for (model, data) in data {
|
||||
for (record, ops) in data {
|
||||
if let Err(e) = self
|
||||
.process_crdt_operations(instance, model, record, ops)
|
||||
.process_crdt_operations(device_pub_id.into(), model, record, ops)
|
||||
.await
|
||||
{
|
||||
error!(?e, "Failed to ingest CRDT operations;");
|
||||
@@ -268,7 +270,7 @@ impl Actor {
|
||||
#[instrument(skip(self, ops), fields(operations_count = %ops.len()), err)]
|
||||
async fn process_crdt_operations(
|
||||
&mut self,
|
||||
instance: Uuid,
|
||||
device_pub_id: DevicePubId,
|
||||
model: u16,
|
||||
record_id: rmpv::Value,
|
||||
mut ops: Vec<CompressedCRDTOperation>,
|
||||
@@ -284,7 +286,9 @@ impl Actor {
|
||||
self.clock
|
||||
.update_with_timestamp(&Timestamp::new(
|
||||
new_timestamp,
|
||||
uhlc::ID::from(NonZeroU128::new(instance.to_u128_le()).expect("Non zero id")),
|
||||
uhlc::ID::from(
|
||||
NonZeroU128::new(Uuid::from(&device_pub_id).to_u128_le()).expect("Non zero id"),
|
||||
),
|
||||
))
|
||||
.expect("timestamp has too much drift!");
|
||||
|
||||
@@ -293,7 +297,7 @@ impl Actor {
|
||||
.timestamp_per_device
|
||||
.read()
|
||||
.await
|
||||
.get(&instance)
|
||||
.get(&device_pub_id)
|
||||
.copied();
|
||||
|
||||
// Delete - ignores all other messages
|
||||
@@ -303,7 +307,7 @@ impl Actor {
|
||||
.find(|op| matches!(op.data, CRDTOperationData::Delete))
|
||||
{
|
||||
trace!("Deleting operation");
|
||||
handle_crdt_deletion(db, instance, model, record_id, delete_op).await?;
|
||||
handle_crdt_deletion(db, &device_pub_id, model, record_id, delete_op).await?;
|
||||
}
|
||||
// Create + > 0 Update - overwrites the create's data with the updates
|
||||
else if let Some(timestamp) = ops
|
||||
@@ -330,7 +334,8 @@ impl Actor {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
handle_crdt_create_and_updates(db, instance, model, record_id, ops, timestamp).await?;
|
||||
handle_crdt_create_and_updates(db, &device_pub_id, model, record_id, ops, timestamp)
|
||||
.await?;
|
||||
}
|
||||
// > 0 Update - batches updates with a fake Create op
|
||||
else {
|
||||
@@ -387,7 +392,7 @@ impl Actor {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
handle_crdt_updates(db, instance, model, record_id, data, updates).await?;
|
||||
handle_crdt_updates(db, &device_pub_id, model, record_id, data, updates).await?;
|
||||
}
|
||||
|
||||
// update the stored timestamp for this instance - will be derived from the crdt operations table on restart
|
||||
@@ -396,7 +401,7 @@ impl Actor {
|
||||
self.timestamp_per_device
|
||||
.write()
|
||||
.await
|
||||
.insert(instance, new_ts);
|
||||
.insert(device_pub_id, new_ts);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -404,13 +409,14 @@ impl Actor {
|
||||
|
||||
async fn handle_crdt_updates(
|
||||
db: &PrismaClient,
|
||||
instance: Uuid,
|
||||
device_pub_id: &DevicePubId,
|
||||
model: u16,
|
||||
record_id: rmpv::Value,
|
||||
mut data: BTreeMap<String, (rmpv::Value, NTP64)>,
|
||||
updates: Vec<Option<crdt_operation::Data>>,
|
||||
) -> Result<(), Error> {
|
||||
let keys = data.keys().cloned().collect::<Vec<_>>();
|
||||
let device_pub_id = sd_sync::DevicePubId::from(device_pub_id);
|
||||
|
||||
// does the same thing as processing ops one-by-one and returning early if a newer op was found
|
||||
for (update, key) in updates.into_iter().zip(keys) {
|
||||
@@ -424,7 +430,7 @@ async fn handle_crdt_updates(
|
||||
.run(|db| async move {
|
||||
// fake operation to batch them all at once
|
||||
ModelSyncData::from_op(CRDTOperation {
|
||||
device_pub_id: instance,
|
||||
device_pub_id,
|
||||
model_id: model,
|
||||
record_id: record_id.clone(),
|
||||
timestamp: NTP64(0),
|
||||
@@ -447,7 +453,7 @@ async fn handle_crdt_updates(
|
||||
async move {
|
||||
write_crdt_op_to_db(
|
||||
&CRDTOperation {
|
||||
device_pub_id: instance,
|
||||
device_pub_id,
|
||||
model_id: model,
|
||||
record_id,
|
||||
timestamp,
|
||||
@@ -468,23 +474,24 @@ async fn handle_crdt_updates(
|
||||
|
||||
async fn handle_crdt_create_and_updates(
|
||||
db: &PrismaClient,
|
||||
instance: Uuid,
|
||||
model: u16,
|
||||
device_pub_id: &DevicePubId,
|
||||
model_id: ModelId,
|
||||
record_id: rmpv::Value,
|
||||
ops: Vec<CompressedCRDTOperation>,
|
||||
timestamp: NTP64,
|
||||
) -> Result<(), Error> {
|
||||
let mut data = BTreeMap::new();
|
||||
let device_pub_id = sd_sync::DevicePubId::from(device_pub_id);
|
||||
|
||||
let mut applied_ops = vec![];
|
||||
|
||||
// search for all Updates until a Create is found
|
||||
for op in ops.iter().rev() {
|
||||
for op in ops.into_iter().rev() {
|
||||
match &op.data {
|
||||
CRDTOperationData::Delete => unreachable!("Delete can't exist here!"),
|
||||
CRDTOperationData::Create(create_data) => {
|
||||
for (k, v) in create_data {
|
||||
data.entry(k).or_insert(v);
|
||||
data.entry(k.clone()).or_insert_with(|| v.clone());
|
||||
}
|
||||
|
||||
applied_ops.push(op);
|
||||
@@ -492,8 +499,8 @@ async fn handle_crdt_create_and_updates(
|
||||
break;
|
||||
}
|
||||
CRDTOperationData::Update { field, value } => {
|
||||
data.insert(field.clone(), value.clone());
|
||||
applied_ops.push(op);
|
||||
data.insert(field, value);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -503,35 +510,33 @@ async fn handle_crdt_create_and_updates(
|
||||
.run(|db| async move {
|
||||
// fake a create with a bunch of data rather than individual insert
|
||||
ModelSyncData::from_op(CRDTOperation {
|
||||
device_pub_id: instance,
|
||||
model_id: model,
|
||||
device_pub_id,
|
||||
model_id,
|
||||
record_id: record_id.clone(),
|
||||
timestamp,
|
||||
data: CRDTOperationData::Create(
|
||||
data.into_iter()
|
||||
.map(|(k, v)| (k.clone(), v.clone()))
|
||||
.collect(),
|
||||
),
|
||||
data: CRDTOperationData::Create(data),
|
||||
})
|
||||
.ok_or(Error::InvalidModelId(model))?
|
||||
.ok_or(Error::InvalidModelId(model_id))?
|
||||
.exec(&db)
|
||||
.await?;
|
||||
|
||||
applied_ops
|
||||
.into_iter()
|
||||
.map(|op| {
|
||||
.map(|CompressedCRDTOperation { timestamp, data }| {
|
||||
let record_id = record_id.clone();
|
||||
let db = &db;
|
||||
async move {
|
||||
let operation = CRDTOperation {
|
||||
device_pub_id: instance,
|
||||
model_id: model,
|
||||
record_id,
|
||||
timestamp: op.timestamp,
|
||||
data: op.data.clone(),
|
||||
};
|
||||
|
||||
write_crdt_op_to_db(&operation, db).await
|
||||
write_crdt_op_to_db(
|
||||
&CRDTOperation {
|
||||
device_pub_id,
|
||||
timestamp,
|
||||
model_id,
|
||||
record_id,
|
||||
data,
|
||||
},
|
||||
db,
|
||||
)
|
||||
.await
|
||||
}
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
@@ -544,14 +549,14 @@ async fn handle_crdt_create_and_updates(
|
||||
|
||||
async fn handle_crdt_deletion(
|
||||
db: &PrismaClient,
|
||||
instance: Uuid,
|
||||
device_pub_id: &DevicePubId,
|
||||
model: u16,
|
||||
record_id: rmpv::Value,
|
||||
delete_op: &CompressedCRDTOperation,
|
||||
) -> Result<(), Error> {
|
||||
// deletes are the be all and end all, no need to check anything
|
||||
let op = CRDTOperation {
|
||||
device_pub_id: instance,
|
||||
device_pub_id: device_pub_id.into(),
|
||||
model_id: model,
|
||||
record_id,
|
||||
timestamp: delete_op.timestamp,
|
||||
@@ -586,7 +591,7 @@ pub struct Handler {
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct MessagesEvent {
|
||||
pub instance_id: Uuid,
|
||||
pub device_pub_id: DevicePubId,
|
||||
pub messages: CompressedCRDTOperationsPerModelPerDevice,
|
||||
pub has_more: bool,
|
||||
pub wait_tx: Option<oneshot::Sender<()>>,
|
||||
@@ -608,13 +613,13 @@ mod test {
|
||||
use super::*;
|
||||
|
||||
async fn new_actor() -> (Handler, Arc<SharedState>) {
|
||||
let instance = Uuid::new_v4();
|
||||
let device_pub_id = Uuid::now_v7();
|
||||
let shared = Arc::new(SharedState {
|
||||
db: sd_prisma::test_db().await,
|
||||
instance,
|
||||
device_pub_id: device_pub_id.into(),
|
||||
clock: HLCBuilder::new()
|
||||
.with_id(uhlc::ID::from(
|
||||
NonZeroU128::new(instance.to_u128_le()).expect("Non zero id"),
|
||||
NonZeroU128::new(device_pub_id.to_u128_le()).expect("Non zero id"),
|
||||
))
|
||||
.build(),
|
||||
timestamp_per_device: Arc::default(),
|
||||
|
||||
@@ -27,7 +27,7 @@
|
||||
#![forbid(deprecated_in_future)]
|
||||
#![allow(clippy::missing_errors_doc, clippy::module_name_repetitions)]
|
||||
|
||||
use sd_prisma::prisma::{crdt_operation, instance, PrismaClient};
|
||||
use sd_prisma::prisma::{crdt_operation, device, PrismaClient};
|
||||
use sd_sync::CRDTOperation;
|
||||
|
||||
use std::{
|
||||
@@ -36,7 +36,6 @@ use std::{
|
||||
};
|
||||
|
||||
use tokio::sync::{Notify, RwLock};
|
||||
use uuid::Uuid;
|
||||
|
||||
mod actor;
|
||||
pub mod backfill;
|
||||
@@ -54,13 +53,14 @@ pub enum SyncMessage {
|
||||
Created,
|
||||
}
|
||||
|
||||
pub type DevicePubId = Uuid;
|
||||
pub use sd_core_prisma_helpers::DevicePubId;
|
||||
|
||||
pub type TimestampPerDevice = Arc<RwLock<HashMap<DevicePubId, NTP64>>>;
|
||||
|
||||
pub struct SharedState {
|
||||
pub db: Arc<PrismaClient>,
|
||||
pub emit_messages_flag: Arc<AtomicBool>,
|
||||
pub instance: Uuid,
|
||||
pub device_pub_id: DevicePubId,
|
||||
pub timestamp_per_device: TimestampPerDevice,
|
||||
pub clock: uhlc::HLC,
|
||||
pub active: AtomicBool,
|
||||
@@ -106,7 +106,7 @@ pub fn crdt_op_db(op: &CRDTOperation) -> Result<crdt_operation::Create, Error> {
|
||||
op.timestamp.as_u64() as i64
|
||||
}
|
||||
},
|
||||
instance: instance::pub_id::equals(op.device_pub_id.as_bytes().to_vec()),
|
||||
device: device::pub_id::equals(op.device_pub_id.as_bytes().to_vec()),
|
||||
kind: op.kind().to_string(),
|
||||
data: rmp_serde::to_vec(&op.data)?,
|
||||
model: i32::from(op.model_id),
|
||||
@@ -117,7 +117,7 @@ pub fn crdt_op_db(op: &CRDTOperation) -> Result<crdt_operation::Create, Error> {
|
||||
|
||||
pub fn crdt_op_unchecked_db(
|
||||
op: &CRDTOperation,
|
||||
instance_id: i32,
|
||||
device_pub_id: &DevicePubId,
|
||||
) -> Result<crdt_operation::CreateUnchecked, Error> {
|
||||
Ok(crdt_operation::CreateUnchecked {
|
||||
timestamp: {
|
||||
@@ -127,7 +127,7 @@ pub fn crdt_op_unchecked_db(
|
||||
op.timestamp.as_u64() as i64
|
||||
}
|
||||
},
|
||||
instance_id,
|
||||
device_pub_id: device_pub_id.to_db(),
|
||||
kind: op.kind().to_string(),
|
||||
data: rmp_serde::to_vec(&op.data)?,
|
||||
model: i32::from(op.model_id),
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
use sd_prisma::prisma::{cloud_crdt_operation, crdt_operation, instance, PrismaClient, SortOrder};
|
||||
use sd_core_prisma_helpers::DevicePubId;
|
||||
|
||||
use sd_prisma::prisma::{cloud_crdt_operation, crdt_operation, device, PrismaClient, SortOrder};
|
||||
use sd_sync::{CRDTOperation, OperationFactory};
|
||||
use sd_utils::{from_bytes_to_uuid, uuid_to_bytes};
|
||||
use tracing::warn;
|
||||
use sd_utils::from_bytes_to_uuid;
|
||||
|
||||
use std::{
|
||||
cmp, fmt,
|
||||
@@ -15,12 +16,13 @@ use std::{
|
||||
|
||||
use prisma_client_rust::{and, operator::or};
|
||||
use tokio::sync::{broadcast, Mutex, Notify, RwLock};
|
||||
use tracing::warn;
|
||||
use uhlc::{HLCBuilder, HLC};
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::{
|
||||
crdt_op_db,
|
||||
db_operation::{cloud_crdt_with_instance, crdt_with_instance},
|
||||
db_operation::{into_cloud_ops, into_ops},
|
||||
ingest, Error, SharedState, SyncMessage, NTP64,
|
||||
};
|
||||
|
||||
@@ -40,7 +42,7 @@ impl fmt::Debug for Manager {
|
||||
|
||||
#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq, Eq)]
|
||||
pub struct GetOpsArgs {
|
||||
pub timestamp_per_device: Vec<(Uuid, NTP64)>,
|
||||
pub timestamp_per_device: Vec<(DevicePubId, NTP64)>,
|
||||
pub count: u32,
|
||||
}
|
||||
|
||||
@@ -49,17 +51,17 @@ impl Manager {
|
||||
/// Sync messages are received on the returned [`broadcast::Receiver<SyncMessage>`].
|
||||
pub async fn new(
|
||||
db: Arc<PrismaClient>,
|
||||
current_instance_uuid: Uuid,
|
||||
current_device_pub_id: &DevicePubId,
|
||||
emit_messages_flag: Arc<AtomicBool>,
|
||||
actors: Arc<sd_actors::Actors>,
|
||||
) -> Result<(Self, broadcast::Receiver<SyncMessage>), Error> {
|
||||
let existing_instances = db.instance().find_many(vec![]).exec().await?;
|
||||
let existing_devices = db.device().find_many(vec![]).exec().await?;
|
||||
|
||||
Self::with_existing_instances(
|
||||
Self::with_existing_devices(
|
||||
db,
|
||||
current_instance_uuid,
|
||||
current_device_pub_id,
|
||||
emit_messages_flag,
|
||||
&existing_instances,
|
||||
&existing_devices,
|
||||
actors,
|
||||
)
|
||||
.await
|
||||
@@ -69,33 +71,35 @@ impl Manager {
|
||||
/// Sync messages are received on the returned [`broadcast::Receiver<SyncMessage>`].
|
||||
///
|
||||
/// # Panics
|
||||
/// Panics if the `current_instance_id` UUID is zeroed.
|
||||
pub async fn with_existing_instances(
|
||||
/// Panics if the `current_device_pub_id` UUID is zeroed, which will never happen as we use `UUIDv7` for the
|
||||
/// device pub id. As this version have a timestamp part, instead of being totally random. So the only
|
||||
/// possible way to get zero from a `UUIDv7` is to go back in time to 1970
|
||||
pub async fn with_existing_devices(
|
||||
db: Arc<PrismaClient>,
|
||||
current_instance_uuid: Uuid,
|
||||
current_device_pub_id: &DevicePubId,
|
||||
emit_messages_flag: Arc<AtomicBool>,
|
||||
existing_instances: &[instance::Data],
|
||||
existing_devices: &[device::Data],
|
||||
actors: Arc<sd_actors::Actors>,
|
||||
) -> Result<(Self, broadcast::Receiver<SyncMessage>), Error> {
|
||||
let timestamps = db
|
||||
let latest_timestamp_per_device = db
|
||||
._batch(
|
||||
existing_instances
|
||||
existing_devices
|
||||
.iter()
|
||||
.map(|i| {
|
||||
.map(|device| {
|
||||
db.crdt_operation()
|
||||
.find_first(vec![crdt_operation::instance::is(vec![
|
||||
instance::id::equals(i.id),
|
||||
])])
|
||||
.find_first(vec![crdt_operation::device_pub_id::equals(
|
||||
device.pub_id.clone(),
|
||||
)])
|
||||
.order_by(crdt_operation::timestamp::order(SortOrder::Desc))
|
||||
})
|
||||
.collect::<Vec<_>>(),
|
||||
)
|
||||
.await?
|
||||
.into_iter()
|
||||
.zip(existing_instances)
|
||||
.map(|(op, i)| {
|
||||
.zip(existing_devices)
|
||||
.map(|(op, device)| {
|
||||
(
|
||||
from_bytes_to_uuid(&i.pub_id),
|
||||
DevicePubId::from(&device.pub_id),
|
||||
#[allow(clippy::cast_sign_loss)]
|
||||
// SAFETY: we had to store using i64 due to SQLite limitations
|
||||
NTP64(op.map(|o| o.timestamp).unwrap_or_default() as u64),
|
||||
@@ -107,15 +111,16 @@ impl Manager {
|
||||
|
||||
let clock = HLCBuilder::new()
|
||||
.with_id(uhlc::ID::from(
|
||||
NonZeroU128::new(current_instance_uuid.to_u128_le()).expect("Non zero id"),
|
||||
NonZeroU128::new(Uuid::from(current_device_pub_id).to_u128_le())
|
||||
.expect("Non zero id"),
|
||||
))
|
||||
.build();
|
||||
|
||||
let shared = Arc::new(SharedState {
|
||||
db,
|
||||
instance: current_instance_uuid,
|
||||
device_pub_id: current_device_pub_id.clone(),
|
||||
clock,
|
||||
timestamp_per_device: Arc::new(RwLock::new(timestamps)),
|
||||
timestamp_per_device: Arc::new(RwLock::new(latest_timestamp_per_device)),
|
||||
emit_messages_flag,
|
||||
active: AtomicBool::default(),
|
||||
active_notify: Notify::default(),
|
||||
@@ -168,7 +173,7 @@ impl Manager {
|
||||
.timestamp_per_device
|
||||
.write()
|
||||
.await
|
||||
.insert(self.instance, last.timestamp);
|
||||
.insert(self.device_pub_id.clone(), last.timestamp);
|
||||
}
|
||||
|
||||
if self.tx.send(SyncMessage::Created).is_err() {
|
||||
@@ -216,33 +221,30 @@ impl Manager {
|
||||
.timestamp_per_device
|
||||
.write()
|
||||
.await
|
||||
.insert(self.instance, op.timestamp);
|
||||
.insert(self.device_pub_id.clone(), op.timestamp);
|
||||
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
pub async fn get_instance_ops(
|
||||
pub async fn get_device_ops(
|
||||
&self,
|
||||
count: u32,
|
||||
instance_uuid: Uuid,
|
||||
device_pub_id: DevicePubId,
|
||||
timestamp: NTP64,
|
||||
) -> Result<Vec<CRDTOperation>, Error> {
|
||||
self.db
|
||||
.crdt_operation()
|
||||
.find_many(vec![
|
||||
crdt_operation::instance::is(vec![instance::pub_id::equals(uuid_to_bytes(
|
||||
&instance_uuid,
|
||||
))]),
|
||||
crdt_operation::device::is(vec![device::pub_id::equals(device_pub_id.into())]),
|
||||
#[allow(clippy::cast_possible_wrap)]
|
||||
crdt_operation::timestamp::gt(timestamp.as_u64() as i64),
|
||||
])
|
||||
.take(i64::from(count))
|
||||
.order_by(crdt_operation::timestamp::order(SortOrder::Asc))
|
||||
.include(crdt_with_instance::include())
|
||||
.exec()
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(crdt_with_instance::Data::into_operation)
|
||||
.map(into_ops)
|
||||
.collect()
|
||||
}
|
||||
|
||||
@@ -253,10 +255,10 @@ impl Manager {
|
||||
.find_many(vec![or(args
|
||||
.timestamp_per_device
|
||||
.iter()
|
||||
.map(|(instance_id, timestamp)| {
|
||||
.map(|(device_pub_id, timestamp)| {
|
||||
and![
|
||||
crdt_operation::instance::is(vec![instance::pub_id::equals(
|
||||
uuid_to_bytes(instance_id)
|
||||
crdt_operation::device::is(vec![device::pub_id::equals(
|
||||
device_pub_id.to_db()
|
||||
)]),
|
||||
crdt_operation::timestamp::gt({
|
||||
#[allow(clippy::cast_possible_wrap)]
|
||||
@@ -267,46 +269,47 @@ impl Manager {
|
||||
})
|
||||
]
|
||||
})
|
||||
.chain([crdt_operation::instance::is_not(vec![
|
||||
instance::pub_id::in_vec(
|
||||
.chain([crdt_operation::device::is_not(vec![
|
||||
device::pub_id::in_vec(
|
||||
args.timestamp_per_device
|
||||
.iter()
|
||||
.map(|(instance_id, _)| uuid_to_bytes(instance_id))
|
||||
.map(|(device_pub_id, _)| device_pub_id.to_db())
|
||||
.collect(),
|
||||
),
|
||||
])])
|
||||
.collect())])
|
||||
.take(i64::from(args.count))
|
||||
.order_by(crdt_operation::timestamp::order(SortOrder::Asc))
|
||||
.include(crdt_with_instance::include())
|
||||
.exec()
|
||||
.await?;
|
||||
|
||||
ops.sort_by(|a, b| match a.timestamp().cmp(&b.timestamp()) {
|
||||
cmp::Ordering::Equal => a.instance().cmp(&b.instance()),
|
||||
ops.sort_by(|a, b| match a.timestamp.cmp(&b.timestamp) {
|
||||
cmp::Ordering::Equal => {
|
||||
from_bytes_to_uuid(&a.device_pub_id).cmp(&from_bytes_to_uuid(&b.device_pub_id))
|
||||
}
|
||||
o => o,
|
||||
});
|
||||
|
||||
ops.into_iter()
|
||||
.take(args.count as usize)
|
||||
.map(crdt_with_instance::Data::into_operation)
|
||||
.map(into_ops)
|
||||
.collect()
|
||||
}
|
||||
|
||||
pub async fn get_cloud_ops(
|
||||
&self,
|
||||
args: GetOpsArgs,
|
||||
) -> Result<Vec<(i32, CRDTOperation)>, Error> {
|
||||
) -> Result<Vec<(cloud_crdt_operation::id::Type, CRDTOperation)>, Error> {
|
||||
let mut ops = self
|
||||
.db
|
||||
.cloud_crdt_operation()
|
||||
.find_many(vec![or(args
|
||||
.timestamp_per_device
|
||||
.iter()
|
||||
.map(|(instance_id, timestamp)| {
|
||||
.map(|(device_pub_id, timestamp)| {
|
||||
and![
|
||||
cloud_crdt_operation::instance::is(vec![instance::pub_id::equals(
|
||||
uuid_to_bytes(instance_id)
|
||||
cloud_crdt_operation::device::is(vec![device::pub_id::equals(
|
||||
device_pub_id.to_db()
|
||||
)]),
|
||||
cloud_crdt_operation::timestamp::gt({
|
||||
#[allow(clippy::cast_possible_wrap)]
|
||||
@@ -317,29 +320,30 @@ impl Manager {
|
||||
})
|
||||
]
|
||||
})
|
||||
.chain([cloud_crdt_operation::instance::is_not(vec![
|
||||
instance::pub_id::in_vec(
|
||||
.chain([cloud_crdt_operation::device::is_not(vec![
|
||||
device::pub_id::in_vec(
|
||||
args.timestamp_per_device
|
||||
.iter()
|
||||
.map(|(instance_id, _)| uuid_to_bytes(instance_id))
|
||||
.map(|(device_pub_id, _)| device_pub_id.to_db())
|
||||
.collect(),
|
||||
),
|
||||
])])
|
||||
.collect())])
|
||||
.take(i64::from(args.count))
|
||||
.order_by(cloud_crdt_operation::timestamp::order(SortOrder::Asc))
|
||||
.include(cloud_crdt_with_instance::include())
|
||||
.exec()
|
||||
.await?;
|
||||
|
||||
ops.sort_by(|a, b| match a.timestamp().cmp(&b.timestamp()) {
|
||||
cmp::Ordering::Equal => a.instance().cmp(&b.instance()),
|
||||
ops.sort_by(|a, b| match a.timestamp.cmp(&b.timestamp) {
|
||||
cmp::Ordering::Equal => {
|
||||
from_bytes_to_uuid(&a.device_pub_id).cmp(&from_bytes_to_uuid(&b.device_pub_id))
|
||||
}
|
||||
o => o,
|
||||
});
|
||||
|
||||
ops.into_iter()
|
||||
.take(args.count as usize)
|
||||
.map(cloud_crdt_with_instance::Data::into_operation)
|
||||
.map(into_cloud_ops)
|
||||
.collect()
|
||||
}
|
||||
}
|
||||
@@ -349,8 +353,8 @@ impl OperationFactory for Manager {
|
||||
&self.clock
|
||||
}
|
||||
|
||||
fn get_device_pub_id(&self) -> Uuid {
|
||||
self.instance
|
||||
fn get_device_pub_id(&self) -> sd_sync::DevicePubId {
|
||||
sd_sync::DevicePubId::from(&self.device_pub_id)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@ use sd_prisma::{prisma::location, prisma_sync};
|
||||
use sd_sync::*;
|
||||
use sd_utils::{msgpack, uuid_to_bytes};
|
||||
|
||||
use mock_instance::Instance;
|
||||
use mock_instance::Device;
|
||||
use tracing::info;
|
||||
use tracing_test::traced_test;
|
||||
use uuid::Uuid;
|
||||
@@ -14,7 +14,7 @@ use uuid::Uuid;
|
||||
const MOCK_LOCATION_NAME: &str = "Location 0";
|
||||
const MOCK_LOCATION_PATH: &str = "/User/Anon/Documents";
|
||||
|
||||
async fn write_test_location(instance: &Instance) -> location::Data {
|
||||
async fn write_test_location(instance: &Device) -> location::Data {
|
||||
let location_pub_id = Uuid::new_v4();
|
||||
|
||||
let location = instance
|
||||
@@ -81,7 +81,7 @@ async fn write_test_location(instance: &Instance) -> location::Data {
|
||||
#[tokio::test]
|
||||
#[traced_test]
|
||||
async fn writes_operations_and_rows_together() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let instance = Instance::new(Uuid::new_v4()).await;
|
||||
let instance = Device::new(Uuid::new_v4()).await;
|
||||
|
||||
write_test_location(&instance).await;
|
||||
|
||||
@@ -119,14 +119,14 @@ async fn writes_operations_and_rows_together() -> Result<(), Box<dyn std::error:
|
||||
#[tokio::test]
|
||||
#[traced_test]
|
||||
async fn operations_send_and_ingest() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let instance1 = Instance::new(Uuid::new_v4()).await;
|
||||
let instance2 = Instance::new(Uuid::new_v4()).await;
|
||||
let instance1 = Device::new(Uuid::new_v4()).await;
|
||||
let instance2 = Device::new(Uuid::new_v4()).await;
|
||||
|
||||
let mut instance2_sync_rx = instance2.sync_rx.resubscribe();
|
||||
|
||||
info!("Created instances!");
|
||||
|
||||
Instance::pair(&instance1, &instance2).await;
|
||||
Device::pair(&instance1, &instance2).await;
|
||||
|
||||
info!("Paired instances!");
|
||||
|
||||
@@ -162,12 +162,12 @@ async fn operations_send_and_ingest() -> Result<(), Box<dyn std::error::Error>>
|
||||
|
||||
#[tokio::test]
|
||||
async fn no_update_after_delete() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let instance1 = Instance::new(Uuid::new_v4()).await;
|
||||
let instance2 = Instance::new(Uuid::new_v4()).await;
|
||||
let instance1 = Device::new(Uuid::new_v4()).await;
|
||||
let instance2 = Device::new(Uuid::new_v4()).await;
|
||||
|
||||
let mut instance2_sync_rx = instance2.sync_rx.resubscribe();
|
||||
|
||||
Instance::pair(&instance1, &instance2).await;
|
||||
Device::pair(&instance1, &instance2).await;
|
||||
|
||||
let location = write_test_location(&instance1).await;
|
||||
|
||||
|
||||
@@ -2,11 +2,9 @@ use sd_core_sync::*;
|
||||
|
||||
use sd_prisma::prisma;
|
||||
use sd_sync::CompressedCRDTOperationsPerModelPerDevice;
|
||||
use sd_utils::uuid_to_bytes;
|
||||
|
||||
use std::sync::{atomic::AtomicBool, Arc};
|
||||
|
||||
use prisma_client_rust::chrono::Utc;
|
||||
use tokio::{fs, spawn, sync::broadcast};
|
||||
use tracing::{info, instrument, warn, Instrument};
|
||||
use uuid::Uuid;
|
||||
@@ -16,16 +14,17 @@ fn db_path(id: Uuid) -> String {
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Instance {
|
||||
pub id: Uuid,
|
||||
pub struct Device {
|
||||
pub pub_id: DevicePubId,
|
||||
pub db: Arc<prisma::PrismaClient>,
|
||||
pub sync: Arc<sd_core_sync::Manager>,
|
||||
pub sync_rx: Arc<broadcast::Receiver<SyncMessage>>,
|
||||
}
|
||||
|
||||
impl Instance {
|
||||
impl Device {
|
||||
pub async fn new(id: Uuid) -> Arc<Self> {
|
||||
let url = format!("file:{}", db_path(id));
|
||||
let device_pub_id = DevicePubId::from(id);
|
||||
|
||||
let db = Arc::new(
|
||||
prisma::PrismaClient::_builder()
|
||||
@@ -37,22 +36,15 @@ impl Instance {
|
||||
|
||||
db._db_push().await.unwrap();
|
||||
|
||||
db.instance()
|
||||
.create(
|
||||
uuid_to_bytes(&id),
|
||||
vec![],
|
||||
vec![],
|
||||
Utc::now().into(),
|
||||
Utc::now().into(),
|
||||
vec![],
|
||||
)
|
||||
db.device()
|
||||
.create(device_pub_id.to_db(), vec![])
|
||||
.exec()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let (sync, sync_rx) = sd_core_sync::Manager::new(
|
||||
Arc::clone(&db),
|
||||
id,
|
||||
&device_pub_id,
|
||||
Arc::new(AtomicBool::new(true)),
|
||||
Default::default(),
|
||||
)
|
||||
@@ -60,7 +52,7 @@ impl Instance {
|
||||
.expect("failed to create sync manager");
|
||||
|
||||
Arc::new(Self {
|
||||
id,
|
||||
pub_id: device_pub_id,
|
||||
db,
|
||||
sync: Arc::new(sync),
|
||||
sync_rx: Arc::new(sync_rx),
|
||||
@@ -68,22 +60,17 @@ impl Instance {
|
||||
}
|
||||
|
||||
pub async fn teardown(&self) {
|
||||
fs::remove_file(db_path(self.id)).await.unwrap();
|
||||
fs::remove_file(db_path(Uuid::from(&self.pub_id)))
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
pub async fn pair(instance1: &Arc<Self>, instance2: &Arc<Self>) {
|
||||
#[instrument(skip(left, right))]
|
||||
async fn half(left: &Arc<Instance>, right: &Arc<Instance>, context: &'static str) {
|
||||
async fn half(left: &Arc<Device>, right: &Arc<Device>, context: &'static str) {
|
||||
left.db
|
||||
.instance()
|
||||
.create(
|
||||
uuid_to_bytes(&right.id),
|
||||
vec![],
|
||||
vec![],
|
||||
Utc::now().into(),
|
||||
Utc::now().into(),
|
||||
vec![],
|
||||
)
|
||||
.device()
|
||||
.create(right.pub_id.to_db(), vec![])
|
||||
.exec()
|
||||
.await
|
||||
.unwrap();
|
||||
@@ -137,7 +124,7 @@ impl Instance {
|
||||
messages,
|
||||
),
|
||||
has_more: false,
|
||||
instance_id: left.id,
|
||||
device_pub_id: left.pub_id.clone(),
|
||||
wait_tx: None,
|
||||
}))
|
||||
.await
|
||||
|
||||
@@ -57,20 +57,20 @@ model CloudCRDTOperation {
|
||||
/// Devices are the owner machines connected to this library
|
||||
/// @shared(id: pub_id, modelId: 12)
|
||||
model Device {
|
||||
id Int @id @default(autoincrement())
|
||||
id Int @id @default(autoincrement())
|
||||
// uuid v7
|
||||
pub_id Bytes @unique
|
||||
name String
|
||||
pub_id Bytes @unique
|
||||
name String? // Not actually NULLABLE, but we have to comply with current sync implementation BS
|
||||
|
||||
// Enum: sd_cloud_schema::device::DeviceOS
|
||||
os Int
|
||||
os Int? // Not actually NULLABLE, but we have to comply with current sync implementation BS
|
||||
// Enum: sd_cloud_schema::device::HardwareModel
|
||||
hardware_model Int
|
||||
hardware_model Int? // Not actually NULLABLE, but we have to comply with current sync implementation BS
|
||||
|
||||
// clock timestamp for sync
|
||||
timestamp BigInt?
|
||||
|
||||
date_created DateTime
|
||||
date_created DateTime? // Not actually NULLABLE, but we have to comply with current sync implementation BS
|
||||
date_deleted DateTime?
|
||||
|
||||
CRDTOperation CRDTOperation[]
|
||||
@@ -103,6 +103,7 @@ model Instance {
|
||||
|
||||
// clock timestamp for sync
|
||||
timestamp BigInt?
|
||||
Location Location[]
|
||||
|
||||
@@map("instance")
|
||||
}
|
||||
@@ -168,8 +169,12 @@ model Location {
|
||||
|
||||
scan_state Int @default(0) // Enum: sd_core::location::ScanState
|
||||
|
||||
device_pub_id Bytes
|
||||
device Device @relation(fields: [device_pub_id], references: [pub_id], onDelete: Cascade)
|
||||
device_pub_id Bytes?
|
||||
device Device? @relation(fields: [device_pub_id], references: [pub_id], onDelete: Cascade)
|
||||
|
||||
// this should just be a local-only cache but it's too much effort to broadcast online locations rn (@brendan)
|
||||
instance_id Int?
|
||||
instance Instance? @relation(fields: [instance_id], references: [id], onDelete: SetNull)
|
||||
|
||||
file_paths FilePath[]
|
||||
indexer_rules IndexerRulesInLocation[]
|
||||
@@ -585,8 +590,8 @@ model StorageStatistics {
|
||||
total_capacity BigInt @default(0)
|
||||
available_capacity BigInt @default(0)
|
||||
|
||||
device_pub_id Bytes @unique
|
||||
device Device @relation(fields: [device_pub_id], references: [pub_id], onDelete: Cascade)
|
||||
device_pub_id Bytes? @unique
|
||||
device Device? @relation(fields: [device_pub_id], references: [pub_id], onDelete: Cascade)
|
||||
|
||||
@@map("storage_statistics")
|
||||
}
|
||||
|
||||
@@ -2,7 +2,9 @@ use crate::api::{utils::library, Ctx, R};
|
||||
|
||||
use sd_cloud_schema::{auth::AccessToken, devices, libraries};
|
||||
|
||||
use futures_concurrency::future::TryJoin;
|
||||
use rspc::alpha::AlphaRouter;
|
||||
use serde::Deserialize;
|
||||
use tracing::debug;
|
||||
|
||||
use super::try_get_cloud_services_client;
|
||||
@@ -42,25 +44,25 @@ pub fn mount() -> AlphaRouter<Ctx> {
|
||||
})
|
||||
})
|
||||
.procedure("create", {
|
||||
#[derive(Debug, serde::Serialize, serde::Deserialize, specta::Type)]
|
||||
struct LibrariesCreateArgs {
|
||||
access_token: AccessToken,
|
||||
device_pub_id: devices::PubId,
|
||||
}
|
||||
|
||||
R.with2(library())
|
||||
.mutation(|(node, library), args: LibrariesCreateArgs| async move {
|
||||
let req = libraries::create::Request {
|
||||
name: library.config().await.name.to_string(),
|
||||
access_token: args.access_token,
|
||||
pub_id: libraries::PubId(library.id),
|
||||
device_pub_id: args.device_pub_id,
|
||||
};
|
||||
.mutation(|(node, library), access_token: AccessToken| async move {
|
||||
let (client, name, device_pub_id) = (
|
||||
try_get_cloud_services_client(&node),
|
||||
async { Ok(library.config().await.name.to_string()) },
|
||||
async { Ok(devices::PubId(node.config.get().await.id.into())) },
|
||||
)
|
||||
.try_join()
|
||||
.await?;
|
||||
|
||||
super::handle_comm_error(
|
||||
try_get_cloud_services_client(&node)
|
||||
.await?
|
||||
client
|
||||
.libraries()
|
||||
.create(req)
|
||||
.create(libraries::create::Request {
|
||||
name,
|
||||
access_token,
|
||||
pub_id: libraries::PubId(library.id),
|
||||
device_pub_id,
|
||||
})
|
||||
.await,
|
||||
"Failed to create library;",
|
||||
)??;
|
||||
@@ -69,35 +71,59 @@ pub fn mount() -> AlphaRouter<Ctx> {
|
||||
})
|
||||
})
|
||||
.procedure("delete", {
|
||||
R.mutation(|node, req: libraries::delete::Request| async move {
|
||||
super::handle_comm_error(
|
||||
try_get_cloud_services_client(&node)
|
||||
.await?
|
||||
.libraries()
|
||||
.delete(req)
|
||||
.await,
|
||||
"Failed to delete library;",
|
||||
)??;
|
||||
R.with2(library())
|
||||
.mutation(|(node, library), access_token: AccessToken| async move {
|
||||
super::handle_comm_error(
|
||||
try_get_cloud_services_client(&node)
|
||||
.await?
|
||||
.libraries()
|
||||
.delete(libraries::delete::Request {
|
||||
access_token,
|
||||
pub_id: libraries::PubId(library.id),
|
||||
})
|
||||
.await,
|
||||
"Failed to delete library;",
|
||||
)??;
|
||||
|
||||
debug!("Deleted library");
|
||||
debug!("Deleted library");
|
||||
|
||||
Ok(())
|
||||
})
|
||||
Ok(())
|
||||
})
|
||||
})
|
||||
.procedure("update", {
|
||||
R.mutation(|node, req: libraries::update::Request| async move {
|
||||
super::handle_comm_error(
|
||||
try_get_cloud_services_client(&node)
|
||||
.await?
|
||||
.libraries()
|
||||
.update(req)
|
||||
.await,
|
||||
"Failed to update library;",
|
||||
)??;
|
||||
#[derive(Deserialize, specta::Type)]
|
||||
struct LibrariesUpdateArgs {
|
||||
access_token: AccessToken,
|
||||
name: String,
|
||||
}
|
||||
|
||||
debug!("Updated library");
|
||||
R.with2(library()).mutation(
|
||||
|(node, library),
|
||||
LibrariesUpdateArgs { access_token, name }: LibrariesUpdateArgs| async move {
|
||||
super::handle_comm_error(
|
||||
try_get_cloud_services_client(&node)
|
||||
.await?
|
||||
.libraries()
|
||||
.update(libraries::update::Request {
|
||||
access_token,
|
||||
pub_id: libraries::PubId(library.id),
|
||||
name,
|
||||
})
|
||||
.await,
|
||||
"Failed to update library;",
|
||||
)??;
|
||||
|
||||
Ok(())
|
||||
})
|
||||
debug!("Updated library");
|
||||
|
||||
Ok(())
|
||||
},
|
||||
)
|
||||
})
|
||||
.procedure("sync", {
|
||||
R.with2(library())
|
||||
.mutation(|(_, library), _: ()| async move {
|
||||
library.do_cloud_sync();
|
||||
Ok(())
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1,134 +0,0 @@
|
||||
// This file is being deprecated in favor of libraries.rs
|
||||
// This is due to the migration to the new API system, but the frontend is still using this file
|
||||
|
||||
use crate::{api::utils::library, invalidate_query};
|
||||
|
||||
use super::*;
|
||||
|
||||
pub fn mount() -> AlphaRouter<Ctx> {
|
||||
R.router()
|
||||
.procedure("get", {
|
||||
R.with2(library())
|
||||
.query(|(node, library), _: ()| async move {
|
||||
// Ok(
|
||||
// sd_cloud_api::library::get(node.cloud_api_config().await, library.id)
|
||||
// .await?,
|
||||
// )
|
||||
|
||||
Ok(())
|
||||
})
|
||||
})
|
||||
.procedure("list", {
|
||||
R.query(|node, _: ()| async move {
|
||||
// Ok(sd_cloud_api::library::list(node.cloud_api_config().await).await?)
|
||||
Ok(())
|
||||
})
|
||||
})
|
||||
.procedure("create", {
|
||||
R.with2(library())
|
||||
.mutation(|(node, library), _: ()| async move {
|
||||
// let node_config = node.config.get().await;
|
||||
// let cloud_library = sd_cloud_api::library::create(
|
||||
// node.cloud_api_config().await,
|
||||
// library.id,
|
||||
// &library.config().await.name,
|
||||
// library.instance_uuid,
|
||||
// library.identity.to_remote_identity(),
|
||||
// node_config.id,
|
||||
// node_config.identity.to_remote_identity(),
|
||||
// &node.p2p.peer_metadata(),
|
||||
// )
|
||||
// .await?;
|
||||
// node.libraries
|
||||
// .edit(
|
||||
// library.id,
|
||||
// None,
|
||||
// MaybeUndefined::Undefined,
|
||||
// MaybeUndefined::Value(cloud_library.id),
|
||||
// None,
|
||||
// )
|
||||
// .await?;
|
||||
|
||||
invalidate_query!(library, "cloud.library.get");
|
||||
|
||||
Ok(())
|
||||
})
|
||||
})
|
||||
.procedure("join", {
|
||||
R.mutation(|node, library_id: Uuid| async move {
|
||||
// let Some(cloud_library) =
|
||||
// sd_cloud_api::library::get(node.cloud_api_config().await, library_id).await?
|
||||
// else {
|
||||
// return Err(rspc::Error::new(
|
||||
// rspc::ErrorCode::NotFound,
|
||||
// "Library not found".to_string(),
|
||||
// ));
|
||||
// };
|
||||
|
||||
// let library = node
|
||||
// .libraries
|
||||
// .create_with_uuid(
|
||||
// library_id,
|
||||
// LibraryName::new(cloud_library.name).map_err(|e| {
|
||||
// rspc::Error::new(rspc::ErrorCode::InternalServerError, e.to_string())
|
||||
// })?,
|
||||
// None,
|
||||
// false,
|
||||
// None,
|
||||
// &node,
|
||||
// true,
|
||||
// )
|
||||
// .await?;
|
||||
// node.libraries
|
||||
// .edit(
|
||||
// library.id,
|
||||
// None,
|
||||
// MaybeUndefined::Undefined,
|
||||
// MaybeUndefined::Value(cloud_library.id),
|
||||
// None,
|
||||
// )
|
||||
// .await?;
|
||||
|
||||
// let node_config = node.config.get().await;
|
||||
// let instances = sd_cloud_api::library::join(
|
||||
// node.cloud_api_config().await,
|
||||
// library_id,
|
||||
// library.instance_uuid,
|
||||
// library.identity.to_remote_identity(),
|
||||
// node_config.id,
|
||||
// node_config.identity.to_remote_identity(),
|
||||
// node.p2p.peer_metadata(),
|
||||
// )
|
||||
// .await?;
|
||||
|
||||
// for instance in instances {
|
||||
// crate::cloud::sync::receive::upsert_instance(
|
||||
// library.id,
|
||||
// &library.db,
|
||||
// &library.sync,
|
||||
// &node.libraries,
|
||||
// &instance.uuid,
|
||||
// instance.identity,
|
||||
// &instance.node_id,
|
||||
// RemoteIdentity::from_str(&instance.node_remote_identity)
|
||||
// .expect("malformed remote identity in the DB"),
|
||||
// instance.metadata,
|
||||
// )
|
||||
// .await?;
|
||||
// }
|
||||
|
||||
// invalidate_query!(library, "cloud.library.get");
|
||||
// invalidate_query!(library, "cloud.library.list");
|
||||
|
||||
// Ok(LibraryConfigWrapped::from_library(&library).await)
|
||||
Ok(())
|
||||
})
|
||||
})
|
||||
.procedure("sync", {
|
||||
R.with2(library())
|
||||
.mutation(|(_, library), _: ()| async move {
|
||||
library.do_cloud_sync();
|
||||
Ok(())
|
||||
})
|
||||
})
|
||||
}
|
||||
@@ -1,11 +1,12 @@
|
||||
use crate::{node::config::NodeConfig, volume::get_volumes, Node};
|
||||
|
||||
use sd_core_cloud_services::{CloudP2P, IrohSecretKey, KeyManager, QuinnConnection, UserResponse};
|
||||
|
||||
use sd_cloud_schema::{
|
||||
auth,
|
||||
error::{ClientSideError, Error},
|
||||
users, Client, Service,
|
||||
};
|
||||
use sd_core_cloud_services::{CloudP2P, IrohSecretKey, KeyManager, QuinnConnection, UserResponse};
|
||||
use sd_crypto::{CryptoRng, SeedableRng};
|
||||
|
||||
use std::pin::pin;
|
||||
@@ -14,13 +15,11 @@ use async_stream::stream;
|
||||
use futures::StreamExt;
|
||||
use rspc::alpha::AlphaRouter;
|
||||
use tracing::error;
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::{Ctx, R};
|
||||
|
||||
mod devices;
|
||||
mod libraries;
|
||||
mod library;
|
||||
mod locations;
|
||||
|
||||
async fn try_get_cloud_services_client(
|
||||
@@ -69,7 +68,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
|
||||
|
||||
let (device_pub_id, name, os) = {
|
||||
let NodeConfig { id, name, os, .. } = node.config.get().await;
|
||||
(devices::PubId(id), name, os)
|
||||
(devices::PubId(id.into()), name, os)
|
||||
};
|
||||
let mut hasher = blake3::Hasher::new();
|
||||
hasher.update(device_pub_id.0.as_bytes().as_slice());
|
||||
|
||||
@@ -3,13 +3,16 @@ use crate::{
|
||||
library::LibraryId,
|
||||
node::{
|
||||
config::{is_in_docker, NodeConfig, NodeConfigP2P, NodePreferences},
|
||||
get_hardware_model_name, HardwareModel,
|
||||
HardwareModel,
|
||||
},
|
||||
old_job::JobProgressEvent,
|
||||
Node,
|
||||
};
|
||||
|
||||
use sd_core_heavy_lifting::media_processor::ThumbKey;
|
||||
use sd_core_sync::DevicePubId;
|
||||
|
||||
use sd_cloud_schema::devices::DeviceOS;
|
||||
use sd_p2p::RemoteIdentity;
|
||||
use sd_prisma::prisma::file_path;
|
||||
|
||||
@@ -20,7 +23,6 @@ use rspc::{alpha::Rspc, Config, ErrorCode};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use specta::Type;
|
||||
use tracing::warn;
|
||||
use uuid::Uuid;
|
||||
|
||||
mod backups;
|
||||
mod cloud;
|
||||
@@ -86,13 +88,15 @@ pub enum BackendFeature {}
|
||||
#[derive(Debug, Serialize, Deserialize, Clone, Type)]
|
||||
pub struct SanitizedNodeConfig {
|
||||
/// id is a unique identifier for the current node. Each node has a public identifier (this one) and is given a local id for each library (done within the library code).
|
||||
pub id: Uuid,
|
||||
pub id: DevicePubId,
|
||||
/// name is the display name of the current node. This is set by the user and is shown in the UI. // TODO: Length validation so it can fit in DNS record
|
||||
pub name: String,
|
||||
pub identity: RemoteIdentity,
|
||||
pub p2p: NodeConfigP2P,
|
||||
pub features: Vec<BackendFeature>,
|
||||
pub preferences: NodePreferences,
|
||||
pub os: DeviceOS,
|
||||
pub hardware_model: HardwareModel,
|
||||
}
|
||||
|
||||
impl From<NodeConfig> for SanitizedNodeConfig {
|
||||
@@ -104,6 +108,8 @@ impl From<NodeConfig> for SanitizedNodeConfig {
|
||||
p2p: value.p2p,
|
||||
features: value.features,
|
||||
preferences: value.preferences,
|
||||
os: value.os,
|
||||
hardware_model: value.hardware_model,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -136,12 +142,11 @@ pub(crate) fn mount() -> Arc<Router> {
|
||||
})
|
||||
.procedure("nodeState", {
|
||||
R.query(|node, _: ()| async move {
|
||||
let device_model = get_hardware_model_name()
|
||||
.unwrap_or(HardwareModel::Other)
|
||||
.to_string();
|
||||
let config = SanitizedNodeConfig::from(node.config.get().await);
|
||||
|
||||
Ok(NodeState {
|
||||
config: node.config.get().await.into(),
|
||||
device_model: Some(config.hardware_model.to_string()),
|
||||
config,
|
||||
// We are taking the assumption here that this value is only used on the frontend for display purposes
|
||||
data_path: node
|
||||
.config
|
||||
@@ -149,7 +154,6 @@ pub(crate) fn mount() -> Arc<Router> {
|
||||
.to_str()
|
||||
.expect("Found non-UTF-8 path")
|
||||
.to_string(),
|
||||
device_model: Some(device_model),
|
||||
is_in_docker: is_in_docker(),
|
||||
})
|
||||
})
|
||||
|
||||
@@ -5,9 +5,10 @@ use crate::{
|
||||
node::config::{P2PDiscoveryState, Port},
|
||||
};
|
||||
|
||||
use sd_prisma::prisma::{instance, location};
|
||||
use sd_prisma::prisma::location;
|
||||
|
||||
use rspc::{alpha::AlphaRouter, ErrorCode};
|
||||
use sd_utils::uuid_to_bytes;
|
||||
use serde::Deserialize;
|
||||
use specta::Type;
|
||||
use tracing::error;
|
||||
@@ -88,27 +89,16 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
|
||||
.procedure("listLocations", {
|
||||
R.with2(library())
|
||||
// TODO: I don't like this. `node_id` should probs be a machine hash or something cause `node_id` is dynamic in the context of P2P and what does it mean for removable media to be owned by a node?
|
||||
.query(|(_, library), node_id: Option<Uuid>| async move {
|
||||
// Be aware multiple instances can exist on a single node. This is generally an edge case but it's possible.
|
||||
let instances = library
|
||||
.db
|
||||
.instance()
|
||||
.find_many(vec![node_id
|
||||
.map(|id| instance::node_id::equals(id.as_bytes().to_vec()))
|
||||
.unwrap_or(instance::id::equals(
|
||||
library.config().await.instance_id,
|
||||
))])
|
||||
.exec()
|
||||
.await?;
|
||||
|
||||
.query(|(_, library), device_pub_id: Option<Uuid>| async move {
|
||||
Ok(library
|
||||
.db
|
||||
.location()
|
||||
.find_many(
|
||||
instances
|
||||
.into_iter()
|
||||
.map(|i| location::instance_id::equals(Some(i.id)))
|
||||
.collect(),
|
||||
device_pub_id
|
||||
.map(|id| {
|
||||
vec![location::device_pub_id::equals(Some(uuid_to_bytes(&id)))]
|
||||
})
|
||||
.unwrap_or_default(),
|
||||
)
|
||||
.exec()
|
||||
.await?
|
||||
|
||||
@@ -66,7 +66,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
|
||||
|
||||
|(_, library), args: Args| async move {
|
||||
let Library { db, sync, .. } = library.as_ref();
|
||||
let pub_id = Uuid::new_v4().as_bytes().to_vec();
|
||||
let pub_id = Uuid::now_v7().as_bytes().to_vec();
|
||||
let date_created: DateTime<FixedOffset> = Utc::now().into();
|
||||
|
||||
let (sync_params, db_params): (Vec<_>, Vec<_>) = chain_optional_iter(
|
||||
|
||||
@@ -46,12 +46,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
sd_core_sync::backfill::backfill_operations(
|
||||
&library.db,
|
||||
&library.sync,
|
||||
library.config().await.instance_id,
|
||||
)
|
||||
.await?;
|
||||
sd_core_sync::backfill::backfill_operations(&library.db, &library.sync).await?;
|
||||
|
||||
node.libraries
|
||||
.edit(
|
||||
|
||||
@@ -221,7 +221,7 @@ pub(crate) fn mount() -> AlphaRouter<Ctx> {
|
||||
.iter()
|
||||
.filter(|fp| fp.is_dir.unwrap_or_default() && fp.object.is_none())
|
||||
.map(|fp| {
|
||||
let id = uuid_to_bytes(&Uuid::new_v4());
|
||||
let id = uuid_to_bytes(&Uuid::now_v7());
|
||||
|
||||
sync_params.extend(sync.shared_create(
|
||||
prisma_sync::object::SyncId { pub_id: id.clone() },
|
||||
|
||||
@@ -90,7 +90,7 @@ pub async fn run_actor(
|
||||
sync.ingest
|
||||
.event_tx
|
||||
.send(sd_core_sync::Event::Messages(MessagesEvent {
|
||||
instance_id: sync.instance,
|
||||
device_pub_id: sync.device_pub_id.clone(),
|
||||
has_more: ops.len() == OPS_PER_REQUEST as usize,
|
||||
messages: CompressedCRDTOperationsPerModelPerDevice::new(ops),
|
||||
wait_tx: Some(wait_tx)
|
||||
|
||||
@@ -2,8 +2,9 @@ use crate::{library::Libraries, Node};
|
||||
|
||||
use futures::FutureExt;
|
||||
use sd_actors::Stopper;
|
||||
use sd_core_sync::DevicePubId;
|
||||
use sd_p2p::RemoteIdentity;
|
||||
use sd_prisma::prisma::{cloud_crdt_operation, instance, PrismaClient};
|
||||
use sd_prisma::prisma::{cloud_crdt_operation, device, instance, PrismaClient};
|
||||
use sd_sync::CRDTOperation;
|
||||
use sd_utils::uuid_to_bytes;
|
||||
|
||||
@@ -239,7 +240,7 @@ async fn write_cloud_ops_to_db(
|
||||
fn crdt_op_db(op: &CRDTOperation) -> cloud_crdt_operation::Create {
|
||||
cloud_crdt_operation::Create {
|
||||
timestamp: op.timestamp.0 as i64,
|
||||
instance: instance::pub_id::equals(op.device_pub_id.as_bytes().to_vec()),
|
||||
device: device::pub_id::equals(uuid_to_bytes(&op.device_pub_id)),
|
||||
kind: op.data.as_kind().to_string(),
|
||||
data: to_vec(&op.data).expect("unable to serialize data"),
|
||||
model: op.model_id as i32,
|
||||
@@ -247,50 +248,3 @@ fn crdt_op_db(op: &CRDTOperation) -> cloud_crdt_operation::Create {
|
||||
_params: vec![],
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
pub async fn upsert_instance(
|
||||
library_id: Uuid,
|
||||
db: &PrismaClient,
|
||||
sync: &sd_core_sync::Manager,
|
||||
libraries: &Libraries,
|
||||
uuid: &Uuid,
|
||||
identity: RemoteIdentity,
|
||||
node_id: &Uuid,
|
||||
node_remote_identity: RemoteIdentity,
|
||||
metadata: HashMap<String, String>,
|
||||
) -> prisma_client_rust::Result<()> {
|
||||
db.instance()
|
||||
.upsert(
|
||||
instance::pub_id::equals(uuid_to_bytes(uuid)),
|
||||
instance::create(
|
||||
uuid_to_bytes(uuid),
|
||||
identity.get_bytes().to_vec(),
|
||||
node_id.as_bytes().to_vec(),
|
||||
Utc::now().into(),
|
||||
Utc::now().into(),
|
||||
vec![
|
||||
instance::node_remote_identity::set(Some(
|
||||
node_remote_identity.get_bytes().to_vec(),
|
||||
)),
|
||||
instance::metadata::set(Some(
|
||||
serde_json::to_vec(&metadata).expect("unable to serialize metadata"),
|
||||
)),
|
||||
],
|
||||
),
|
||||
vec![],
|
||||
)
|
||||
.exec()
|
||||
.await?;
|
||||
|
||||
sync.timestamp_per_device
|
||||
.write()
|
||||
.await
|
||||
.entry(*uuid)
|
||||
.or_default();
|
||||
|
||||
// Called again so the new instances are picked up
|
||||
libraries.update_instances_by_id(library_id).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -4,7 +4,7 @@ use crate::{
|
||||
};
|
||||
|
||||
use sd_p2p::{Identity, RemoteIdentity};
|
||||
use sd_prisma::prisma::{file_path, indexer_rule, instance, location, node, PrismaClient};
|
||||
use sd_prisma::prisma::{file_path, indexer_rule, instance, location, PrismaClient};
|
||||
use sd_utils::{db::maybe_missing, error::FileIOError};
|
||||
|
||||
use std::{
|
||||
@@ -12,7 +12,6 @@ use std::{
|
||||
sync::{atomic::AtomicBool, Arc},
|
||||
};
|
||||
|
||||
use chrono::Utc;
|
||||
use int_enum::IntEnum;
|
||||
use prisma_client_rust::not;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -167,34 +166,8 @@ impl LibraryConfig {
|
||||
}
|
||||
|
||||
(LibraryConfigVersion::V2, LibraryConfigVersion::V3) => {
|
||||
// The fact I have to migrate this hurts my soul
|
||||
if db.node().count(vec![]).exec().await? != 1 {
|
||||
return Err(LibraryConfigError::TooManyNodes);
|
||||
}
|
||||
|
||||
db.node()
|
||||
.update_many(
|
||||
vec![],
|
||||
vec![node::pub_id::set(node_config.id.as_bytes().to_vec())],
|
||||
)
|
||||
.exec()
|
||||
.await?;
|
||||
|
||||
let mut config = serde_json::from_slice::<Map<String, Value>>(
|
||||
&fs::read(path).await.map_err(|e| {
|
||||
VersionManagerError::FileIO(FileIOError::from((path, e)))
|
||||
})?,
|
||||
)
|
||||
.map_err(VersionManagerError::SerdeJson)?;
|
||||
|
||||
config.insert(String::from("node_id"), json!(node_config.id.to_string()));
|
||||
|
||||
fs::write(
|
||||
path,
|
||||
&serde_json::to_vec(&config).map_err(VersionManagerError::SerdeJson)?,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| VersionManagerError::FileIO(FileIOError::from((path, e))))?;
|
||||
// Removed, can't be automatically updated
|
||||
return Err(LibraryConfigError::CriticalUpdateError);
|
||||
}
|
||||
|
||||
(LibraryConfigVersion::V3, LibraryConfigVersion::V4) => {
|
||||
@@ -255,51 +228,8 @@ impl LibraryConfig {
|
||||
},
|
||||
|
||||
(LibraryConfigVersion::V5, LibraryConfigVersion::V6) => {
|
||||
let nodes = db.node().find_many(vec![]).exec().await?;
|
||||
if nodes.is_empty() {
|
||||
error!("6 - No nodes found... How did you even get this far? but this is fine we can fix it.");
|
||||
} else if nodes.len() > 1 {
|
||||
error!("6 - More than one node found in the DB... This can't be automatically reconciled!");
|
||||
return Err(LibraryConfigError::TooManyNodes);
|
||||
}
|
||||
|
||||
let node = nodes.first();
|
||||
let now = Utc::now().fixed_offset();
|
||||
let instance_id = Uuid::new_v4();
|
||||
|
||||
instance::Create {
|
||||
pub_id: instance_id.as_bytes().to_vec(),
|
||||
// WARNING: At this stage in the migration this field *should* be an `Identity` not a `RemoteIdentityOrIdentity` (as that was introduced later on).
|
||||
remote_identity: node
|
||||
.and_then(|n| n.identity.clone())
|
||||
.unwrap_or_else(|| Identity::new().to_bytes()),
|
||||
node_id: node_config.id.as_bytes().to_vec(),
|
||||
last_seen: now,
|
||||
date_created: node.map(|n| n.date_created).unwrap_or_else(|| now),
|
||||
_params: vec![],
|
||||
}
|
||||
.to_query(db)
|
||||
.exec()
|
||||
.await?;
|
||||
|
||||
let mut config = serde_json::from_slice::<Map<String, Value>>(
|
||||
&fs::read(path).await.map_err(|e| {
|
||||
VersionManagerError::FileIO(FileIOError::from((path, e)))
|
||||
})?,
|
||||
)
|
||||
.map_err(VersionManagerError::SerdeJson)?;
|
||||
|
||||
config.remove("node_id");
|
||||
config.remove("identity");
|
||||
|
||||
config.insert(String::from("instance_id"), json!(instance_id.to_string()));
|
||||
|
||||
fs::write(
|
||||
path,
|
||||
&serde_json::to_vec(&config).map_err(VersionManagerError::SerdeJson)?,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| VersionManagerError::FileIO(FileIOError::from((path, e))))?;
|
||||
// Removed, can't be automatically updated
|
||||
return Err(LibraryConfigError::CriticalUpdateError);
|
||||
}
|
||||
|
||||
(LibraryConfigVersion::V6, LibraryConfigVersion::V7) => {
|
||||
@@ -344,7 +274,7 @@ impl LibraryConfig {
|
||||
}
|
||||
|
||||
(LibraryConfigVersion::V7, LibraryConfigVersion::V8) => {
|
||||
let instances = db.instance().find_many(vec![]).exec().await?;
|
||||
let instances = db.device().find_many(vec![]).exec().await?;
|
||||
let Some(instance) = instances.first() else {
|
||||
error!("8 - No nodes found... How did you even get this far?!");
|
||||
return Err(LibraryConfigError::MissingInstance);
|
||||
@@ -498,6 +428,8 @@ pub enum LibraryConfigError {
|
||||
TooManyInstances,
|
||||
#[error("missing instances")]
|
||||
MissingInstance,
|
||||
#[error("your library version can't be automatically updated, please recreate your library")]
|
||||
CriticalUpdateError,
|
||||
|
||||
#[error(transparent)]
|
||||
SerdeJson(#[from] serde_json::Error),
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use crate::{library::LibraryConfigError, location::LocationManagerError};
|
||||
|
||||
use sd_core_indexer_rules::seed::SeederError;
|
||||
use sd_core_sync::DevicePubId;
|
||||
|
||||
use sd_p2p::IdentityErr;
|
||||
use sd_utils::{
|
||||
@@ -8,10 +9,9 @@ use sd_utils::{
|
||||
error::{FileIOError, NonUtf8PathError},
|
||||
};
|
||||
|
||||
use thiserror::Error;
|
||||
use tracing::error;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum LibraryManagerError {
|
||||
#[error("error serializing or deserializing the JSON in the config file: {0}")]
|
||||
Json(#[from] serde_json::Error),
|
||||
@@ -23,8 +23,6 @@ pub enum LibraryManagerError {
|
||||
Uuid(#[from] uuid::Error),
|
||||
#[error("failed to run indexer rules seeder: {0}")]
|
||||
IndexerRulesSeeder(#[from] SeederError),
|
||||
// #[error("failed to initialize the key manager: {0}")]
|
||||
// KeyManager(#[from] sd_crypto::Error),
|
||||
#[error("error migrating the library: {0}")]
|
||||
MigrationError(#[from] db::MigrationError),
|
||||
#[error("invalid library configuration: {0}")]
|
||||
@@ -39,6 +37,8 @@ pub enum LibraryManagerError {
|
||||
InvalidIdentity,
|
||||
#[error("current instance with id '{0}' was not found in the database")]
|
||||
CurrentInstanceNotFound(String),
|
||||
#[error("current device with pub id '{0}' was not found in the database")]
|
||||
CurrentDeviceNotFound(DevicePubId),
|
||||
#[error("missing-field: {0}")]
|
||||
MissingField(#[from] MissingFieldError),
|
||||
|
||||
|
||||
@@ -156,7 +156,7 @@ impl Libraries {
|
||||
description: Option<String>,
|
||||
node: &Arc<Node>,
|
||||
) -> Result<Arc<Library>, LibraryManagerError> {
|
||||
self.create_with_uuid(Uuid::new_v4(), name, description, true, None, node, false)
|
||||
self.create_with_uuid(Uuid::now_v7(), name, description, true, None, node, false)
|
||||
.await
|
||||
}
|
||||
|
||||
@@ -206,9 +206,9 @@ impl Libraries {
|
||||
Some({
|
||||
let identity = Identity::new();
|
||||
let mut create = instance.unwrap_or_else(|| instance::Create {
|
||||
pub_id: Uuid::new_v4().as_bytes().to_vec(),
|
||||
pub_id: Uuid::now_v7().as_bytes().to_vec(),
|
||||
remote_identity: identity.to_remote_identity().get_bytes().to_vec(),
|
||||
node_id: node_cfg.id.as_bytes().to_vec(),
|
||||
node_id: node_cfg.id.to_db(),
|
||||
last_seen: now,
|
||||
date_created: now,
|
||||
_params: vec![
|
||||
@@ -458,6 +458,7 @@ impl Libraries {
|
||||
}
|
||||
|
||||
let node_config = node.config.get().await;
|
||||
let device_pub_id = node_config.id.clone();
|
||||
let config = LibraryConfig::load(config_path, &node_config, &db).await?;
|
||||
|
||||
let instances = db.instance().find_many(vec![]).exec().await?;
|
||||
@@ -470,6 +471,17 @@ impl Libraries {
|
||||
})?
|
||||
.clone();
|
||||
|
||||
let devices = db.device().find_many(vec![]).exec().await?;
|
||||
|
||||
let device_pub_id_to_db = device_pub_id.to_db();
|
||||
if devices
|
||||
.iter()
|
||||
.find(|device| device.pub_id == device_pub_id_to_db)
|
||||
.is_none()
|
||||
{
|
||||
return Err(LibraryManagerError::CurrentDeviceNotFound(device_pub_id));
|
||||
}
|
||||
|
||||
let identity = match instance.identity.as_ref() {
|
||||
Some(b) => Arc::new(Identity::from_bytes(b)?),
|
||||
// We are not this instance, so we don't have the private key.
|
||||
@@ -486,7 +498,7 @@ impl Libraries {
|
||||
.node_remote_identity
|
||||
.as_ref()
|
||||
.and_then(|v| RemoteIdentity::from_bytes(v).ok());
|
||||
if instance_node_id != node_config.id
|
||||
if instance_node_id != Uuid::from(&node_config.id)
|
||||
|| instance_node_remote_identity != Some(node_config.identity.to_remote_identity())
|
||||
|| curr_metadata != Some(node.p2p.peer_metadata())
|
||||
{
|
||||
@@ -502,7 +514,7 @@ impl Libraries {
|
||||
.update(
|
||||
instance::id::equals(instance.id),
|
||||
vec![
|
||||
instance::node_id::set(node_config.id.as_bytes().to_vec()),
|
||||
instance::node_id::set(node_config.id.to_db()),
|
||||
instance::node_remote_identity::set(Some(
|
||||
node_config
|
||||
.identity
|
||||
@@ -522,16 +534,13 @@ impl Libraries {
|
||||
|
||||
// TODO: Move this reconciliation into P2P and do reconciliation of both local and remote nodes.
|
||||
|
||||
// let key_manager = Arc::new(KeyManager::new(vec![]).await?);
|
||||
// seed_keymanager(&db, &key_manager).await?;
|
||||
|
||||
let actors = Default::default();
|
||||
|
||||
let (sync, sync_rx) = sync::Manager::with_existing_instances(
|
||||
let (sync, sync_rx) = sync::Manager::with_existing_devices(
|
||||
Arc::clone(&db),
|
||||
instance_id,
|
||||
&device_pub_id,
|
||||
Arc::clone(&config.generate_sync_operations),
|
||||
&instances,
|
||||
&devices,
|
||||
Arc::clone(&actors),
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -3,7 +3,7 @@ use crate::{
|
||||
Node,
|
||||
};
|
||||
|
||||
use sd_core_prisma_helpers::location_ids_and_path;
|
||||
use sd_core_prisma_helpers::{location_ids_and_path, DevicePubId};
|
||||
|
||||
use sd_prisma::prisma::location;
|
||||
use sd_utils::db::maybe_missing;
|
||||
@@ -38,14 +38,16 @@ type LocationIdAndLibraryId = (location::id::Type, LibraryId);
|
||||
|
||||
struct Runner {
|
||||
node: Arc<Node>,
|
||||
device_pub_id_to_db: Option<Vec<u8>>,
|
||||
locations_to_check: HashMap<location::id::Type, Arc<Library>>,
|
||||
locations_watched: HashMap<LocationIdAndLibraryId, LocationWatcher>,
|
||||
locations_unwatched: HashMap<LocationIdAndLibraryId, LocationWatcher>,
|
||||
forced_unwatch: HashSet<LocationIdAndLibraryId>,
|
||||
}
|
||||
impl Runner {
|
||||
fn new(node: Arc<Node>) -> Self {
|
||||
async fn new(node: Arc<Node>) -> Self {
|
||||
Self {
|
||||
device_pub_id_to_db: Some(DevicePubId::from(node.config.get().await.id).to_db()),
|
||||
node,
|
||||
locations_to_check: HashMap::new(),
|
||||
locations_watched: HashMap::new(),
|
||||
@@ -54,13 +56,17 @@ impl Runner {
|
||||
}
|
||||
}
|
||||
|
||||
fn check_same_device(&self, location: &location_ids_and_path::Data) -> bool {
|
||||
location.device_pub_id == self.device_pub_id_to_db
|
||||
}
|
||||
|
||||
async fn add_location(
|
||||
&mut self,
|
||||
location_id: i32,
|
||||
library: Arc<Library>,
|
||||
) -> Result<(), LocationManagerError> {
|
||||
if let Some(location) = get_location(location_id, &library).await? {
|
||||
check_online(&location, &self.node, &library)
|
||||
check_online(&location, &self.node, &library, &self.device_pub_id_to_db)
|
||||
.await
|
||||
.and_then(|is_online| {
|
||||
LocationWatcher::new(location, Arc::clone(&library), Arc::clone(&self.node))
|
||||
@@ -92,8 +98,7 @@ impl Runner {
|
||||
let key = (location_id, library.id);
|
||||
|
||||
if let Some(location) = get_location(location_id, &library).await? {
|
||||
// TODO(N): This isn't gonna work with removable media and this will likely permanently break if the DB is restored from a backup.
|
||||
if location.instance_id == Some(library.config().await.instance_id) {
|
||||
if self.check_same_device(&location) {
|
||||
self.unwatch_location(location, library.id);
|
||||
self.locations_unwatched.remove(&key);
|
||||
self.forced_unwatch.remove(&key);
|
||||
@@ -101,7 +106,7 @@ impl Runner {
|
||||
self.drop_location(
|
||||
location_id,
|
||||
library.id,
|
||||
"Dropping location from location manager, because we don't have a `local_path` anymore",
|
||||
"Dropping location from location manager, because it isn't from this device",
|
||||
);
|
||||
}
|
||||
} else {
|
||||
@@ -298,9 +303,8 @@ impl Runner {
|
||||
let key = (location_id, library.id);
|
||||
|
||||
if let Some(location) = get_location(location_id, &library).await? {
|
||||
// TODO(N): This isn't gonna work with removable media and this will likely permanently break if the DB is restored from a backup.
|
||||
if location.instance_id == Some(library.config().await.instance_id) {
|
||||
if check_online(&location, &self.node, &library).await?
|
||||
if self.check_same_device(&location) {
|
||||
if check_online(&location, &self.node, &library, &self.device_pub_id_to_db).await?
|
||||
&& !self.forced_unwatch.contains(&key)
|
||||
{
|
||||
self.watch_location(location, library.id);
|
||||
@@ -314,7 +318,7 @@ impl Runner {
|
||||
location_id,
|
||||
library.id,
|
||||
"Dropping location from location manager, because \
|
||||
it isn't a location in the current node",
|
||||
it isn't a location in the current device",
|
||||
);
|
||||
self.forced_unwatch.remove(&key);
|
||||
}
|
||||
@@ -344,7 +348,7 @@ pub(super) async fn run(
|
||||
let mut check_locations_interval = interval(Duration::from_secs(2));
|
||||
check_locations_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
|
||||
|
||||
let mut runner = Runner::new(node);
|
||||
let mut runner = Runner::new(node).await;
|
||||
|
||||
let mut msg_stream = pin!((
|
||||
location_management_rx.map(StreamMessage::LocationManagementMessage),
|
||||
@@ -410,20 +414,20 @@ async fn get_location(
|
||||
fields(%location_id, library_id = %library.id),
|
||||
err,
|
||||
)]
|
||||
pub(super) async fn check_online(
|
||||
async fn check_online(
|
||||
location_ids_and_path::Data {
|
||||
id: location_id,
|
||||
pub_id,
|
||||
instance_id,
|
||||
device_pub_id,
|
||||
path,
|
||||
}: &location_ids_and_path::Data,
|
||||
node: &Node,
|
||||
library: &Library,
|
||||
device_pub_id_to_db: &Option<Vec<u8>>,
|
||||
) -> Result<bool, LocationManagerError> {
|
||||
let pub_id = Uuid::from_slice(pub_id)?;
|
||||
|
||||
// TODO(N): This isn't gonna work with removable media and this will likely permanently break if the DB is restored from a backup.
|
||||
if *instance_id == Some(library.config().await.instance_id) {
|
||||
if *device_pub_id == *device_pub_id_to_db {
|
||||
match fs::metadata(maybe_missing(path, "location.path")?).await {
|
||||
Ok(_) => {
|
||||
node.locations.add_online(pub_id).await;
|
||||
|
||||
@@ -163,7 +163,7 @@ impl LocationCreateArgs {
|
||||
}
|
||||
);
|
||||
|
||||
let uuid = Uuid::new_v4();
|
||||
let uuid = Uuid::now_v7();
|
||||
|
||||
let location = create_location(
|
||||
library,
|
||||
@@ -246,7 +246,7 @@ impl LocationCreateArgs {
|
||||
},
|
||||
);
|
||||
|
||||
let uuid = Uuid::new_v4();
|
||||
let uuid = Uuid::now_v7();
|
||||
|
||||
let location = create_location(
|
||||
library,
|
||||
@@ -1160,7 +1160,7 @@ pub async fn create_file_path(
|
||||
.unzip()
|
||||
};
|
||||
|
||||
let pub_id = sd_utils::uuid_to_bytes(&Uuid::new_v4());
|
||||
let pub_id = sd_utils::uuid_to_bytes(&Uuid::now_v7());
|
||||
|
||||
let created_path = sync
|
||||
.write_ops(
|
||||
|
||||
@@ -5,6 +5,7 @@ use crate::{
|
||||
};
|
||||
|
||||
use sd_cloud_schema::devices::DeviceOS;
|
||||
use sd_core_sync::DevicePubId;
|
||||
use sd_p2p::Identity;
|
||||
use sd_utils::error::FileIOError;
|
||||
|
||||
@@ -27,6 +28,8 @@ use tokio::{
|
||||
use tracing::error;
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::HardwareModel;
|
||||
|
||||
/// NODE_STATE_CONFIG_NAME is the name of the file which stores the NodeState
|
||||
pub const NODE_STATE_CONFIG_NAME: &str = "node_state.sdconfig";
|
||||
|
||||
@@ -117,7 +120,7 @@ impl Default for NodeConfigP2P {
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)] // If you are adding `specta::Type` on this your probably about to leak the P2P private key
|
||||
pub struct NodeConfig {
|
||||
/// id is a unique identifier for the current node. Each node has a public identifier (this one) and is given a local id for each library (done within the library code).
|
||||
pub id: Uuid,
|
||||
pub id: DevicePubId,
|
||||
/// name is the display name of the current node. This is set by the user and is shown in the UI. // TODO: Length validation so it can fit in DNS record
|
||||
pub name: String,
|
||||
/// core level notifications
|
||||
@@ -137,8 +140,10 @@ pub struct NodeConfig {
|
||||
pub features: Vec<BackendFeature>,
|
||||
/// The aggregation of many different preferences for the node
|
||||
pub preferences: NodePreferences,
|
||||
// Operating System of the node
|
||||
/// Operating System of the node
|
||||
pub os: DeviceOS,
|
||||
/// Hardware model of the node
|
||||
pub hardware_model: HardwareModel,
|
||||
|
||||
version: NodeConfigVersion,
|
||||
}
|
||||
@@ -196,9 +201,13 @@ impl ManagedVersion<NodeConfigVersion> for NodeConfig {
|
||||
name.truncate(255);
|
||||
|
||||
let os = DeviceOS::from_env();
|
||||
let hardware_model = HardwareModel::try_get().unwrap_or_else(|e| {
|
||||
error!(?e, "Failed to get hardware model");
|
||||
HardwareModel::Other
|
||||
});
|
||||
|
||||
Some(Self {
|
||||
id: Uuid::now_v7(),
|
||||
id: Uuid::now_v7().into(),
|
||||
name,
|
||||
identity: Identity::default(),
|
||||
p2p: NodeConfigP2P::default(),
|
||||
@@ -207,6 +216,7 @@ impl ManagedVersion<NodeConfigVersion> for NodeConfig {
|
||||
notifications: vec![],
|
||||
preferences: NodePreferences::default(),
|
||||
os,
|
||||
hardware_model,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -342,6 +352,13 @@ impl NodeConfig {
|
||||
serde_json::to_value(DeviceOS::from_env())
|
||||
.map_err(VersionManagerError::SerdeJson)?,
|
||||
);
|
||||
config.insert(
|
||||
String::from("hardware_model"),
|
||||
serde_json::to_value(
|
||||
HardwareModel::try_get().unwrap_or(HardwareModel::Other),
|
||||
)
|
||||
.map_err(VersionManagerError::SerdeJson)?,
|
||||
);
|
||||
|
||||
config.remove("features");
|
||||
config.remove("auth_token");
|
||||
|
||||
@@ -1,151 +1,209 @@
|
||||
use std::io::Error;
|
||||
use std::str;
|
||||
use std::io;
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use specta::Type;
|
||||
use strum::IntoEnumIterator;
|
||||
use strum_macros::{Display, EnumIter};
|
||||
|
||||
#[repr(i32)]
|
||||
#[derive(Debug, Clone, Display, Copy, EnumIter, Type, Serialize, Deserialize, Eq, PartialEq)]
|
||||
#[specta(rename = "core_HardwareModel")]
|
||||
#[specta(rename = "CoreHardwareModel")]
|
||||
pub enum HardwareModel {
|
||||
Other,
|
||||
MacStudio,
|
||||
MacBookAir,
|
||||
MacBookPro,
|
||||
MacBook,
|
||||
MacMini,
|
||||
MacPro,
|
||||
IMac,
|
||||
IMacPro,
|
||||
IPad,
|
||||
IPhone,
|
||||
Simulator,
|
||||
Android,
|
||||
Other = 0,
|
||||
MacStudio = 1,
|
||||
MacBookAir = 2,
|
||||
MacBookPro = 3,
|
||||
MacBook = 4,
|
||||
MacMini = 5,
|
||||
MacPro = 6,
|
||||
IMac = 7,
|
||||
IMacPro = 8,
|
||||
IPad = 9,
|
||||
IPhone = 10,
|
||||
Simulator = 11,
|
||||
Android = 12,
|
||||
}
|
||||
|
||||
impl HardwareModel {
|
||||
pub fn from_display_name(name: &str) -> Self {
|
||||
use strum::IntoEnumIterator;
|
||||
HardwareModel::iter()
|
||||
impl From<i32> for HardwareModel {
|
||||
fn from(value: i32) -> Self {
|
||||
match value {
|
||||
1 => Self::MacStudio,
|
||||
2 => Self::MacBookAir,
|
||||
3 => Self::MacBookPro,
|
||||
4 => Self::MacBook,
|
||||
5 => Self::MacMini,
|
||||
6 => Self::MacPro,
|
||||
7 => Self::IMac,
|
||||
8 => Self::IMacPro,
|
||||
9 => Self::IPad,
|
||||
10 => Self::IPhone,
|
||||
11 => Self::Simulator,
|
||||
12 => Self::Android,
|
||||
_ => Self::Other,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<HardwareModel> for sd_cloud_schema::devices::HardwareModel {
|
||||
fn from(model: HardwareModel) -> Self {
|
||||
match model {
|
||||
HardwareModel::MacStudio => Self::MacStudio,
|
||||
HardwareModel::MacBookAir => Self::MacBookAir,
|
||||
HardwareModel::MacBookPro => Self::MacBookPro,
|
||||
HardwareModel::MacBook => Self::MacBook,
|
||||
HardwareModel::MacMini => Self::MacMini,
|
||||
HardwareModel::MacPro => Self::MacPro,
|
||||
HardwareModel::IMac => Self::IMac,
|
||||
HardwareModel::IMacPro => Self::IMacPro,
|
||||
HardwareModel::IPad => Self::IPad,
|
||||
HardwareModel::IPhone => Self::IPhone,
|
||||
HardwareModel::Simulator => Self::Simulator,
|
||||
HardwareModel::Android => Self::Android,
|
||||
HardwareModel::Other => Self::Other,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<sd_cloud_schema::devices::HardwareModel> for HardwareModel {
|
||||
fn from(model: sd_cloud_schema::devices::HardwareModel) -> Self {
|
||||
match model {
|
||||
sd_cloud_schema::devices::HardwareModel::MacStudio => Self::MacStudio,
|
||||
sd_cloud_schema::devices::HardwareModel::MacBookAir => Self::MacBookAir,
|
||||
sd_cloud_schema::devices::HardwareModel::MacBookPro => Self::MacBookPro,
|
||||
sd_cloud_schema::devices::HardwareModel::MacBook => Self::MacBook,
|
||||
sd_cloud_schema::devices::HardwareModel::MacMini => Self::MacMini,
|
||||
sd_cloud_schema::devices::HardwareModel::MacPro => Self::MacPro,
|
||||
sd_cloud_schema::devices::HardwareModel::IMac => Self::IMac,
|
||||
sd_cloud_schema::devices::HardwareModel::IMacPro => Self::IMacPro,
|
||||
sd_cloud_schema::devices::HardwareModel::IPad => Self::IPad,
|
||||
sd_cloud_schema::devices::HardwareModel::IPhone => Self::IPhone,
|
||||
sd_cloud_schema::devices::HardwareModel::Simulator => Self::Simulator,
|
||||
sd_cloud_schema::devices::HardwareModel::Android => Self::Android,
|
||||
sd_cloud_schema::devices::HardwareModel::Other => Self::Other,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&str> for HardwareModel {
|
||||
fn from(name: &str) -> Self {
|
||||
Self::iter()
|
||||
.find(|&model| {
|
||||
model.to_string().to_lowercase().replace(' ', "")
|
||||
== name.to_lowercase().replace(' ', "")
|
||||
})
|
||||
.unwrap_or(HardwareModel::Other)
|
||||
.unwrap_or(Self::Other)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_hardware_model_name() -> Result<HardwareModel, Error> {
|
||||
#[cfg(target_os = "macos")]
|
||||
{
|
||||
use std::process::Command;
|
||||
impl HardwareModel {
|
||||
pub fn try_get() -> Result<Self, io::Error> {
|
||||
#[cfg(target_os = "macos")]
|
||||
{
|
||||
use std::process::Command;
|
||||
|
||||
let output = Command::new("system_profiler")
|
||||
.arg("SPHardwareDataType")
|
||||
.output()?;
|
||||
let output = Command::new("system_profiler")
|
||||
.arg("SPHardwareDataType")
|
||||
.output()?;
|
||||
|
||||
if output.status.success() {
|
||||
let output_str = std::str::from_utf8(&output.stdout).unwrap_or_default();
|
||||
let hardware_model = output_str
|
||||
.lines()
|
||||
.find(|line| line.to_lowercase().contains("model name"))
|
||||
.and_then(|line| line.split_once(':'))
|
||||
.map(|(_, model_name)| HardwareModel::from_display_name(model_name.trim()))
|
||||
.unwrap_or(HardwareModel::Other);
|
||||
if output.status.success() {
|
||||
let output_str = std::str::from_utf8(&output.stdout).unwrap_or_default();
|
||||
let hardware_model = output_str
|
||||
.lines()
|
||||
.find(|line| line.to_lowercase().contains("model name"))
|
||||
.and_then(|line| line.split_once(':'))
|
||||
.map(|(_, model_name)| model_name.trim().into())
|
||||
.unwrap_or(Self::Other);
|
||||
|
||||
Ok(hardware_model)
|
||||
} else {
|
||||
Err(Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
format!(
|
||||
"Failed to get hardware model name: {}",
|
||||
String::from_utf8_lossy(&output.stderr)
|
||||
),
|
||||
))
|
||||
}
|
||||
}
|
||||
#[cfg(target_os = "ios")]
|
||||
{
|
||||
use std::ffi::CString;
|
||||
use std::ptr;
|
||||
|
||||
extern "C" {
|
||||
fn sysctlbyname(
|
||||
name: *const libc::c_char,
|
||||
oldp: *mut libc::c_void,
|
||||
oldlenp: *mut usize,
|
||||
newp: *mut libc::c_void,
|
||||
newlen: usize,
|
||||
) -> libc::c_int;
|
||||
}
|
||||
|
||||
fn get_device_type() -> Option<String> {
|
||||
let mut size: usize = 0;
|
||||
let name = CString::new("hw.machine").expect("CString::new failed");
|
||||
|
||||
// First, get the size of the buffer needed
|
||||
unsafe {
|
||||
sysctlbyname(
|
||||
name.as_ptr(),
|
||||
ptr::null_mut(),
|
||||
&mut size,
|
||||
ptr::null_mut(),
|
||||
0,
|
||||
);
|
||||
}
|
||||
|
||||
// Allocate a buffer with the correct size
|
||||
let mut buffer: Vec<u8> = vec![0; size];
|
||||
|
||||
// Get the actual machine type
|
||||
unsafe {
|
||||
sysctlbyname(
|
||||
name.as_ptr(),
|
||||
buffer.as_mut_ptr() as *mut libc::c_void,
|
||||
&mut size,
|
||||
ptr::null_mut(),
|
||||
0,
|
||||
);
|
||||
}
|
||||
|
||||
// Convert the buffer to a String
|
||||
let machine_type = String::from_utf8_lossy(&buffer).trim().to_string();
|
||||
|
||||
// Check if the device is an iPad or iPhone
|
||||
if machine_type.starts_with("iPad") {
|
||||
Some("iPad".to_string())
|
||||
} else if machine_type.starts_with("iPhone") {
|
||||
Some("iPhone".to_string())
|
||||
} else if machine_type.starts_with("arm") {
|
||||
Some("Simulator".to_string())
|
||||
Ok(hardware_model)
|
||||
} else {
|
||||
None
|
||||
Err(io::Error::new(
|
||||
io::ErrorKind::Other,
|
||||
format!(
|
||||
"Failed to get hardware model name: {}",
|
||||
String::from_utf8_lossy(&output.stderr)
|
||||
),
|
||||
))
|
||||
}
|
||||
}
|
||||
#[cfg(target_os = "ios")]
|
||||
{
|
||||
use std::ffi::CString;
|
||||
use std::ptr;
|
||||
|
||||
extern "C" {
|
||||
fn sysctlbyname(
|
||||
name: *const libc::c_char,
|
||||
oldp: *mut libc::c_void,
|
||||
oldlenp: *mut usize,
|
||||
newp: *mut libc::c_void,
|
||||
newlen: usize,
|
||||
) -> libc::c_int;
|
||||
}
|
||||
|
||||
fn get_device_type() -> Option<String> {
|
||||
let mut size: usize = 0;
|
||||
let name = CString::new("hw.machine").expect("CString::new failed");
|
||||
|
||||
// First, get the size of the buffer needed
|
||||
unsafe {
|
||||
sysctlbyname(
|
||||
name.as_ptr(),
|
||||
ptr::null_mut(),
|
||||
&mut size,
|
||||
ptr::null_mut(),
|
||||
0,
|
||||
);
|
||||
}
|
||||
|
||||
// Allocate a buffer with the correct size
|
||||
let mut buffer: Vec<u8> = vec![0; size];
|
||||
|
||||
// Get the actual machine type
|
||||
unsafe {
|
||||
sysctlbyname(
|
||||
name.as_ptr(),
|
||||
buffer.as_mut_ptr() as *mut libc::c_void,
|
||||
&mut size,
|
||||
ptr::null_mut(),
|
||||
0,
|
||||
);
|
||||
}
|
||||
|
||||
// Convert the buffer to a String
|
||||
let machine_type = String::from_utf8_lossy(&buffer).trim().to_string();
|
||||
|
||||
// Check if the device is an iPad or iPhone
|
||||
if machine_type.starts_with("iPad") {
|
||||
Some("iPad".to_string())
|
||||
} else if machine_type.starts_with("iPhone") {
|
||||
Some("iPhone".to_string())
|
||||
} else if machine_type.starts_with("arm") {
|
||||
Some("Simulator".to_string())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(device_type) = get_device_type() {
|
||||
let hardware_model = Self::from_display_name(&device_type.as_str());
|
||||
|
||||
Ok(hardware_model)
|
||||
} else {
|
||||
Err(Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
"Failed to get hardware model name",
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(device_type) = get_device_type() {
|
||||
let hardware_model = HardwareModel::from_display_name(&device_type.as_str());
|
||||
#[cfg(target_os = "android")]
|
||||
{
|
||||
Ok(Self::Android)
|
||||
}
|
||||
|
||||
Ok(hardware_model)
|
||||
} else {
|
||||
Err(Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
"Failed to get hardware model name",
|
||||
))
|
||||
#[cfg(not(any(target_os = "macos", target_os = "ios", target_os = "android")))]
|
||||
{
|
||||
Ok(Self::Other)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(target_os = "android")]
|
||||
{
|
||||
Ok(HardwareModel::Android)
|
||||
}
|
||||
|
||||
#[cfg(not(any(target_os = "macos", target_os = "ios", target_os = "android")))]
|
||||
{
|
||||
Err(Error::new(
|
||||
std::io::ErrorKind::Unsupported,
|
||||
"Unsupported operating system",
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,7 +21,7 @@ impl TagCreateArgs {
|
||||
self,
|
||||
Library { db, sync, .. }: &Library,
|
||||
) -> Result<tag::Data, sd_core_sync::Error> {
|
||||
let pub_id = Uuid::new_v4().as_bytes().to_vec();
|
||||
let pub_id = Uuid::now_v7().as_bytes().to_vec();
|
||||
|
||||
let (sync_params, db_params): (Vec<_>, Vec<_>) = [
|
||||
sync_db_entry!(self.name, tag::name),
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use crate::{
|
||||
node::{
|
||||
config::{self, P2PDiscoveryState},
|
||||
get_hardware_model_name, HardwareModel,
|
||||
HardwareModel,
|
||||
},
|
||||
p2p::{
|
||||
libraries::libraries_hook, operations, sync::SyncMessage, Header, OperatingSystem,
|
||||
@@ -208,7 +208,7 @@ impl P2PManager {
|
||||
PeerMetadata {
|
||||
name: config.name.clone(),
|
||||
operating_system: Some(OperatingSystem::get_os()),
|
||||
device_model: Some(get_hardware_model_name().unwrap_or(HardwareModel::Other)),
|
||||
device_model: Some(HardwareModel::try_get().unwrap_or(HardwareModel::Other)),
|
||||
version: Some(env!("CARGO_PKG_VERSION").to_string()),
|
||||
}
|
||||
.update(&mut self.p2p.metadata_mut());
|
||||
|
||||
@@ -47,7 +47,7 @@ impl PeerMetadata {
|
||||
.get("os")
|
||||
.map(|os| os.parse().map_err(|_| "Unable to parse 'OperationSystem'!"))
|
||||
.transpose()?,
|
||||
device_model: Some(HardwareModel::from_display_name(
|
||||
device_model: Some(HardwareModel::from(
|
||||
data.get("device_model")
|
||||
.map(|s| s.as_str())
|
||||
.unwrap_or("Other"),
|
||||
|
||||
@@ -245,7 +245,7 @@ mod responder {
|
||||
ingest
|
||||
.event_tx
|
||||
.send(Event::Messages(MessagesEvent {
|
||||
instance_id: library.sync.instance,
|
||||
device_pub_id: library.sync.device_pub_id.clone(),
|
||||
has_more: ops.len() == OPS_PER_REQUEST as usize,
|
||||
messages: ops,
|
||||
wait_tx: Some(wait_tx),
|
||||
|
||||
@@ -515,16 +515,15 @@ fn compute_stats<'v>(volumes: impl IntoIterator<Item = &'v Volume>) -> (u64, u64
|
||||
async fn update_storage_statistics(
|
||||
db: &PrismaClient,
|
||||
sync: &SyncManager,
|
||||
instance_pub_id: &Uuid,
|
||||
total_capacity: u64,
|
||||
available_capacity: u64,
|
||||
) -> Result<(), VolumeError> {
|
||||
let instance_pub_id = uuid_to_bytes(instance_pub_id);
|
||||
let device_pub_id = sync.device_pub_id.to_db();
|
||||
|
||||
let storage_statistics_pub_id = db
|
||||
.storage_statistics()
|
||||
.find_unique(storage_statistics::instance_pub_id::equals(
|
||||
instance_pub_id.clone(),
|
||||
.find_unique(storage_statistics::device_pub_id::equals(
|
||||
device_pub_id.clone(),
|
||||
))
|
||||
.select(storage_statistics::select!({ pub_id }))
|
||||
.exec()
|
||||
@@ -571,7 +570,7 @@ async fn update_storage_statistics(
|
||||
)
|
||||
.await?;
|
||||
} else {
|
||||
let new_storage_statistics_id = uuid_to_bytes(&Uuid::new_v4());
|
||||
let new_storage_statistics_id = uuid_to_bytes(&Uuid::now_v7());
|
||||
|
||||
sync.write_ops(
|
||||
db,
|
||||
@@ -590,8 +589,8 @@ async fn update_storage_statistics(
|
||||
msgpack!(available_capacity),
|
||||
),
|
||||
(
|
||||
storage_statistics::instance_pub_id::NAME,
|
||||
msgpack!(instance_pub_id),
|
||||
storage_statistics::device_pub_id::NAME,
|
||||
msgpack!(device_pub_id),
|
||||
),
|
||||
],
|
||||
),
|
||||
@@ -601,7 +600,7 @@ async fn update_storage_statistics(
|
||||
vec![
|
||||
storage_statistics::total_capacity::set(total_capacity as i64),
|
||||
storage_statistics::available_capacity::set(available_capacity as i64),
|
||||
storage_statistics::instance_pub_id::set(Some(instance_pub_id.clone())),
|
||||
storage_statistics::device_pub_id::set(Some(device_pub_id.clone())),
|
||||
],
|
||||
)
|
||||
// We don't need any data here, just the id avoids receiving the entire object
|
||||
@@ -633,14 +632,7 @@ pub fn save_storage_statistics(node: &Node) {
|
||||
..
|
||||
} = &*library;
|
||||
|
||||
update_storage_statistics(
|
||||
db,
|
||||
sync,
|
||||
instance_uuid,
|
||||
total_capacity,
|
||||
available_capacity,
|
||||
)
|
||||
.await
|
||||
update_storage_statistics(db, sync, total_capacity, available_capacity).await
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
.join()
|
||||
|
||||
@@ -29,7 +29,6 @@ pub fn spawn_volume_watcher(library: Arc<Library>) {
|
||||
if let Err(e) = super::update_storage_statistics(
|
||||
&library.db,
|
||||
&library.sync,
|
||||
&library.instance_uuid,
|
||||
total_capacity,
|
||||
available_capacity,
|
||||
)
|
||||
|
||||
@@ -61,7 +61,7 @@ pub fn enumerate(models: &[ModelWithSyncType<'_>]) -> TokenStream {
|
||||
|
||||
impl ModelSyncData {
|
||||
pub fn from_op(op: sd_sync::CRDTOperation) -> Option<Self> {
|
||||
Some(match op.model {
|
||||
Some(match op.model_id {
|
||||
#(#matches),*,
|
||||
_ => return None
|
||||
})
|
||||
|
||||
@@ -26,11 +26,9 @@ pub trait OperationFactory {
|
||||
id: &SId,
|
||||
data: CRDTOperationData,
|
||||
) -> CRDTOperation {
|
||||
let timestamp = self.get_clock().new_timestamp();
|
||||
|
||||
CRDTOperation {
|
||||
device_pub_id: self.get_device_pub_id(),
|
||||
timestamp: *timestamp.get_time(),
|
||||
timestamp: *self.get_clock().new_timestamp().get_time(),
|
||||
model_id: <SId::Model as SyncModel>::MODEL_ID,
|
||||
record_id: msgpack!(id),
|
||||
data,
|
||||
|
||||
55
packages/client/src/core.ts
generated
55
packages/client/src/core.ts
generated
@@ -52,7 +52,6 @@ export type Procedures = {
|
||||
{ key: "search.saved.get", input: LibraryArgs<number>, result: SavedSearch | null } |
|
||||
{ key: "search.saved.list", input: LibraryArgs<null>, result: SavedSearch[] } |
|
||||
{ key: "sync.enabled", input: LibraryArgs<null>, result: boolean } |
|
||||
{ key: "sync.messages", input: LibraryArgs<null>, result: CRDTOperation[] } |
|
||||
{ key: "tags.get", input: LibraryArgs<number>, result: Tag | null } |
|
||||
{ key: "tags.getForObject", input: LibraryArgs<number>, result: Tag[] } |
|
||||
{ key: "tags.getWithObjects", input: LibraryArgs<number[]>, result: { [key in number]: ({ object: { id: number }; date_created: string | null })[] } } |
|
||||
@@ -66,14 +65,15 @@ export type Procedures = {
|
||||
{ key: "cloud.bootstrap", input: [AccessToken, RefreshToken], result: null } |
|
||||
{ key: "cloud.devices.delete", input: DeviceDeleteRequest, result: null } |
|
||||
{ key: "cloud.devices.update", input: DeviceUpdateRequest, result: null } |
|
||||
{ key: "cloud.libraries.create", input: LibraryArgs<LibrariesCreateArgs>, result: null } |
|
||||
{ key: "cloud.libraries.delete", input: LibraryDeleteRequest, result: null } |
|
||||
{ key: "cloud.libraries.update", input: LibraryUpdateRequest, result: null } |
|
||||
{ key: "cloud.libraries.create", input: LibraryArgs<AccessToken>, result: null } |
|
||||
{ key: "cloud.libraries.delete", input: LibraryArgs<AccessToken>, result: null } |
|
||||
{ key: "cloud.libraries.update", input: LibraryArgs<LibrariesUpdateArgs>, result: null } |
|
||||
{ key: "cloud.library.create", input: LibraryArgs<null>, result: null } |
|
||||
{ key: "cloud.library.join", input: string, result: null } |
|
||||
{ key: "cloud.library.sync", input: LibraryArgs<null>, result: null } |
|
||||
{ key: "cloud.locations.create", input: LocationCreateRequest, result: null } |
|
||||
{ key: "cloud.locations.delete", input: LocationDeleteRequest, result: null } |
|
||||
{ key: "cloud.userResponse", input: UserResponse, result: null } |
|
||||
{ key: "ephemeralFiles.copyFiles", input: LibraryArgs<EphemeralFileSystemOps>, result: null } |
|
||||
{ key: "ephemeralFiles.createFile", input: LibraryArgs<CreateEphemeralFileArgs>, result: string } |
|
||||
{ key: "ephemeralFiles.createFolder", input: LibraryArgs<CreateEphemeralFolderArgs>, result: string } |
|
||||
@@ -136,6 +136,7 @@ export type Procedures = {
|
||||
{ key: "tags.update", input: LibraryArgs<TagUpdateArgs>, result: null } |
|
||||
{ key: "toggleFeatureFlag", input: BackendFeature, result: null },
|
||||
subscriptions:
|
||||
{ key: "cloud.listenCloudServicesNotifications", input: never, result: NotifyUser } |
|
||||
{ key: "invalidation.listen", input: never, result: InvalidateOperationEvent[] } |
|
||||
{ key: "jobs.newFilePathIdentified", input: LibraryArgs<null>, result: number[] } |
|
||||
{ key: "jobs.newThumbnail", input: LibraryArgs<null>, result: ThumbKey } |
|
||||
@@ -171,10 +172,6 @@ export type Backup = ({ id: string; timestamp: string; library_id: string; libra
|
||||
|
||||
export type BuildInfo = { version: string; commit: string }
|
||||
|
||||
export type CRDTOperation = { instance: string; timestamp: number; model: number; record_id: JsonValue; data: CRDTOperationData }
|
||||
|
||||
export type CRDTOperationData = { c: { [key in string]: JsonValue } } | { u: { field: string; value: JsonValue } } | "d"
|
||||
|
||||
export type CameraData = { device_make: string | null; device_model: string | null; color_space: string | null; color_profile: ColorProfile | null; focal_length: number | null; shutter_speed: number | null; flash: Flash | null; orientation: Orientation; lens_make: string | null; lens_model: string | null; bit_depth: number | null; zoom: number | null; iso: number | null; software: string | null; serial_number: string | null; lens_serial_number: string | null; contrast: number | null; saturation: number | null; sharpness: number | null; composite: Composite | null }
|
||||
|
||||
export type CasId = string
|
||||
@@ -185,6 +182,10 @@ export type Chapter = { id: number; start: [number, number]; end: [number, numbe
|
||||
|
||||
export type CloudLocation = { pub_id: LocationPubId; name: string; device: Device | null; library: Library | null; created_at: string; updated_at: string }
|
||||
|
||||
export type CloudP2PError = "Rejected" | "UnableToConnect" | "TimedOut"
|
||||
|
||||
export type CloudP2PTicket = bigint
|
||||
|
||||
export type Codec = { kind: string | null; sub_kind: string | null; tag: string | null; name: string | null; profile: string | null; bit_rate: number; props: Props | null }
|
||||
|
||||
export type ColorProfile = "Normal" | "Custom" | "HDRNoOriginal" | "HDRWithOriginal" | "OriginalForHDR" | "Panorama" | "PortraitHDR" | "Portrait"
|
||||
@@ -217,6 +218,12 @@ export type ConvertImageArgs = { location_id: number; file_path_id: number; dele
|
||||
|
||||
export type ConvertibleExtension = "bmp" | "dib" | "ff" | "gif" | "ico" | "jpg" | "jpeg" | "png" | "pnm" | "qoi" | "tga" | "icb" | "vda" | "vst" | "tiff" | "tif" | "hif" | "heif" | "heifs" | "heic" | "heics" | "avif" | "avci" | "avcs" | "svg" | "svgz" | "pdf" | "webp"
|
||||
|
||||
export type CoreDevicePubId = CorePubId
|
||||
|
||||
export type CoreHardwareModel = "Other" | "MacStudio" | "MacBookAir" | "MacBookPro" | "MacBook" | "MacMini" | "MacPro" | "IMac" | "IMacPro" | "IPad" | "IPhone" | "Simulator" | "Android"
|
||||
|
||||
export type CorePubId = { Uuid: string } | { Vec: number[] }
|
||||
|
||||
export type CreateEphemeralFileArgs = { path: string; context: EphemeralFileCreateContextTypes; name: string | null }
|
||||
|
||||
export type CreateEphemeralFolderArgs = { path: string; name: string | null }
|
||||
@@ -395,8 +402,14 @@ export type JobName = "Indexer" | "FileIdentifier" | "MediaProcessor" | "Copy" |
|
||||
|
||||
export type JobProgressEvent = { id: string; library_id: string; task_count: number; completed_task_count: number; phase: string; message: string; info: string; estimated_completion: string }
|
||||
|
||||
export type JoinSyncGroupError = "Communication" | "InternalServer" | "Auth"
|
||||
|
||||
export type JoinSyncGroupResponse = { Accepted: { authorizor_device: Device } } | { Failed: CloudP2PError } | "CriticalError"
|
||||
|
||||
export type JsonValue = null | boolean | number | string | JsonValue[] | { [key in string]: JsonValue }
|
||||
|
||||
export type KeyHash = string
|
||||
|
||||
export type KindStatistic = { kind: number; name: string; count: [number, number]; total_bytes: [number, number] }
|
||||
|
||||
export type KindStatistics = { statistics: { [key in number]: KindStatistic }; total_identified_files: number; total_unidentified_files: number }
|
||||
@@ -405,7 +418,7 @@ export type Label = { id: number; name: string; date_created: string | null; dat
|
||||
|
||||
export type LabelWithObjects = { id: number; name: string; date_created: string | null; date_modified: string | null; label_objects: { object: { id: number; file_paths: FilePath[] } }[] }
|
||||
|
||||
export type LibrariesCreateArgs = { access_token: AccessToken; device_pub_id: DevicePubId }
|
||||
export type LibrariesUpdateArgs = { access_token: AccessToken; name: string }
|
||||
|
||||
export type Library = { pub_id: LibraryPubId; name: string; original_device: Device | null; created_at: string; updated_at: string }
|
||||
|
||||
@@ -440,8 +453,6 @@ export type LibraryConfigVersion = "V0" | "V1" | "V2" | "V3" | "V4" | "V5" | "V6
|
||||
|
||||
export type LibraryConfigWrapped = { uuid: string; instance_id: string; instance_public_key: RemoteIdentity; config: LibraryConfig }
|
||||
|
||||
export type LibraryDeleteRequest = { access_token: AccessToken; pub_id: LibraryPubId }
|
||||
|
||||
export type LibraryGetRequest = { access_token: AccessToken; pub_id: LibraryPubId; with_device: boolean }
|
||||
|
||||
export type LibraryListRequest = { access_token: AccessToken; with_device: boolean }
|
||||
@@ -452,15 +463,13 @@ export type LibraryPreferences = { location?: { [key in string]: LocationSetting
|
||||
|
||||
export type LibraryPubId = string
|
||||
|
||||
export type LibraryUpdateRequest = { access_token: AccessToken; pub_id: LibraryPubId; name: string }
|
||||
|
||||
export type LightScanArgs = { location_id: number; sub_path: string }
|
||||
|
||||
export type ListenerState = { type: "Listening" } | { type: "Error"; error: string } | { type: "NotListening" }
|
||||
|
||||
export type Listeners = { ipv4: ListenerState; ipv6: ListenerState; relay: ListenerState }
|
||||
|
||||
export type Location = { id: number; pub_id: number[]; name: string | null; path: string | null; total_capacity: number | null; available_capacity: number | null; size_in_bytes: number[] | null; is_archived: boolean | null; generate_preview_media: boolean | null; sync_preview_media: boolean | null; hidden: boolean | null; date_created: string | null; scan_state: number; instance_id: number | null }
|
||||
export type Location = { id: number; pub_id: number[]; name: string | null; path: string | null; total_capacity: number | null; available_capacity: number | null; size_in_bytes: number[] | null; is_archived: boolean | null; generate_preview_media: boolean | null; sync_preview_media: boolean | null; hidden: boolean | null; date_created: string | null; scan_state: number; device_pub_id: number[] | null; instance_id: number | null }
|
||||
|
||||
/**
|
||||
* `LocationCreateArgs` is the argument received from the client using `rspc` to create a new location.
|
||||
@@ -505,7 +514,7 @@ export type MediaLocation = { latitude: number; longitude: number; pluscode: Plu
|
||||
|
||||
export type Metadata = { album: string | null; album_artist: string | null; artist: string | null; comment: string | null; composer: string | null; copyright: string | null; creation_time: string | null; date: string | null; disc: number | null; encoder: string | null; encoded_by: string | null; filename: string | null; genre: string | null; language: string | null; performer: string | null; publisher: string | null; service_name: string | null; service_provider: string | null; title: string | null; track: number | null; variant_bit_rate: number | null; custom: { [key in string]: string } }
|
||||
|
||||
export type MockDevice = { pub_id: DevicePubId; name: string; os: DeviceOS; used_storage: bigint; storage_size: bigint; created_at: string; updated_at: string; device_model: core_HardwareModel }
|
||||
export type MockDevice = { pub_id: DevicePubId; name: string; os: DeviceOS; used_storage: bigint; storage_size: bigint; created_at: string; updated_at: string; device_model: CoreHardwareModel }
|
||||
|
||||
export type NodeConfigP2P = { discovery?: P2PDiscoveryState; port: Port; disabled: boolean; disable_ipv6: boolean; disable_relay: boolean; enable_remote_access: boolean;
|
||||
/**
|
||||
@@ -527,11 +536,11 @@ export type NodeState = ({
|
||||
/**
|
||||
* id is a unique identifier for the current node. Each node has a public identifier (this one) and is given a local id for each library (done within the library code).
|
||||
*/
|
||||
id: string;
|
||||
id: CoreDevicePubId;
|
||||
/**
|
||||
* name is the display name of the current node. This is set by the user and is shown in the UI. // TODO: Length validation so it can fit in DNS record
|
||||
*/
|
||||
name: string; identity: RemoteIdentity; p2p: NodeConfigP2P; features: BackendFeature[]; preferences: NodePreferences }) & { data_path: string; device_model: string | null; is_in_docker: boolean }
|
||||
name: string; identity: RemoteIdentity; p2p: NodeConfigP2P; features: BackendFeature[]; preferences: NodePreferences; os: DeviceOS; hardware_model: CoreHardwareModel }) & { data_path: string; device_model: string | null; is_in_docker: boolean }
|
||||
|
||||
export type NonCriticalError = { indexer: NonCriticalIndexerError } | { file_identifier: NonCriticalFileIdentifierError } | { media_processor: NonCriticalMediaProcessorError }
|
||||
|
||||
@@ -562,6 +571,8 @@ export type NotificationId = { type: "library"; id: [string, number] } | { type:
|
||||
|
||||
export type NotificationKind = "info" | "success" | "error" | "warning"
|
||||
|
||||
export type NotifyUser = { kind: "ReceivedJoinSyncGroupRequest"; data: { ticket: CloudP2PTicket; asking_device: Device; sync_group: SyncGroup } } | { kind: "ReceivedJoinSyncGroupResponse"; data: { response: JoinSyncGroupResponse; sync_group: SyncGroup } } | { kind: "SendingJoinSyncGroupResponseError"; data: { error: JoinSyncGroupError; sync_group: SyncGroup } } | { kind: "TimedOutJoinRequest"; data: { device: Device; succeeded: boolean } }
|
||||
|
||||
export type Object = { id: number; pub_id: number[]; kind: number | null; key_id: number | null; hidden: boolean | null; favorite: boolean | null; important: boolean | null; note: string | null; date_created: string | null; date_accessed: string | null }
|
||||
|
||||
export type ObjectCursor = "none" | { dateAccessed: CursorOrderItem<string> } | { kind: CursorOrderItem<number> }
|
||||
@@ -602,7 +613,7 @@ export type P2PDiscoveryState = "Everyone" | "ContactsOnly" | "Disabled"
|
||||
|
||||
export type P2PEvent = { type: "PeerChange"; identity: RemoteIdentity; connection: ConnectionMethod; discovery: DiscoveryMethod; metadata: PeerMetadata; addrs: string[] } | { type: "PeerDelete"; identity: RemoteIdentity } | { type: "SpacedropRequest"; id: string; identity: RemoteIdentity; peer_name: string; files: string[] } | { type: "SpacedropProgress"; id: string; percent: number } | { type: "SpacedropTimedOut"; id: string } | { type: "SpacedropRejected"; id: string }
|
||||
|
||||
export type PeerMetadata = { name: string; operating_system: OperatingSystem | null; device_model: core_HardwareModel | null; version: string | null }
|
||||
export type PeerMetadata = { name: string; operating_system: OperatingSystem | null; device_model: CoreHardwareModel | null; version: string | null }
|
||||
|
||||
export type PlusCode = string
|
||||
|
||||
@@ -675,6 +686,10 @@ export type Stream = { id: number; name: string | null; codec: Codec | null; asp
|
||||
|
||||
export type SubtitleProps = { width: number; height: number }
|
||||
|
||||
export type SyncGroup = { pub_id: SyncGroupPubId; name: string; latest_key_hash: KeyHash; library: Library; devices: Device[]; created_at: string; updated_at: string }
|
||||
|
||||
export type SyncGroupPubId = string
|
||||
|
||||
export type SyncStatus = { ingest: boolean; cloud_send: boolean; cloud_receive: boolean; cloud_ingest: boolean }
|
||||
|
||||
export type SystemLocations = { desktop: string | null; documents: string | null; downloads: string | null; pictures: string | null; music: string | null; videos: string | null }
|
||||
@@ -699,8 +714,8 @@ export type ThumbKey = { shard_hex: string; cas_id: CasId; base_directory_str: s
|
||||
|
||||
export type UpdateThumbnailerPreferences = Record<string, never>
|
||||
|
||||
export type UserResponse = { kind: "AcceptDeviceInSyncGroup"; data: { ticket: CloudP2PTicket; accepted: boolean } }
|
||||
|
||||
export type VideoProps = { pixel_format: string | null; color_range: string | null; bits_per_channel: number | null; color_space: string | null; color_primaries: string | null; color_transfer: string | null; field_order: string | null; chroma_location: string | null; width: number; height: number; aspect_ratio_num: number | null; aspect_ratio_den: number | null; properties: string[] }
|
||||
|
||||
export type Volume = { name: string; mount_points: string[]; total_capacity: string; available_capacity: string; disk_type: DiskType; file_system: string | null; is_root_filesystem: boolean }
|
||||
|
||||
export type core_HardwareModel = "Other" | "MacStudio" | "MacBookAir" | "MacBookPro" | "MacBook" | "MacMini" | "MacPro" | "IMac" | "IMacPro" | "IPad" | "IPhone" | "Simulator" | "Android"
|
||||
|
||||
Reference in New Issue
Block a user