Eng 332 auto generate sync ids (#523)

* generate sync ids and use model name from ModelActions

* cleanup AddLocationDialog imports

* rustfmt
This commit is contained in:
Brendan Allan
2023-01-15 07:25:11 -08:00
committed by GitHub
parent 02b4c2eba6
commit 8e479f736f
8 changed files with 299 additions and 153 deletions

View File

@@ -40,24 +40,6 @@ model SharedOperation {
@@map("shared_operation")
}
model SyncEvent {
id Int @id @default(autoincrement())
node_id Int
timestamp String
// individual record pub id OR compound many-to-many pub ids
record_id Bytes
// the type of operation, I.E: CREATE, UPDATE, DELETE as an enum
kind Int
// the column name for atomic update operations
column String?
// the new value for create/update operations, msgpack encoded
value String
node Node @relation(fields: [node_id], references: [id])
@@map("sync_event")
}
model Statistics {
id Int @id @default(autoincrement())
date_captured DateTime @default(now())
@@ -72,6 +54,7 @@ model Statistics {
@@map("statistics")
}
/// @local(id: pub_id)
model Node {
id Int @id @default(autoincrement())
pub_id Bytes @unique
@@ -82,10 +65,9 @@ model Node {
timezone String?
date_created DateTime @default(now())
sync_events SyncEvent[]
jobs Job[]
jobs Job[]
Location Location[]
Location Location[]
OwnedOperation OwnedOperation[]
SharedOperation SharedOperation[]
@@ -108,6 +90,7 @@ model Volume {
@@map("volume")
}
/// @owned(id: pub_id)
model Location {
id Int @id @default(autoincrement())
pub_id Bytes @unique
@@ -130,6 +113,7 @@ model Location {
@@map("location")
}
/// @shared(id: cas_id)
model Object {
id Int @id @default(autoincrement())
// content addressable storage id - blake3 sampled checksum
@@ -174,6 +158,7 @@ model Object {
@@map("object")
}
/// @shared(id: [location, id])
model FilePath {
id Int
is_dir Boolean @default(false)
@@ -220,6 +205,7 @@ model FileConflict {
// keys allow us to know exactly which files can be decrypted with a given key
// they can be "mounted" to a client, and then used to decrypt files automatically
/// @shared(id: uuid)
model Key {
id Int @id @default(autoincrement())
// uuid to identify the key
@@ -277,6 +263,7 @@ model MediaData {
@@map("media_data")
}
/// @shared(id: pub_id)
model Tag {
id Int @id @default(autoincrement())
pub_id Bytes @unique
@@ -376,6 +363,7 @@ model Job {
@@map("job")
}
/// @shared(id: pub_id)
model Album {
id Int @id @default(autoincrement())
pub_id Bytes @unique

View File

@@ -7,6 +7,7 @@ use crate::{
preview::{ThumbnailJob, ThumbnailJobInit},
},
prisma::{file_path, indexer_rules_in_location, location, node, object},
prisma_sync,
};
use rspc::Type;
@@ -339,8 +340,9 @@ async fn create_location(
.write_op(
db,
ctx.sync.owned_create(
"Location",
json!({ "id": location_pub_id.as_bytes() }),
prisma_sync::location::SyncId {
pub_id: location_pub_id.as_bytes().to_vec(),
},
[
("node", json!({ "pub_id": ctx.id.as_bytes() })),
("name", json!(location_name)),

View File

@@ -1,7 +1,6 @@
use futures::future::join_all;
use sd_sync::*;
use serde::Deserialize;
use serde_json::{from_slice, from_value, to_vec, Value};
use serde_json::{from_slice, from_value, json, to_vec, Value};
use std::{
collections::{HashMap, HashSet},
sync::Arc,
@@ -10,8 +9,9 @@ use tokio::sync::mpsc::{self, Receiver, Sender};
use uhlc::{HLCBuilder, HLC, NTP64};
use uuid::Uuid;
use crate::prisma::{
file_path, location, node, object, owned_operation, shared_operation, PrismaClient,
use crate::{
prisma::{file_path, location, node, object, owned_operation, shared_operation, PrismaClient},
prisma_sync,
};
pub struct SyncManager {
@@ -220,19 +220,14 @@ impl SyncManager {
match op.typ {
CRDTOperationType::Owned(owned_op) => match owned_op.model.as_str() {
"FilePath" => {
#[derive(Deserialize)]
struct FilePathId {
location_id: Vec<u8>,
id: i32,
}
for item in owned_op.items {
let id: FilePathId = serde_json::from_value(item.id).unwrap();
let id: prisma_sync::file_path::SyncId =
serde_json::from_value(item.id).unwrap();
let location = self
.db
.location()
.find_unique(location::pub_id::equals(id.location_id))
.find_unique(location::pub_id::equals(id.location.pub_id))
.select(location::select!({ id }))
.exec()
.await?
@@ -264,14 +259,17 @@ impl SyncManager {
values,
skip_duplicates,
} => {
let location_ids = values
.iter()
.map(|(id, _)| {
serde_json::from_value::<FilePathId>(id.clone())
let location_ids =
values
.iter()
.map(|(id, _)| {
serde_json::from_value::<prisma_sync::file_path::SyncId>(id.clone())
.unwrap()
.location_id
})
.collect::<HashSet<_>>();
.location
.pub_id
})
.collect::<HashSet<_>>();
let location_id_mappings =
join_all(location_ids.iter().map(|id| async move {
self.db
@@ -291,12 +289,14 @@ impl SyncManager {
values
.into_iter()
.map(|(id, mut data)| {
let id: FilePathId =
let id: prisma_sync::file_path::SyncId =
serde_json::from_value(id).unwrap();
file_path::create_unchecked(
id.id,
*location_id_mappings.get(&id.location_id).unwrap(),
*location_id_mappings
.get(&id.location.pub_id)
.unwrap(),
serde_json::from_value(
data.remove("materialized_path").unwrap(),
)
@@ -340,20 +340,15 @@ impl SyncManager {
}
}
"Location" => {
#[derive(Deserialize)]
struct LocationId {
id: Vec<u8>,
}
for item in owned_op.items {
let id: LocationId = serde_json::from_value(item.id).unwrap();
let id: prisma_sync::location::SyncId = from_value(item.id).unwrap();
match item.data {
OwnedOperationData::Create(mut data) => {
self.db
.location()
.create(
id.id,
id.pub_id,
{
let val: std::collections::HashMap<String, Value> =
from_value(data.remove("node").unwrap()).unwrap();
@@ -379,15 +374,15 @@ impl SyncManager {
},
CRDTOperationType::Shared(shared_op) => match shared_op.model.as_str() {
"Object" => {
let cas_id: String = from_value(shared_op.record_id).unwrap();
let id: prisma_sync::object::SyncId = from_value(shared_op.record_id).unwrap();
match shared_op.data {
SharedOperationData::Create(_) => {
self.db
.object()
.upsert(
object::cas_id::equals(cas_id.clone()),
(cas_id, vec![]),
object::cas_id::equals(id.cas_id.clone()),
(id.cas_id, vec![]),
vec![],
)
.exec()
@@ -398,7 +393,7 @@ impl SyncManager {
self.db
.object()
.update(
object::cas_id::equals(cas_id),
object::cas_id::equals(id.cas_id),
vec![object::SetParam::deserialize(&field, value).unwrap()],
)
.exec()
@@ -426,18 +421,21 @@ impl SyncManager {
}
}
pub fn owned_create<const SIZE: usize>(
pub fn owned_create<
const SIZE: usize,
TSyncId: SyncId<ModelTypes = TModel>,
TModel: SyncType<Marker = OwnedSyncType>,
>(
&self,
model: &str,
id: Value,
id: TSyncId,
values: [(&'static str, Value); SIZE],
) -> CRDTOperation {
self.new_op(CRDTOperationType::Owned(OwnedOperation {
model: model.to_string(),
model: TModel::MODEL.to_string(),
items: [(id, values)]
.into_iter()
.map(|(id, data)| OwnedOperationItem {
id,
id: json!(id),
data: OwnedOperationData::Create(
data.into_iter().map(|(k, v)| (k.to_string(), v)).collect(),
),

View File

@@ -1,3 +1,5 @@
use prisma_client_rust_sdk::prelude::*;
mod parser;
#[derive(Debug)]
@@ -14,12 +16,12 @@ impl AttributeFieldValue<'_> {
}
}
// pub fn as_list(&self) -> Option<&Vec<&str>> {
// match self {
// AttributeFieldValue::List(fields) => Some(fields),
// _ => None,
// }
// }
pub fn as_list(&self) -> Option<&Vec<&str>> {
match self {
AttributeFieldValue::List(fields) => Some(fields),
_ => None,
}
}
}
#[derive(Debug)]
@@ -29,13 +31,23 @@ pub struct Attribute<'a> {
}
impl<'a> Attribute<'a> {
pub fn parse(input: &'a impl AsRef<str>) -> Result<Self, ()> {
parser::parse(input.as_ref())
.map(|(_, a)| a)
.map_err(|_| ())
pub fn parse(input: &'a str) -> Result<Self, ()> {
parser::parse(input).map(|(_, a)| a).map_err(|_| ())
}
pub fn field(&self, name: &str) -> Option<&AttributeFieldValue> {
self.fields.iter().find(|(n, _)| *n == name).map(|(_, v)| v)
}
}
pub fn model_attributes(model: &dml::Model) -> Vec<Attribute> {
model
.documentation
.as_ref()
.map(|docs| {
docs.lines()
.flat_map(|line| Attribute::parse(line))
.collect()
})
.unwrap_or_default()
}

View File

@@ -24,7 +24,7 @@ fn parens(input: &str) -> IResult<&str, &str> {
delimited(char('('), is_not(")"), char(')'))(input)
}
pub fn single_value<T, E: ParseError<T>>(i: T) -> IResult<T, T, E>
fn single_value<T, E: ParseError<T>>(i: T) -> IResult<T, T, E>
where
T: InputTakeAtPosition,
<T as InputTakeAtPosition>::Item: AsChar,

View File

@@ -1,4 +1,8 @@
pub use prisma_client_rust_sdk::prelude::*;
mod attribute;
use attribute::*;
use prisma_client_rust_sdk::prelude::*;
#[derive(Debug, serde::Serialize, thiserror::Error)]
enum Error {}
@@ -6,6 +10,65 @@ enum Error {}
#[derive(serde::Deserialize)]
struct SDSyncGenerator {}
type FieldVec<'a> = Vec<&'a dml::Field>;
#[derive(Debug)]
enum ModelSyncType<'a> {
Local {
id: FieldVec<'a>,
},
Owned {
id: FieldVec<'a>,
},
Shared {
id: FieldVec<'a>,
},
Relation {
group: FieldVec<'a>,
item: FieldVec<'a>,
},
}
impl<'a> ModelSyncType<'a> {
fn from_attribute(attr: &'a Attribute, model: &'a dml::Model) -> Option<Self> {
let id = attr
.field("id")
.map(|field| match field {
AttributeFieldValue::Single(s) => vec![*s],
AttributeFieldValue::List(l) => l.clone(),
})
.unwrap_or_else(|| {
model
.primary_key
.as_ref()
.unwrap()
.fields
.iter()
.map(|f| f.name.as_str())
.collect()
})
.into_iter()
.flat_map(|name| model.find_field(name))
.collect();
Some(match attr.name {
"local" => Self::Local { id },
"owned" => Self::Owned { id },
"shared" => Self::Shared { id },
_ => return None,
})
}
fn sync_id(&self) -> Vec<&dml::Field> {
match self {
Self::Owned { id } => id.clone(),
Self::Local { id } => id.clone(),
Self::Shared { id } => id.clone(),
_ => vec![],
}
}
}
impl PrismaGenerator for SDSyncGenerator {
const NAME: &'static str = "SD Sync Generator";
const DEFAULT_OUTPUT: &'static str = "prisma-sync.rs";
@@ -13,97 +76,158 @@ impl PrismaGenerator for SDSyncGenerator {
type Error = Error;
fn generate(self, args: GenerateArgs) -> Result<String, Self::Error> {
let set_param_impls = args.dml.models().map(|model| {
let model_name_snake = snake_ident(&model.name);
let model_modules = args.dml.models().map(|model| {
let model_name_snake = snake_ident(&model.name);
let field_matches = model.fields().filter_map(|field| {
let field_name_snake = snake_ident(field.name());
let field_name_snake_str = field_name_snake.to_string();
let attributes = model_attributes(&model);
match field {
dml::Field::ScalarField(_) => {
Some(quote! {
#field_name_snake_str => crate::prisma::#model_name_snake::#field_name_snake::set(::serde_json::from_value(val).unwrap()),
})
},
dml::Field::RelationField(relation_field) => {
let relation_model_name_snake = snake_ident(&relation_field.relation_info.referenced_model);
let sync_id = attributes
.iter()
.find_map(|a| ModelSyncType::from_attribute(a, model))
.map(|sync_type| {
let fields = sync_type.sync_id();
let fields = fields.iter().flat_map(|field| {
let name_snake = snake_ident(field.name());
match &relation_field.relation_info.references[..] {
[_] => {
Some(quote! {
#field_name_snake_str => {
let val: std::collections::HashMap<String, ::serde_json::Value> = ::serde_json::from_value(val).unwrap();
let val = val.into_iter().next().unwrap();
crate::prisma::#model_name_snake::#field_name_snake::connect(
crate::prisma::#relation_model_name_snake::UniqueWhereParam::deserialize(&val.0, val.1).unwrap()
)
},
})
let typ = match field {
dml::Field::ScalarField(_) => {
field.type_tokens(quote!(self))
},
_ => None
}
},
_ => None
}
});
dml::Field::RelationField(relation)=> {
let relation_model_name_snake = snake_ident(&relation.relation_info.referenced_model);
quote!(super::#relation_model_name_snake::SyncId)
},
_ => return None
};
match field_matches.clone().count() {
0 => quote!(),
_ => quote! {
impl crate::prisma::#model_name_snake::SetParam {
pub fn deserialize(field: &str, val: ::serde_json::Value) -> Option<Self> {
Some(match field {
#(#field_matches)*
_ => return None
Some(quote!(pub #name_snake: #typ))
});
let sync_type_marker = match &sync_type {
ModelSyncType::Local { .. } => quote!(LocalSyncType),
ModelSyncType::Owned { .. } => quote!(OwnedSyncType),
ModelSyncType::Shared { .. } => quote!(SharedSyncType),
ModelSyncType::Relation { .. } => quote!(RelationSyncType),
};
quote! {
#[derive(serde::Serialize, serde::Deserialize)]
pub struct SyncId {
#(#fields),*
}
impl sd_sync::SyncId for SyncId {
type ModelTypes = #model_name_snake::Types;
}
impl sd_sync::SyncType for #model_name_snake::Types {
type SyncId = SyncId;
type Marker = sd_sync::#sync_type_marker;
}
}
});
let set_param_impl = {
let field_matches = model.fields().filter_map(|field| {
let field_name_snake = snake_ident(field.name());
let field_name_snake_str = field_name_snake.to_string();
match field {
dml::Field::ScalarField(_) => {
Some(quote! {
#field_name_snake_str => #model_name_snake::#field_name_snake::set(::serde_json::from_value(val).unwrap()),
})
},
dml::Field::RelationField(relation_field) => {
let relation_model_name_snake = snake_ident(&relation_field.relation_info.referenced_model);
match &relation_field.relation_info.references[..] {
[_] => {
Some(quote! {
#field_name_snake_str => {
let val: std::collections::HashMap<String, ::serde_json::Value> = ::serde_json::from_value(val).unwrap();
let val = val.into_iter().next().unwrap();
#model_name_snake::#field_name_snake::connect(
#relation_model_name_snake::UniqueWhereParam::deserialize(&val.0, val.1).unwrap()
)
},
})
},
_ => None
}
},
_ => None
}
});
match field_matches.clone().count() {
0 => quote!(),
_ => quote! {
impl #model_name_snake::SetParam {
pub fn deserialize(field: &str, val: ::serde_json::Value) -> Option<Self> {
Some(match field {
#(#field_matches)*
_ => return None
})
}
}
}
}
};
let unique_param_impl = {
let field_matches = model
.loose_unique_criterias()
.iter()
.flat_map(|criteria| match &criteria.fields[..] {
[field] => {
let unique_field_name_str = &field.name;
let unique_field_name_snake = snake_ident(unique_field_name_str);
Some(quote!(#unique_field_name_str =>
#model_name_snake::#unique_field_name_snake::equals(
::serde_json::from_value(val).unwrap()
),
))
}
_ => None,
})
.collect::<Vec<_>>();
match field_matches.len() {
0 => quote!(),
_ => quote! {
impl #model_name_snake::UniqueWhereParam {
pub fn deserialize(field: &str, val: ::serde_json::Value) -> Option<Self> {
Some(match field {
#(#field_matches)*
_ => return None
})
}
}
},
}
};
quote! {
pub mod #model_name_snake {
use super::prisma::*;
#sync_id
#set_param_impl
#unique_param_impl
}
}
});
let unique_where_param_impls = args.dml.models().map(|model| {
let model_name_snake = snake_ident(&model.name);
let field_matches = model
.loose_unique_criterias()
.iter()
.flat_map(|criteria| match &criteria.fields[..] {
[field] => {
let unique_field_name_str = &field.name;
let unique_field_name_snake = snake_ident(unique_field_name_str);
Some(quote!(#unique_field_name_str =>
crate::prisma::#model_name_snake::#unique_field_name_snake::equals(
::serde_json::from_value(val).unwrap()
),
))
}
_ => None,
})
.collect::<Vec<_>>();
match field_matches.len() {
0 => quote!(),
_ => quote! {
impl crate::prisma::#model_name_snake::UniqueWhereParam {
pub fn deserialize(field: &str, val: ::serde_json::Value) -> Option<Self> {
Some(match field {
#(#field_matches)*
_ => return None
})
}
}
},
}
});
});
Ok(quote! {
#(#set_param_impls)*
use crate::prisma;
#(#unique_where_param_impls)*
#(#model_modules)*
}
.to_string())
}

View File

@@ -5,9 +5,33 @@ pub use crdt::*;
// pub use db::*;
use prisma_client_rust::ModelTypes;
use serde::{de::DeserializeOwned, Serialize};
use serde_value::Value;
use std::collections::BTreeMap;
pub trait SyncId: Serialize + DeserializeOwned {
type ModelTypes: SyncType;
}
pub trait SyncType: ModelTypes {
type SyncId: SyncId;
type Marker: SyncTypeMarker;
}
pub trait SyncTypeMarker {}
pub struct LocalSyncType;
impl SyncTypeMarker for LocalSyncType {}
pub struct OwnedSyncType;
impl SyncTypeMarker for OwnedSyncType {}
pub struct SharedSyncType;
impl SyncTypeMarker for SharedSyncType {}
pub struct RelationSyncType;
impl SyncTypeMarker for RelationSyncType {}
pub trait CreateCRDTMutation<T: ModelTypes> {
fn operation_from_data(
d: &BTreeMap<String, Value>,

View File

@@ -1,9 +1,7 @@
import { useLibraryMutation } from '@sd/client';
import { Input } from '@sd/ui';
import { Dialog } from '@sd/ui';
import { forms } from '@sd/ui';
const { useZodForm, z } = forms;
import { Input, useZodForm, z } from '@sd/ui/src/forms';
const schema = z.object({ path: z.string() });