From 00e4aa9e8f8291cc5d68cbda3013de3ca28d6a4f Mon Sep 17 00:00:00 2001 From: Brendan Allan Date: Mon, 7 Aug 2023 00:52:43 -0700 Subject: [PATCH] Sync ingesting (#1181) * sync + ingest refactor * fix Event enum description * actually do sync over network * re-enable heif * remove comment --- Cargo.lock | Bin 244849 -> 245142 bytes Cargo.toml | 6 +- apps/cli/Cargo.toml | 2 +- apps/desktop/src-tauri/Cargo.toml | 4 +- apps/desktop/src/commands.ts | 6 +- apps/server/Cargo.toml | 2 +- apps/server/package.json | 14 +- core/crates/sync/Cargo.toml | 3 +- core/crates/sync/src/actor.rs | 61 ++++ core/crates/sync/src/db_operation.rs | 62 ++++ core/crates/sync/src/ingest.rs | 355 ++++++++++++++++------ core/crates/sync/src/lib.rs | 391 ++----------------------- core/crates/sync/src/manager.rs | 206 +++++++++++++ core/crates/sync/tests/lib.rs | 23 +- core/prisma/schema.prisma | 5 +- core/src/lib.rs | 2 + core/src/library/library.rs | 38 ++- core/src/location/indexer/mod.rs | 2 +- core/src/location/mod.rs | 10 +- core/src/object/file_identifier/mod.rs | 3 +- core/src/p2p/p2p_manager.rs | 17 +- core/src/p2p/pairing/initial_sync.rs | 376 ------------------------ core/src/p2p/pairing/mod.rs | 265 ++++++++++++++++- core/src/p2p/pairing/proto.rs | 78 ----- core/src/p2p/sync/mod.rs | 132 +++++---- interface/app/p2p/pairing.tsx | 1 - 26 files changed, 1012 insertions(+), 1052 deletions(-) create mode 100644 core/crates/sync/src/actor.rs create mode 100644 core/crates/sync/src/db_operation.rs create mode 100644 core/crates/sync/src/manager.rs delete mode 100644 core/src/p2p/pairing/initial_sync.rs diff --git a/Cargo.lock b/Cargo.lock index 68827870409812962125b1f1042fc1c25dd99655..de37c0cbbdbfede6103b4d5e5073cc754a82d572 100644 GIT binary patch delta 18524 zcmbuHd3;}0o%dho+%!$Pq-&a%bV2bQBncsw0jLP(}qsgy((kFDdvuKF>e)r8LcQmvg?$ z=kxh~PG0?E$Mt{ixa&lr`c4qb`wO>-gD|(k)OU+CH@!SE?KsQREQq~4&n%C>%-C_A zs0b1>xBRFuZO^tXH}t(+%|F7J(RfQdIHt@$J0{f=(^oEP+|c%x5Kejg z%C5$}<6f51G8xV=?v55z#f6FX)Wd#P90nU`}D*Un8dNfR&dy}enGFeAq* zTr;<`Jc$z5a!uDY(>zsEY-47*r?ah`+Brj=e!Lv$i<~eF+@kQJ&`eAx4ujP5Y&UV@ z+)KPDEutu~GCxT}hgS$Q)+F`fERKBFNuAP~+tqlY^8l%~eOe6mB^IBvv(U`bG>DVH z4S4m`2|_nXbI;B5Fb-lfP2D2l3Q6JexOQ6j1$z*BrL|{f<8w2g5^Co!V#fYWRVvpt{()hiWiF+YWr;2T6T6% z9iOh>ROIo-O~d(y#)9rKV#{f)X5?p%8JoW6T165UNg5O(`(v4@>07>=@|1S!nwI5+ zg&${G7P64O9i?t!`GKXD_KKN}CuhGVr0td4jPCLy3tAg5&b8n5@3D==J?WUX9<$d8 zdTpoNx_@UY|8RSwa>c6$mXi+})7ZWL^HSL5j`sv2P`hUugZ-(WWx3BAXRhPLE-%l= zf#XGX;U#gznpvI|7nW%TEQINX9$V8Sv+tbpg@;;%-PrNoVWGZy zvmEGmO)n^HwkXb$#4b|H3v=5p5+`&WCnbcFqzL>Z;HH6Nr$K6_tc%Z5C%)(TYWLl& z>H3d~Ddp+EovL1rWqYG#p&?p(tX`+rR#VzVXJhuF@j^}ewd`&j)_bQY=i5INk=lHm z80hn3E6ZKax2z&dJjY9lEDw@^&lQ>Hl9!pCSONcYxN+=7iBE_`340v4Nm%ai4k*9s z%ow>@x!pNYM2#+YjVyP4i&t|j9Sm7)5(}>5`&=d>PsnULiLKBkDKaZdZJU60_-~kz zOeD9yXKvMrSiU!K8)R!(%1g?<#+xoek0$IWaU@qw3x`ucrx zS-%zW#CFW<26h(NX;g#}OKyg~9XOfo@x>xFNjy8Ys91!m>wBrCH`^v2%5B%qXgqV& zHEmqYSk~_cQAnxEQo^?=%p%TRD-Ap+#yW(1U1c36_A5_a_J(lOHMU&PICuF|Eu;H?L2Pt2 zK63J$=Kh!0pVCzhpFhyASAb>qVoFWnxluw+$3epS*uG__W>nZN<%Q_=&A^Oe&-FY~ zn^h$;lZcI?e!1b)IqKl6#lR9uMr?aI3!lf-m^@_zE!!<@Za`wXQ4zs&@s*#Ijjc=d!KSF(GI#t1oqBv%8O|#%n zSePVWWP5RJ#Z(;@k&>%xrV~VYLH)HY>bjYG?6_}+PMst?Z`-(@1+3g^)-6z5e=BB} zSA1le2+Q5a4)pt}pV*#nTNV*+hiS9f&}OnE;^Ao&+$>D9BKLy8=7zcDc^Oe_2R3b~ zeD%B;?Psk^Hsr(S#}`?)hM&G#;AX@SQNg|3F3gD^AekC zNHxfk%p*gcAaM$Zl!&aHKxvYm(Q;oQ;r>exctC|e;LW0M*GIW(TdYJb<_voxY9 zkin5vIBAv!0b8JZ4uA7}<*uL3EFb*%JY_Bv%a#;@<+|)>mg?HWW9NZud*G>v5V3Wo zAua-{rAs7J6w}BeBBR2}NCY$0wXv%lcVV|0xSOne<$_;H-CXaUxUigjSxaN=#aGMG zY7={6x_aTizEMSf0T2>my4zMdDhV3E@ zVz;15dO$NPqx<9<%P4caX#1-hT>DEortDlam)pOt)*I|eOpgjEEDjs)+liUjLE#W1 zW*P>jnL+F%86AQz)!|FB5DO(OI|KWf2s6E$w$q`qM-D5WhG1%sHHm=FYyi8$-mx8_<+wns+;gaJQ91v%S(6=q z!9uEhx@lFmXw$}3tIIP!cj}n>BemmHF<0$bE)VbX^4Lxa3Na7Eq6aB0E2Qq*8R18F zj$(kRL*Xk5A6ThZ5P)xm1TVE*l|mX{Z9{bBQ76aXrX>x8_fjt?)$!knD)oCGG|XFddmHUL74%ph=L zAF3Anj_&$S?kj7W(WM5?mu=VGcxa8ByUU4R8rwMI#(jl)`Xgfbp}Jl}K!|tBOvd%V znW5!TsaavTNT91)L^0xH$D+oCv_LO~#MxQhszzT##U~qGBez&{(~L6y#Iim=a_J>t zHo7*s8oGdGGj$_!9;Q|JbQGJ?KqZaiF!DfsxtRg`XmLRjsTGUGOr1(?Q+pPi8E-f< z8D4kpT0ny3l2k|D?1`|6u`ar`rSavPzuPt@Ub`~SRNGrJUJ^jTV&F}p(SH<#tcHe# zdCoH>mGhY7rv<1 zX*}|9hg2&M6Cdi&Qg{Mcn}@(%FSVV5wTc}ZKnp)2H=slCS*Qp-$Tfq|rcKkjGmsMt zQ4UU?S-RW1%SR^-^k)fpGfy2_jZ3K`a@ZA%%2OnfAJT<+V4obeDV&}afeE+|RE#?7 zhG|fWkIrbxHm-|xST$zt_>EDne`Z;KZ2KTwhhR)ohfQ{~JObkRkodsyGuNaV6wr9$ z%1kZL8dM{VKm(Z-Te+#5SL62YRHD4-j^%w+#XPiQpZ)>lrlLSh%q;NiI7&l!7ZH@Z zsT1Y8!qfQmZcv8U*Cg|!ataZ$_35$Yl<##5zp?0hTSk_){MpVK>U&?7gZ)VuIba)- zA1q?0p=DVX~CV0q@%t@p=I zK^><-))L2L3!|9FfHPP=TuHN$z}MUxB1&GlMF5-NM_uY0ZG*O-)81+PYVc*z)wpig z@1?Tu5zG2B_&ZREdK*)^NX5*_BN}g*p^(6RB8TK4Wx#7;pdqO4Z;|cLpeW>46H_nx zkyoDiqaJYS6+gPGr9AbS>FSs*;)KN!sYk)2QF#_e+I_#F<_V}7OHY<+y(5Gy0uU`1 zm`0H!p=~!a;q}xyx~5-)NxyLHC`)XBtO;%QnjE#dXjT-K`_;pm z8bWvD+TU$#9Yrv^&M-O~v)Vo5qu~mlZ!pIzW(ELng>O4J*l^5J5(EI*=CKd=kBHzy=ND6I2j- zf({yJR}=4O<*e#y5etd4H%0eXR<;VEE?t5YMPW8ki|DE`O&+#yCA3yBE)CvuG)1!1~nV0BMj_V^dP3 z%LGP~nie7p2xx}d*(FXGxo@@KOfgv?gY8p&XqI?XsFP=jPIdpC#J!#RwrM311P?Zd zw8Lh`#J>mQfGJRuKo=1bW&w>hH7Yg0wGF(? z!>h>)#hB`(_lf6(dg~wMz!ENtz@x*H%Vr^HHAj^IJp?X#RtUKS#nE;W%S;@N7M;K} z0Y(HEO*1U?4fI7@s-G?tueGRc?}2g^WIR8J@)(CU4;1HUK8P2Dp92Tyi!c*(YHD-_ z5Tlwq^kH%jBI~*D65`a8XUk4?gDp-&WxFVm-4oWOXXhL3rpBl?-!3~V(-9j)^S-DT znU{n%8I5?v1yMLn>P!yYMCJ>88@>+miXDo+<9aC--GpI6PBJpD7Qd^bz$h&>KkI|K zk&(Iq!t^Y7M2Pwu=7hKde}MA&fJo>y`UO-XbrEeKuCM_XE3%a3L5=_Q!nW%D2a6k} z4$Hobh@%A{V|$Jkfm}DvKnj{U(WMH)1^h$FC5;_6Bu>x+K>AvELvL0WKQFqfD;J9= zTGXja#Z=uOKrx;hP_t|swKo7TN3_D!Ck^wMaM3~z=}6dmSVl?crkTi&w_JT ztwsMMj6Ab`x$0Qh_J--mm2gD(7R7YOp|u-4rvU%qLWz~ z-Q{#NxJ!tp1jYjyXmUra)9L^tv^^vyAYMeJN*%Q;p&$HaP&_qK_p1k&iTSN+=rNcR znF_W*eu;u2&>9{T&7zJ51(zim;2dCb5Q8puw0>APV8uh#4Uz2C{INz?b1Mb0N2;$6i&cW@#gr4pwCa5uX+8g+AE~CD zBQ6x3T%mc@y&=d@q}q0__&ist7N0N9Fu4C0KQ0a@3+jo9vOXpbs>XjpJlayuP{1j~ zICKiS2E>B?NUVUM0Dx%$;G@Jqk70gnD-|0_&Zc4@Yaly#Q~?3>wVx>4_Z;sSpW~{6l`sP42LEX(_CV*RoA4PN`)$zAb=v!ksK5} zY9+C2YW}0%yiEMN>Y4=P0sA4PX-N%%1!6@QIt0594h1{g1uU3J1wsi8(ux3`&^#9! z&Y&zq*r@ye^-qfN>e5e&hSNJ6m!Suae`X)h!Bx9HmuHbu=NMh9LZdsn+oH2&m)gz)|Kd zD*nr^v3#-RGj>B4MHV3Ci89iXIzZ8jg7DVL3XYIHYzv-$Q zl&eh-m#q z5->p25P%jGlQi55*bJ95o#q_=U9x($^?7lUXet=$k)Mk;)xBFhJ1*!o52D)iTIbYL z9KR;)0W=8t1+mCWOtujSi4ek#ve+f#GJ+w>*lnaJBpGZAs6NPqW|JvzeR_;~_{Z$x z_MeMx(7hE8^f@32DiRVpSd!|cr~QNpjXsT0KuJR1!|cF@##DN}=tLX22vdXfsG&P# z*Y!e7t~UQdOl~U=IJ#Rs{9Ez!`h#!$w|H7@en~t(k}rB3zJImlWwB3_q*Xt7McirV z`Hy<`4`N;!S<9=|-;3KtEvIW<$*1S5XTQgwKH?>qCr}I6BtR;fVjXq`HWepCC=00e ze!y4&`Hz1v@zUavo3by7>+U7T`;pk;28|VwM2U z1r&V;fgUeIs&@Z{;_}NsiL(tQ_ocm}f+L=WOc;Fa?SK|o4g)`;A3cDfhfgyhKmCw4 zZgTw)6&+%Lu!CYAv!i=vsz3fkOjC~^%Y-0K6WSWxok1^&6QL>V<1sKg?YW%}P z3qx^X7z&|_d>h*9#4K|S!_*qXXjc!tC6?+dGE3J$6Rry|M>haLniMBmk`tmL!5(;1 zup*q+A!{fp)n%8&UMg_9z1~5S=t0FO5esPqhAQQEL6K+;UQP-Ym8T8C(5Jr z1(_!^MaK+*ph^WvU7Qo(elV^sRazS1(PG|TLpkOzD0w`-h2lh{^HQ}ZVC$$5st%bT z$E(F1SgC4hiSe<1Lo|m7r5c=|Q^`hS@W>AjV+N!|OKeaUXc+Z7p`Q~hHuV+AtTo~4 zjShLRQJy)ttRMH0$sWV|86N_{8A~&>sAuL#-8n@Aq663g{l^J*D^zmrGvPf@Pjj=L z!D=H?yL#es{`1mQSx*=>=J>-@=`!BbRij&dV49p>-t!Y}Kp?~MjU*_o^fCBH%C?bn zTHUA31hJJuvlx`HbsFwb!5NJ}GF_0Tr3YzU<&u*Da7|mz*y+-*rAT%9bh&HfvsG`W z+-|%xy+gMJv}ubBna!HzM7|#VnN0Ff;@NdXGKjbtLsKbGW)j9dT6f{a!NR!i=0A!WqAvQInLo#5kh#ei?x1e#c zT4#@zpM&PeYsBAqy`S{RKuzvJGt1}6_P_IL6<1sO_g!PAdT?K=!gUMeH~+3Tn^|$4 z=i4VWfi1HUcHHcZ)X=}nIo0Ef>-~H+7)wDz8&&DWmBNzjvA%aigLxV`A8tYIbsR9_hP`$LS ztEUSEWPl+8G@f3;G>(Zj0GD8H&U&|Y^r#1x%GtFGVF{cWA)i@r05k^X!_x87=m8A! z0y>5sEQR1JFj(a138BY_$Qoo~z?gZj9@(z0Tqh@~Z$3w^9lu?UQCklqfSg)@1wlDz zzQ7)>nNVu*N5+(6Oj~+fhWZRgLZoqQKsL1)=0q6+Km|GOLVslZ(xL7Q94mfjWMTftQig!n_4nLxkpjSYrZah@gH< zKaBmg_Xc|coenu%XEhxGCa)g|w4!z_q;LLonS23}zxvh*aw_<#+I6D*ck!;fJ$I6v zT|R%nGWGsbD4Z<7f2%Q2qNKPfq@Kd4918)H(XxF$mp}tJ0ubrX3IY~^^qI7r1|6v!q<}|E` z|8qmPzULp+W2eck()73|AwxVwdKAMrHN=sFvg1S`J-9YlfMF6266_OL5iKOF6n`9s zK}r$oJieiizH`q5Mz7C|;dI2%Cq68{tL>djz`yWarZe1#$`4}$;zfLt;)5v+R|>*( zkuvkv>KU_tuz`)Ci_!#G$!)dm2syJl^ds`W-<=23D$?T)sm{HE48C`UF-83;kb8|=dn?4#8nARz;3=m41ZsnYOdMn(l726(59Oaxq3V30u@ zC_w-4O~|0Oc_}kAI@FtIj_!K}ex+UX1(TssiWDBe!2uN{8m#M2Y zbrfiahoF_x>2uwQ)p5TP6RZC`OMYWi!csS^mlxCo^4jxcOLgdwObzPI{0Xo=W@U&V zq;jY!J~Y^6eCIQ91E;b2Oho9~5MBg~kPd2N`M}KubE7rEiJSfOU4%j%yGi<5OK^Ha zh2&=`eNT1eM$#lKht41Dhv0$j@p&>`t-X3SU5VKq4U>`tndD``gMnoOXMmdp!Un!L zpi<221&bMDz1kUM_1N6SsmpCXJiK;Y{n6@GNjmbu+D+#z8jjChwBh_3P4}W5?|q;r z&XH59?{AVlQiJJ!fUzE0(oR`*_!Fi#)LX)l#!FviDxl$MmU;AlloKilju|p8&=O)$ zXzW{k{(RXZYL;f9M$uK_MK}U%C#OIni$hk?0hv-DaqI7JxGdr$-~`f!uEoNnp{M6` z+R9udACun{>c@vLpwO;uDi1l%j2Fe8aS;A(ON*trPr?Ym_uRzf@c-P>h(E z=L5K*e-KoZ6-JAA0t>fU3$OVnU2fC4$d!{f11|S`0|))o4bu z!R>Neb#N)yj47vmob{lJ;9A#qC?*?Nh^ZmCM<^gnsF-ClAPI5aC)6A`ycX@6jxRms zMZMGhqyO7mhIwtAI@=S|$`>vktFHQId5ZeVXJLO}QRb)EkeH#+`1CwQ4?YqSh1{o0 zurYd>&?TrC82v#}HqpvpAE+DDS^nwI$+t!U;HN%M2ej18FUZ5y(}&i=i=NN&_;?zS z7$IQw83w9sfpe1q5rQGPiJGl#Ra9WT>C|H8Q;3z#as2gPls;aS9s7#G#aMY^0vvuo zgC%)tXq{7WZTbY~6Cl!5JgmGHVmjvD2uhpRzBAqgZT&Xz>WY)bRJ*?@pEJ1T(0vSW zalB(BWGahDLpNk7Mp8g{xreSFTqFf~gScV-1>OlgHD*+eqk*>KsBL3uHG4hqYUpM; zU)^$}?9`o5tM>h%28R1uyT`Ty$o7B)Oqp?t!}U3*QRK|CxVVq}!5}UNU*iEmYo}>5 zL(+9`fB18L9$`hOnIm%cs=X+zE(ycuZ>sG)YU_=%Rekl#a-RCsB=QTF7OAHvS8OVi z9Mi|VTu&ls$5sl^*Lf%TbjQ7Ot9RX^ya^VB=WlQ^3C0|S zfgfi%06qZ(IHw3=uZElXz%tei6-~<@)I&SrunNS*u~@|6*K$qf4z z`E+~qH&*GNz9L^7LH}y=oANz#&R7$t!|OJ@tN3nQwI;7CKIG}uOJuj|`64C%(yvhR zopP{ec|W`lYoj){h7?1sL6e8*Ie2vOg;@-AeF`6)9u1j+4PpzJgD3#;>A^|O+}}mo z25yzzfAf(XB&+5e<8MFG_ivRug`);fHV&l!)lv9^Tfy7Sr-V_WTw&G0n+B^y0VaqM zm^~st1`BFuyp}zQOtO&u2PGoCI6e)T1g)XhmAA=>d+$)~xJ`agR?pooKiE=Ubss8r zfVqUS!Q>Cy5;GBvdy$>VVlAgx+yP>SW0v#n-GQ24Jgg9ioybWlc=48&>5JT(Z=0nISorChN`F7i946{vvd z5X2jevDp)P9+Af|Ix2Tux4+uFUH)2?+Zk!S^>2D)$t)SG8RZPC2a`$MnB+R+U_2nq z)A%y?;P5~W}yl#4m+CU2#J!B__6yy#R$6yJ+>g6c?EqvW&PNA3H|d40j`ko*1#xqAB7e z3{3#-SRWk%q|tIf>>2ov(#}Yqa~RsX6X$iK!;sUZ?k`|rUpfUn?ea~u={>JOH9zrl z`PL{2RocH@VXwYIsv}m>^|U1lSuREnK$Q+L7tvcmJ}I=rCMqOA?P=S3-&3PF9(Q7^GRq@i}a zT>l0n?S}hg3{nJEz_zicA`i04A;dU?Qy>DvTO%?7oDPa&%bAmDQH^l~n_*G^BdgvL z0MZ1tQV)0@iDhfG-yXR@sH^88n;rS89Bv1%Hlsk@{i^&Jm1@!d$SccloVctXhX=XM zSq!ilog0>#b8L%}0{XXUSURlOEN%@o;Y5hXAv!&yj%|Ik%Tn>dvP-S_o&0F|$+j-u zsa46}LkYRGK37q*d9Fuz0WuklY7472bK|gKZF1-b(=Hr)ppwwmOq7`>hb%CXl#_3& zg`!$k8>gQBy}XPu*vWs83wiv@{veOx9?SdCO-XbY>`GljVAjSoM^h!7eYjAmBj}>q zyvTt#2c-gSlpzS5hw*}+=yd5SuYKHCcf2M~tj$bIKm}}^KC?wd!F{1$fk_Y51t%93 z$=CQ6XO)&GEQYVxr8%(2aG28*Tr(<%4(wD{UnSeruezXiJ6@N&nl350F!O6$)5-Cc!5EOMu|w>V|= zx^Eu?E?(O(7#`@+GY1(ck{YP7@@x~3hXw@_0G*`F6F^W*SS>7td^;YDW)& zivE98^J@72EzYNK9s~Bt`A`5TegmfVOo8>dox!C3S$qbZ6=fDpOr>;8Yyj}BWr?rkgH7Lbho2JJKTU~ihYzw!Cs235bt%u`8Bj2 zJtRep^}?ENGiJ*n4qY>Hlv@nv&91Z;%&IBVAGRHuFWvju4B*%ckUyXEx|O% zd*F2fvVaI^_G#D!f_M2Co z-fr9_nf=A*4fZ2}XsMc-Nm#H(phBjHFaY!pv}x?dpnUCL(E2w!#6iG<>X2fYNT^QA_#C1h$(_MLR)r(k+$Hi1k5p z02XEsC_0C#SS(yD6jo02AOq3DwX@CFr<^_LE(gRRM{R0gnz6D*;e;Q!n(;QXG7R+8 zeeL+>qzbJY#(?gF)WleW@WSNP6u{?a`AT6JHFu;N5 z#2ALy&jb%shjv)uyGUp{5lXI}X+g)aT<9-qT@-!>l&VFgj=yF?<&fK1o!n{MCCaBb zlBdm2bPL1|eO4Q(3NV58gD6M8!#*$SC)DVPFXd4)l3NeQ*Yv#gKz$#|Z=~JZu(y00X9F;&nw82Z2Ja z9Zo&iG}xFtMf+-I#pCZc+N)O<85{mzcB!HF8(*mB$7<M$LWGnxp%+H(XWL{>yabVp?8lYE1N%EyGp z!POCw@g1QaK1K{KfjY36oV)=3z$N^U2_?2t)F6_KZVuzay~L=UDOk#a(#SD&=%c?e zK1e66ZMSzFV!SZ6eCTE@Kl=1%plyo?&8z^XAJjV11qG93$pTJFFnxrZv1b^G7`Rb7 zQAtp#vEn38DcC19CO?UJkY}7}4kHac$Do0o9~AhahknFps59Y6ykm!Z}QO zBsLZe+N!6Q8{?@cdQ3A@{on-SxPG7&C0!rN0Ho@@k!d#vy7l1{Zb;)pQ3Ts? zhLi&vz+nArG01$rrMb55$;J`9`1vOrr!=n$h{3_8fA0YY{y5re!p=fECf-qy8ckRW z==6I^rp8emWW(Hp5*|a9(J%LsgYgP-GG2Y=6ywDD!hH-oU1kDg9y2FlzQzL zz}(Mf!6``;5F;n2X>SZQNFYwdrOPrv^HlH*IzD@erN}8h8?w@ zGTABZo9T-K)ATs%b14r1+&Eq%imZ)ft5dA&;%Zp~{g^PC!!WAXfM&^r>yIAleh| ztR^AH5P`r{r}mFOV!ScJtt-y7+&;?S6u)ucd(A@@D-xqqz4T$EHB@{e7(s#Yo`sDY z;V>Y$0i+x@Vl$Z`V!CIKwdL5S_7jm13>$`~2Yt=+xo=%9Csqfh#wQI(#a*k7IpxE1 z+w`dZyQ__!>b1gXYr&=Z_E|;?#l zPwK#?z$AEvDW!PU(Ie4=GkyyL+*Wr?WKq%sfm?}xh2o;|mtFiT<3+Xm1i=5GPB*IXzxQ!#S+D+CGDq7Aqqwj)u+~QVq-z zb)uuEntQYHm{9X>Hl_=&jHlBez{Fq>&>F?k delta 18435 zcmcJXd6ZsNneNx#r>as($XKZ%6I2o=nB>ib2rRQ7Q^vf*R}&cvobGm zq9}KQB+Wf1wS719i@Zp3I||LvOcO82)p3sKYW%SCP+2}34fUk~t7wGEp_e`G=K@r~g8IrjUaJVHSHfo0A4+VDTnS5!9Sh?=twuU(LX(_C7(#q-jkEM85m+;H@xxr(TC{430EO;y5N^&aH zOHJF!3ePXB#E#8`Jqn#L&|lq20xJ&P*s{|oDg0cmYL`>h`dj6!zdgGl{%J_4@jn%V z{iYwASsXh-Sg`$;={s)jx``RH8+IH9Nf?9^QT(@a=k zFHRCZ$t$8DcT(SqBQxdT3!y)X#OOqZpVW{H(V zksHLWmt{`j1UcIs*=fR$fp58?9dfiHsy5HdXOHOWI6q&rV%f;NI9b+f&bQ`U(d@bE zZC}hSJI$ZyE$c5rEAXNqvc1T%fdwnIvViUILO;)goF#W1*YZ4Y#LhTyshzTcKBqSe zv%*nVERbF0dh1ZhgSSmGI?KPjIJ$hw@`c}+Y_E`N(pJs|u#q_&P>yKmQ60RqlY7)j z=oG$Lgdr&AC4TCgX5b~JRXBd)B%x<{`V6ucqeXXli08ENCH(nz;{)EM?UR=-TRAd& z%!l$dYnKhL>^-)3wyU0fSe(|Mnt7OpG0)bJw?vjAPDM^yAdZ z)5NzO+v2o>Q$d!i9XIifp84STap{_iR;K4)QZJleZZ@Vhethh0qWoz8-7WLh<|Arx zhm8Bpz%;WgPCSj1PGXy}=_D2?>e#-Nx3pdWp*tLLm54Z`lf-2!(=5so+fDMA zrH?%)cTLyyTq{U{Q6~VgGLC#4uo`B}&gO+zZtLhQk2!U$dgWb4$LM+SMI*y2;*nTQ z{E0lcy!_NRg{3CVm9rZ+E!^4CSaZ7DJgeo7GrHBi{{bwdo@3@&0fIY$$yZD5ED8eG zWX-%Z^GsSw7C4+>`awWbOW2Mi;IOj!QJgnnVaZtsmz&R=Rt|o5s6WY)(DpK3)5{7I zD0gYNewO4WeZjN~3-C=W0KEt-CpI~j>}BGnXY>#u~unLMY&WC{X8>Gwe=+0--f>S^4hb< zs!Yo1Dgn=75i9@M?TznVGh_6?8 zz)4KAZ`S&Xa}H46(Y)~&&UsIZFxAFmXJM9AVO z_vqDN5MV|<(-J_v0)P!%U^_|TG>_Aa(&mU|dEnE`&GNWcyUL#yrwdPQS|Ynu@T54X zJZxE~TE9drYJ7ibVyM$UNNGe)9Opo#71_4S*Efqi05&7b3wZy;jlHCxM#GRbOfuJr zd}=;~rG&KoIHenRl{+ttn-6mf*9P1*nc*FRFpbhUi7B>}YW71;cTRnW5tx|fuq2ng z3_Kp{#4xHzt$v91^Rwks+d1OJ@_6am^2Oz+HXm(4t|HA`d^^(nX4*jkd(dc@X0~Ui zxeaxQGY!)&g`Njt?m8*8$s5~&+WS5#zN7P7WxBEh`o?y2ARze4i;K59mccS<0vt` z!U8mNzOx&qewwJE4$*DsR<+|uqf@Dd>}^aOdBji$-Ov8&Bf&bDfeXM#ZqCBNEi>wf z)#p40R+tw6vy)Lw{G}9V%p(iR87Gld4*ja9TzBz7^?vqSta+FRu4kHI!Lf<*6t3&C zi(bLj#t@ANn1_!+KJ=HO1o{SQDpyx*pbIYf&4G9LsQVEkP|@wcx7(lf{2Mt`*du5VhW$e6S{$!!j2rr)4j=$LbtgX zd-jOY>h~w`-m|XmD%<||qW;(}=seI2FO3S{x1stFtbkKw+9{Nd`tU8wGjk`jsk|7H zkmae9ur=`atlT@))0lPjpIQxbzNvQ1kR7GDK2-N_^>*|X50<8g@75Uy(ZwJ<2hZ5Xe%cZgYyqc_|u%WuqF)X%|mW588! z8eBW{Og9REfWS>G4SAZ86()z(rpfUG-I4+fT@DjmF==*~Tgqc3hh^itWjv5L2$(oY zfoYQ-9M~o(WJj#C8}NF3Svmqu)b}_JmIG{s(3Qx|=o?TYue2`dDW5E-sIED1t0MO@ z6Oi#D7=OVTV4)JaQc)Bxlq9B$Tf7G#$OpN0o)lK>nu){7%yYk}_o)3yK2_w!PrA@nK0c32kQ^Y;x)^c ztz5czMVzh~hRE5bvR)K3%Kn?CmzUjqT6y)2Pw0;xIyT{m$97Kh;(O#YVZ>7q+7S8_ z?#n?pUF1hV^Mk#_5n|S2(*sxzTc{=;PlZ2y===HWW1GJi^21%0XgM!9IPkun=w>hjQb!^%rZ=Urfbck=>#xzdn3x zm%8pev8X?Shr&_{c#BQ_P-5VUm1wuE7SmZsn!QJ(2ur#P%* z%l=K{412y)F5Kjapvl74-8Qaq_on;X8$(|;+S;27yDC>3UtuB7+1AIxzhk#<7tAj*OQuLIQ77?_}pQ_Zdejyr{s zIhl{F@zt}h(Cyav{&635k_(DScT=YJhe>!+4f45YbGt~3o&^dKGOHRxG*i`E+1b2`mkLe zzQ8uxHVXmmhMR^VO)3f4Qn)ynU2eFxM{SQ~TY1bw(}mOcz(b#VXRk*8psU>aBbrEP zT0Wu?Z~)cdd$60#=G;NVu)81P=w&S-dB3Q=jR*;SS18E(T5(x#iyyly`miTMu!i|jl?_UE`SrJK#BZS@0^0tpMno;4poc zw_ih+gS&d+q^oyrZD~CF)5nbK4jUZcumhOTS-M&t`eq8mq%kE5&_h!q#7&5EgZ7P5 zgcj-vSYWy6>wf3~%xIZ$EAkb?MS(t~GQ!4^r*_L0ehO;bY5eokcN=Qp$MUp3ARu(v zZ+e|if9Asg8bn zH@o&rtKIuv8D-Y!d9jLrixhb6&(qYBvvjL*=+D#`J60N*v_0L|8@+L?)aH zfC^C>dtSg}^pWPoq*R}q>5hz`ZLFa zhsC&t@NEuTXrqUM5J_exk+0=C>aUNSUSr%qk!LO< zo*jb7A%cqtoTL-cqmYRnB!(`l{R3zdUBj|*@Z218mXCAM)YaaZVrDz`DJ!A29xhs{ zd54N$it^5Ls0Hk1$cQdqn`44$;`6ouJ?$!nzqxtH%H#+t-_pjj&!;$ZUGQrAIr4CI*L*P>$lf(y%&Oj)FFqyIu620!@Qf^=Wz#ZnQs~Eg zB)&8roSD!o@#WZ22y2Gcb$r?lSPS8XZ({7|vaSxXMA)LX+E{#+%WweE1pwQ8rv!b^ z#$n^3m?62TjgGbOe1MTCL{YI3kjV^t*v1(0HBPJh50mZH0lv6Rqo#I3v2!q~vF;qq z2=EH0pQeIKgjj+6L+b;EBa~Q%@MAl1bc&qc%g__>QLTvSgPX+|b<-_kOm%uB9&1rO z{bFaCO~H)7fkZp=r5Jc1eK<~RXMuF^u-uZEy@n&8)p85Mfv1q5lOfTFUesVzbD?Ve zfVf3e&b!6iZTbW`sQvYi9i^5p0GM>k#k^vVW4=bDsh01=7=Z}X3`tg`HtQL}G6Hy} zMK6Xwn3kr!utD5k%C?q3v(`54{`vracv+UO9a#g1-v7sy@v3F&;Pd2Mb@j<&in??k z*vu*o@Z zQKo29Mmmr?(?XenhgpHMVrL#Sj>B#TXhBa~!xpxDi0<()U7dg$#rkh-JPJh4HCTgNN;Ci)c@&(Fde8$0!JNUYj_WatdSIcLr0zRaJkSKfrOTHAgo!b;`tm}t zueIDfIMhdB$FLO+x5I%Rr(HUyT6M0N)TXZZCi~&D4Jcb|dr%9j87%;vfo=pA0tTa&?uuHa$MN-0#u2#3 z#0w6M)tf@-@YnDes>P}Js!*q7Vxs!OPM(W}##W;$V8xb}5i|i{*|IVRmJi!cVPx8- z;%MOC;bCB-U>jt(NKR2G>(in~y>+o@QwQYYB@wB?3-}9%FBOxjg9~v$t1v4X!{cFVKVk&k) zz2|>l+#|ymE*n-mE!OeXwc;Z@uo}KdEH?Q3$1fJg3-#=`#liZ8+p7~kB6hT>i{B%0 z(VYl00lk5dL+1fGHQA;W^Q9w)Lk^k1cy*u|v?>oWm&Cvto(YWrRioz%>VcS{&k;|H z?rPWNVx?3qpAb)!Z(Xe?v(y0|2y;u1Aq=wwpYs4m#91^=-lKwdfnAp8c$tuD-uP{7hZG#sGYn>*20v zP*qwL+$9GUv^Nl$+Kz(M;8uIoBi5$p<9pNK5c?R9z(MRbA@0QM#OGwilXnVrtI)LH zx3JcEI|wkwHKYL7J;E`kwnAJ>YA#8662q1}hP|*m@MZjP6R^$7?aO=Arh8tvl$j7Smrh zz=7qf@gLWtwSWcXq^G;f$3N7r-ukj}fI9IMMtlFdSsY9qJ-u0X^T8t)^&?byE?t8I zPR-D`d37%fLyUAdFjIHhjqMch8kW$kq}ui%TmH3eVtcjv8{$6=wfQK~Q4QZM zd=76tI8eX3Pjr-Dzig2@>|SwRInSG=zILy8L2bNOoT2vIC${LHrdIdgFRnDI1rLe~ zTGV1si$w<#h74xBk~u7~eoGI&paC|>(iGPE58v*(p0pvffi5eO&yQ>d8 zEJj+@a|;Y=p7PDG>lxot_HZUNE54DoaUsxj3MMR!9-!1wfFSVLQQ(%^((ZPqHjaj< z>QmGXZm-Z}RqZUr40YXO0M?Fipa3hHMs%gva@f9vXUCKZW6sE?&7smsH9%V#{L%3X z3_8v+I){>qr~^lv_PtFO4EF=RFZ-crmt!%=7LT021}(I<{Npuoe=Rj>ITqkTjnEQ& zrgJHV0hCS0B)pihi3v5KDY^`or~z6oV>ncAvfmK{%GIhLK`lSMQ&fOkIV&3K#}p-; z;Ngxq&}!HYt_9VM@WQOYVOgpw`%{1cu}a0 zf7ULP&s-1k!ZyKCfh*Vx6~{5s1JW>LQ#5Ngd0Fs^EdW7XfPj8#x?!Nb`s${ii6voZ;h0*9vM6qPL0H56|}epwqwRc8Eje;Wf$zm zFJ@SgfxDQX>;kkb%TkOPhRI>M^|kK50=k2R4(JY`BXaE*L0>Tu`E>>z+K8|Usv4U- zM7iM7FnC7WUrcnr|!Cs{vP}R&d|G8bk?5(!J4A(dXN_={~%thKKY6`L6oyPG${^TA2N*|ho=Hy zj1(XatTXaM4+&D}BnS|N^dxK_O2(q6QsdaLS)dip)N0jh;zFam_CY!`-~h!>Va|?4 zyoeDL<%+T8;pgDeL!t;d&dHTf$?EXPSkzD9GTG1r z9}8Op%7fz4iGe7=W5dW}{>ibx{ieE^ieZApE)c*80+o`W9{H)hH^s@#$N7-93?mR$ z%lGBvx{ht*qF6LE}7qH((1pn*Y&<@M#$XFj%OKdUfPJaq9lZLF5=<;^q1- zfhWQe%##5<3^xmS1Q*Frm{@`~zB6tsJq&6_bD;sTq&3yoj~lP-KZ_&YIUxH)tGevZ zm=*O3;6#u#p~M-PTe`DC6>`2I6qT6}z5qZ>tHdeyX)xN3KuCkJ5sLvo*Wm{B)+}1$ zz`3$Tz4tF-(*828F8YgDIi{Rv;s9ZLBXr23p=vR2p+4|xqyv_x8RPof6dg_(Gh&@m z;oVV8d^3hau+>=2Iu%!C&m3cnD%<6I%bQmZ^%K_OdE{b1E|*pD=+(r|IOI@KJXp#D zJ3a`t645p<{0iY**rpDEnZf@sG}Zm1<*eGR8X&l)83rK(Bs6+C1V1^+6sjKS!)cfl zxzH%4;>do)3S&ar7W{^8Vc|hrV`WeEsWCD#)UO}bfXR$VhY}c9#vn1;55Wud0AN4~ zW&)7%2)4w+GIXTJL(w5w+D(Uv?4K$Rd}*}0afKM&zHDWdU%Vn-r9L%Yx_Zo6BgQYr zL)x^!Mds_pV3efoofP9pM~D!fG+wkBfdJl_W>&BX029!R;y{0B=+!HI#r@(ywP}L% z|7vK?RC_1Loz=q=<#wZC=9`nIz^(`rCIL)0 zGtP*f5kocS{D1yp^LVavj1JZ1a_%qak^R+{9=Trp<36tX%rtrOKk)gA=Lgk!Gvt_m z;D?llZJD6HJ5%2Gk6S1G5!EvX%3Y&75q)I7n)+RjTk1bn+Ygr)h*8Zc;>dNsjt9sX zP*pyoM#ziLP*+Rwl;9DLVS#7v*vzo05hMb$fRD~CmO@jN-HMcsJ1F}1qQk~78i z-@_`MaJHOOy=cqZL{+--53Op}Nw}dL8CWX7KuSOx9#(;F(qJ6JRxn?1js#}G&R0>}VtEl`WFE$t2O9a0R(a#9W^vmeYG zMh>(q8b32MdKEJ^vXsXu=AHYkz&3U1GO~*^J!7;w@)-HqGTA%S4^xM5GscM-Ns<%9 zW*{GDG4a9iz)vJgi?iXSC^w=5b;b<41@c4ef>B#N-Z1BkuReUNTrx%togrtab;rw= z>W!1+qhjpet-ac8i?|<|jg&;ddAPLj2tMA{vn29dT@dKo=Q%U)74i_2Ksfhw~uTfs7Rv#EE%Tb4I&qG@Y5mv zmFg)sV$H|RvfwoQU0gUAE+IHHmuu@WoSy$5_kMX$^`ZC4r|J~SwHIRgt~gcxQDgPZ z9~B2zpI#_&p>8{y?|a^me5Co)dl$*aFiVEtrZ(VIqt*4}#T@m4)8rJmrgaj=J;N3~ zsKPLS5nH)yf#&EWTnTnK(T1Eu4-8U9>12p4_@xf>5LVKuLShu!kzi`zsDU9lQ>{Cb zX1(ro`3JS&EI#rJXUd1`{4w5yPc(sfCZk)%kOV-m%_%?}A4EF*1Bzua!e!TOm>2EZ zChCI}Ax2Bz(qZXp)Cc5Cb(vR``PZd-^jvwo+POrIujZa7C$y?hycZjjsW+i0gc$>B zra%nA=&h)aguw(h0H1>VV}?#H0@{Sj4)J3=&(|TQokqBB|^Qkk=@K51F zP-1xfmVNeBtzr6G#Q&au;kl`MwhxH(wi#DXbIALp>? zMC%-qdf%|bipSqnJibM}u^MD~b(LiJ6whShQ)nThXWkG-Yzed)gAR}bF9F9Kf~h+# zgqARMPJ|J2Fx7q}ZI`cSHjPgG;I<~{H4kHob4?0``v|U{R`Hs+^BQNNs?3SB_Zt$c zF0|thY~gj~%(dBaI0K2udh-ZVv>(3uqGjoY^YS%ohS&VxaQfkxsI0bfhc8l_-w@NQ ze_JbiMTv0i0@*g4TkKP9XjAeg9R;7~XY)x?YC>#b_yavTILwG${0Bn$_S&`?1H0Y`z600~gdg-QK!qao9g zY{K%y{>gi)MeA1oT5lFSvaf#0s@XerVN&w{{ZVNnfKE`w+c0+g5&1Iyh= zYSL{7mb-_?sGYx-lU1=5y|jKSnfk}BmwjsOI%B9`O9L`}xR-d6LqQmNBMSUjGDcL&CKIR4Y zTXd0-N1!M{9cD*JbJmE$f$U>D$U51>L2ksgxlaL4?Ceu{Fe_N1Q>WSNAkFrNt6Ra_v?$%0? z1!l>aGivpr(?pnn2H~tSaDobIjo_~>()X%;o zf2yprsh0Yl#=Gv2V<$Gz2<&Oj;5=pBz~5hXhkRT;c%nGCnF7;Wc3`ExBIir>3GTs`x{a!Xvf^ZsVwlyLjv;nkK`bMc^%uLqP4w0wHkXr0*D1(KVTf9KKg)sPMxts?oiJ> z&z*}sua8!jw?nf}eo#KIwzrB-b?QT`6f`X>Cq3GsHm#Slss&fc4+?eDL-G&RXTKx+ z+SSWGBxTy;a(>nMn0!i%ZL(5=BXbkEC?ug-qFe}Cb zz)=F;DKo9JU3?ySk1{Y~71jK6)~1hwjSa z3(PW|y27Xd4alI=Oh65!kS5;5t|&3GuUKLL52Msl?Cw&Jyd+Pruj~vk;=Y``MW_anka%qtH&n1QQw%3=AJO|Ypr@V=v8na1Fy(sm_sD}qA0z`n z){kd0SO@QcEa)?hng9SPbmR!_k~|KB5FSX^u=T*0F)xWmZk9xaz8==21|F3MjCtpd z7Q*ZCe_?mO{4#gO)KB)v)#{AB@*?2!2Ycl;^=I~@0=P<)akmN9mr}tbOSQ2A%tMDk z1ImG&uO*b`EZioExW3>s55OV}>-!7(Q^%_ISL7=7mri4j+VBd}Kz}1Pnj8}0c)jq% zxVW9kIGxVQ)P}Orj*?z99e89Yg_90)!m0wS5Y0$44e|Ajzjk%bt28-ZZQKi3-uXu| zx8q-9n=RGzntY%Btke48mQ0@c6qhtA0ZN*+PliKBQ*gMT!rC%plE-K)WUR;VT3>j; zlE;)J3amB;%rp*oorQV$X7r@KV5RFBYzKa5zi0tcs?-H`6iti%%pDX)BWySgmZUoK za8wi`AcUpu(ovqiKg!?LPwR)IuyI-*Q8rxFV_Jdk;B|GV0SqUM%#~9FE7?le1m}Y+ zp}r8x%V7<1p|q#^;T!UHq;V5%&zs7UA6am;>(~^P)TVa6$zZKscYHNHp<~p@6(XLe z6YkusLe>$pBtZv3aF~A(5u;Nw4Cj&qgpjZ#o+5*pnuDur_Q@$NOP3EP`fPQnhxW;` zIZz({v&1~tcWtUK{Y8!y^-ZUKoN?$34TkHS&@x7`j5SFzYEPNJq30PopGcqv(SXDv z!$m3@D;L?qP)2Lo8milim_!8zf>v~MG&^sa zcRGs@;BRU#k+BR&!r9V;JUu>Pm&lgZf8-3J<~R5%|8N+Bm~ypAM* zg&3R}cD?qGbxajcSPo$JdW0K?M@-phH86IWV^n{C^s&9$ngfcr*o0o%Ko5mU~)#IZLRIhe) zu&KFei6=r*o3m;m+R4-D0GgXJSn|;m2olH@qB};r5;D+S3(~3gt$+@F0r1ebH`FoX zjFIZ@amJzh2VK=0WkPE9+^;*ExMpQHL=5JQOt%PqFhq($e9|lP7M`y#hvw&^E zel&UaU%m2i?!c`|D^8=YK$b&$5exO7Ljw* zA~%9G2i9r=z|ga9xB~aU^!*@epb1xhJAP{6LB?;I6Ps#buQAvz%xcr&#;0^7mH(fu zdJi+aYTZ8>rx-fn-H)NK?_zU%A09x^0HUX}a8P^{a&1`-?0iI5E>$QNlX&RHOpIFAK7Bz5~7#v{g zz=t4jagT_>F)U#t^$dkXDR(*`VcfLP30KD3`T_vhgVDpy6%BG+Q&!vhjGpQx-*`#v zkB_P+L*uB{c+Hyl618_5mvI^Fku>8P5_wpZCMi#@^3(gdo~aW%TvG(88P@6i6IUT> z3|w82vj5r0egfeSxK}Tgb;HP10!Vu zicli@f|)@$nU+M%lmn{oFQ}Er812>7M;kvMRo^CpO<{#Gqvxtpps6w}gnwZHEf`}F zg2}xs_8qMV1+}r%JOm2aL4FZn^IfXETa7-!m}WR|+ukF@v})i4<2mgpzw1O}f-Ij| z#6>85=N`eDabn;ijFy;~Q}?_oS66Dsm;!^+f!zE8zk%4odcZ27^EXp~>X{Se0o(+M z>N_CSqy@&3w)Xvo%SdV69S`tUKnB*Dp6Wn>FaW74n2)|Gj>m=@1<8Rt;I9w`$N)>w zQY&9ViBLMV<6*jP|F`7KYQ;k1t5RoP`v|GPFIgghksbma#$%*`6S>zI!kvE9tsdTz z%*XuV-%O!Xj2O+hC3aG7dl*FACEKcd78&D&HrAK+Te$rU+_|;@$tR)3WpLUjyX4?L zVAnxHfJ8TBu}Q?D%ZaK5{1>jO>1~}(e0q`5p+0}QvAa&vYBHoRnd58$ezY=|wWGuG z6Q&|!3lWTUj!Qs#08C_5&bdQH+lcXk`68AO?0&2|>`Y@({Y3IjTrC8eiFt9SkDK>w zIzMQX06Q{_Tn>*OI)a|uq#XjM&!M1MGvA;Zxj2M9#OOfZA7jG{gbawowOBneLeEeQ zz`}87klTFWT;m*pSHv|=ofZJ73CeSGfE~h4L&%{c;h`{1pn_X8u{ItFJ;Q*YD%=+J zbwmv?ug#(xp9LW=w-CDr30lj2OPF6DvewYqe80ZOpe|4NcPd0z4<4(N zOibdrd5Ote-%E%3V_xce7Wf97F+$Xs#VCGWjhqViRTBQ~CS0`%EY!XC#9yBxm<#N>vio;=~qT!zo^_SMGNYWp?DgFWiz84xcFXd@Et&E$3TNsYh5=)C?~aay(QcH@KCuwQ2H zSir4cF@E1xUo-xiF&Di(OU#|MUVcb;zi&yj=h(9*Cu2Q}AP2*}~ Ne_#LNw~bd?{treYvSI)L diff --git a/Cargo.toml b/Cargo.toml index 83680a706..d28cd39be 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,19 +19,19 @@ edition = "2021" repository = "https://github.com/spacedriveapp/spacedrive" [workspace.dependencies] -prisma-client-rust = { git = "https://github.com/Brendonovich/prisma-client-rust", rev = "047c1102284a61ee400788baa7529d54ae745790", features = [ +prisma-client-rust = { git = "https://github.com/Brendonovich/prisma-client-rust", branch = "spacedrive", features = [ "rspc", "sqlite-create-many", "migrations", "sqlite", ], default-features = false } -prisma-client-rust-cli = { git = "https://github.com/Brendonovich/prisma-client-rust", rev = "047c1102284a61ee400788baa7529d54ae745790", features = [ +prisma-client-rust-cli = { git = "https://github.com/Brendonovich/prisma-client-rust", branch = "spacedrive", features = [ "rspc", "sqlite-create-many", "migrations", "sqlite", ], default-features = false } -prisma-client-rust-sdk = { git = "https://github.com/Brendonovich/prisma-client-rust", rev = "047c1102284a61ee400788baa7529d54ae745790", features = [ +prisma-client-rust-sdk = { git = "https://github.com/Brendonovich/prisma-client-rust", branch = "spacedrive", features = [ "sqlite", ], default-features = false } diff --git a/apps/cli/Cargo.toml b/apps/cli/Cargo.toml index b60a717b2..b435d17b1 100644 --- a/apps/cli/Cargo.toml +++ b/apps/cli/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "cli" +name = "sd-cli" version = "0.1.0" license = { workspace = true } repository = { workspace = true } diff --git a/apps/desktop/src-tauri/Cargo.toml b/apps/desktop/src-tauri/Cargo.toml index 5cbdf0e27..85841d870 100644 --- a/apps/desktop/src-tauri/Cargo.toml +++ b/apps/desktop/src-tauri/Cargo.toml @@ -1,9 +1,9 @@ [package] -name = "spacedrive" +name = "sd-desktop" version = "0.1.0" description = "The universal file manager." authors = ["Spacedrive Technology Inc."] -default-run = "spacedrive" +default-run = "sd-desktop" license = { workspace = true } repository = { workspace = true } edition = { workspace = true } diff --git a/apps/desktop/src/commands.ts b/apps/desktop/src/commands.ts index 58e13e0ee..e413d10f0 100644 --- a/apps/desktop/src/commands.ts +++ b/apps/desktop/src/commands.ts @@ -42,7 +42,7 @@ export function lockAppTheme(themeType: AppThemeType) { return invoke()("lock_app_theme", { themeType }) } -export type OpenWithApplication = { url: string; name: string } -export type OpenFilePathResult = { t: "NoLibrary" } | { t: "NoFile"; c: number } | { t: "OpenError"; c: [number, string] } | { t: "AllGood"; c: number } | { t: "Internal"; c: string } -export type AppThemeType = "Auto" | "Light" | "Dark" export type RevealItem = { Location: { id: number } } | { FilePath: { id: number } } +export type AppThemeType = "Auto" | "Light" | "Dark" +export type OpenFilePathResult = { t: "NoLibrary" } | { t: "NoFile"; c: number } | { t: "OpenError"; c: [number, string] } | { t: "AllGood"; c: number } | { t: "Internal"; c: string } +export type OpenWithApplication = { url: string; name: string } diff --git a/apps/server/Cargo.toml b/apps/server/Cargo.toml index b33dc1eb0..3ee4a663a 100644 --- a/apps/server/Cargo.toml +++ b/apps/server/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "server" +name = "sd-server" version = "0.1.0" license = { workspace = true } repository = { workspace = true } diff --git a/apps/server/package.json b/apps/server/package.json index f1254391f..b868df6fa 100644 --- a/apps/server/package.json +++ b/apps/server/package.json @@ -1,9 +1,9 @@ { - "name": "@sd/server", - "version": "0.0.0", - "main": "index.js", - "license": "GPL-3.0-only", - "scripts": { - "dev": "RUST_LOG=\"sd_core=info\" cargo watch -x 'run -p server'" - } + "name": "@sd/server", + "version": "0.0.0", + "main": "index.js", + "license": "GPL-3.0-only", + "scripts": { + "dev": "RUST_LOG=\"sd_core=info\" cargo watch -x 'run -p sd-server'" + } } diff --git a/core/crates/sync/Cargo.toml b/core/crates/sync/Cargo.toml index 83cc6fc92..0c4c506b7 100644 --- a/core/crates/sync/Cargo.toml +++ b/core/crates/sync/Cargo.toml @@ -4,7 +4,7 @@ version = "0.0.0" edition = "2021" [features] -default = ["emit-messages"] +default = [] emit-messages = [] [dependencies] @@ -19,3 +19,4 @@ serde_json = { workspace = true } tokio = { workspace = true } uuid = { workspace = true } uhlc = "0.5.2" +slotmap = "1.0.6" diff --git a/core/crates/sync/src/actor.rs b/core/crates/sync/src/actor.rs new file mode 100644 index 000000000..f59430502 --- /dev/null +++ b/core/crates/sync/src/actor.rs @@ -0,0 +1,61 @@ +use tokio::sync::mpsc; + +pub trait ActorTypes { + type Event; + type Request; + type Handler; +} + +pub struct ActorIO { + pub event_rx: mpsc::Receiver, + pub req_tx: mpsc::Sender, +} + +impl ActorIO { + pub async fn send(&self, value: T::Request) -> Result<(), mpsc::error::SendError> { + self.req_tx.send(value).await + } +} + +pub struct HandlerIO { + handler: T::Handler, + req_rx: mpsc::Receiver, +} + +pub type SplitHandlerIO = ( + ::Handler, + mpsc::Receiver<::Request>, +); + +impl HandlerIO { + pub fn split(self) -> SplitHandlerIO { + (self.handler, self.req_rx) + } +} + +pub fn create_actor_io( + make_handler: fn(mpsc::Sender) -> T::Handler, +) -> (ActorIO, HandlerIO) { + let (req_tx, req_rx) = mpsc::channel(20); + let (event_tx, event_rx) = mpsc::channel(20); + + ( + ActorIO { event_rx, req_tx }, + HandlerIO { + handler: make_handler(event_tx), + req_rx, + }, + ) +} + +#[macro_export] +macro_rules! wait { + ($rx:expr, $pattern:pat $(=> $expr:expr)?) => { + loop { + match $rx.recv().await { + Some($pattern) => break $($expr)?, + _ => continue + } + } + }; +} diff --git a/core/crates/sync/src/db_operation.rs b/core/crates/sync/src/db_operation.rs new file mode 100644 index 000000000..72dc4270a --- /dev/null +++ b/core/crates/sync/src/db_operation.rs @@ -0,0 +1,62 @@ +use sd_prisma::prisma::*; +use sd_sync::*; +use uhlc::NTP64; +use uuid::Uuid; + +shared_operation::include!(shared_include { + instance: select { pub_id } +}); +relation_operation::include!(relation_include { + instance: select { pub_id } +}); + +pub enum DbOperation { + Shared(shared_include::Data), + Relation(relation_include::Data), +} + +impl DbOperation { + pub fn timestamp(&self) -> NTP64 { + NTP64(match self { + Self::Shared(op) => op.timestamp, + Self::Relation(op) => op.timestamp, + } as u64) + } + + pub fn id(&self) -> Uuid { + Uuid::from_slice(match self { + Self::Shared(op) => &op.id, + Self::Relation(op) => &op.id, + }) + .unwrap() + } + + pub fn instance(&self) -> Uuid { + Uuid::from_slice(match self { + Self::Shared(op) => &op.instance.pub_id, + Self::Relation(op) => &op.instance.pub_id, + }) + .unwrap() + } + + pub fn into_operation(self) -> CRDTOperation { + CRDTOperation { + id: self.id(), + instance: self.instance(), + timestamp: self.timestamp(), + typ: match self { + Self::Shared(op) => CRDTOperationType::Shared(SharedOperation { + record_id: serde_json::from_slice(&op.record_id).unwrap(), + model: op.model, + data: serde_json::from_slice(&op.data).unwrap(), + }), + Self::Relation(op) => CRDTOperationType::Relation(RelationOperation { + relation: op.relation, + data: serde_json::from_slice(&op.data).unwrap(), + relation_item: serde_json::from_slice(&op.item_id).unwrap(), + relation_group: serde_json::from_slice(&op.group_id).unwrap(), + }), + }, + } + } +} diff --git a/core/crates/sync/src/ingest.rs b/core/crates/sync/src/ingest.rs index 98086a9da..4863c9fb7 100644 --- a/core/crates/sync/src/ingest.rs +++ b/core/crates/sync/src/ingest.rs @@ -1,34 +1,261 @@ -use sd_p2p::{spacetunnel::Tunnel, PeerId}; -use sd_sync::CRDTOperation; +use std::{ops::Deref, sync::Arc}; + +use sd_p2p::spacetunnel::Tunnel; +use sd_prisma::{prisma::*, prisma_sync::ModelSyncData}; +use sd_sync::*; +use sd_utils::uuid_to_bytes; +use serde_json::to_vec; use tokio::sync::mpsc; -use uhlc::NTP64; +use uhlc::{Timestamp, NTP64}; use uuid::Uuid; -use crate::Timestamps; - -pub struct Actor { - pub events: mpsc::Sender, -} +use crate::{actor::*, wait, SharedState}; #[must_use] +/// Stuff that can be handled outside the actor pub enum Request { Messages { tunnel: Tunnel, timestamps: Vec<(Uuid, NTP64)>, }, - Ingest(Vec), + Ingested, } +/// Stuff that the actor consumes #[derive(Debug)] pub enum Event { Notification(NotificationEvent), Messages(MessagesEvent), } +#[derive(Debug, Default)] +pub enum State { + #[default] + WaitingForNotification, + RetrievingMessages(Tunnel), + Ingesting(MessagesEvent), +} + +pub struct Actor { + state: Option, + shared: Arc, + io: ActorIO, +} + +impl Actor { + async fn tick(mut self) -> Option { + let state = match self.state.take()? { + State::WaitingForNotification => { + let notification = wait!(self.io.event_rx, Event::Notification(n) => n); + + State::RetrievingMessages(notification.tunnel) + } + State::RetrievingMessages(tunnel) => { + self.io + .send(Request::Messages { + tunnel, + timestamps: self + .timestamps + .read() + .await + .iter() + .map(|(&k, &v)| (k, v)) + .collect(), + }) + .await + .ok(); + + State::Ingesting(wait!(self.io.event_rx, Event::Messages(event) => event)) + } + State::Ingesting(event) => { + let count = event.messages.len(); + + dbg!(&event.messages); + + for op in event.messages { + let fut = self.receive_crdt_operation(op); + fut.await; + } + + println!("Ingested {count} messages!"); + + match event.has_more { + true => State::RetrievingMessages(event.tunnel), + false => State::WaitingForNotification, + } + } + }; + + Some(Self { + state: Some(state), + ..self + }) + } + + pub fn spawn(shared: Arc) -> SplitHandlerIO { + let (actor_io, handler_io) = create_actor_io::(|event_tx| Handler { event_tx }); + + tokio::spawn(async move { + let mut this = Self { + state: Some(Default::default()), + io: actor_io, + shared, + }; + + loop { + this = match this.tick().await { + Some(this) => this, + None => break, + }; + } + }); + + handler_io.split() + } + + async fn receive_crdt_operation(&mut self, op: CRDTOperation) { + self.clock + .update_with_timestamp(&Timestamp::new(op.timestamp, op.instance.into())) + .ok(); + + let mut timestamp = { + let mut clocks = self.timestamps.write().await; + clocks + .entry(op.instance) + .or_insert_with(|| op.timestamp) + .clone() + }; + + if timestamp < op.timestamp { + timestamp = op.timestamp; + } + + let op_instance = op.instance; + + let is_old = self.compare_message(&op).await; + + if !is_old { + self.apply_op(op).await.ok(); + } + + self.db + ._transaction() + .run({ + let timestamps = self.timestamps.clone(); + |db| async move { + match db + .instance() + .update( + instance::pub_id::equals(uuid_to_bytes(op_instance)), + vec![instance::timestamp::set(Some(timestamp.as_u64() as i64))], + ) + .exec() + .await + { + Ok(_) => { + timestamps.write().await.insert(op_instance, timestamp); + Ok(()) + } + Err(e) => Err(e), + } + } + }) + .await + .unwrap(); + } + + async fn apply_op(&mut self, op: CRDTOperation) -> prisma_client_rust::Result<()> { + ModelSyncData::from_op(op.typ.clone()) + .unwrap() + .exec(&self.db) + .await?; + + match &op.typ { + CRDTOperationType::Shared(shared_op) => { + shared_op_db(&op, shared_op) + .to_query(&self.db) + .exec() + .await?; + } + CRDTOperationType::Relation(relation_op) => { + relation_op_db(&op, relation_op) + .to_query(&self.db) + .exec() + .await?; + } + } + + self.io.req_tx.send(Request::Ingested).await.ok(); + + Ok(()) + } + + async fn compare_message(&mut self, op: &CRDTOperation) -> bool { + let old_timestamp = match &op.typ { + CRDTOperationType::Shared(shared_op) => { + let newer_op = self + .db + .shared_operation() + .find_first(vec![ + shared_operation::timestamp::gte(op.timestamp.as_u64() as i64), + shared_operation::model::equals(shared_op.model.to_string()), + shared_operation::record_id::equals( + serde_json::to_vec(&shared_op.record_id).unwrap(), + ), + shared_operation::kind::equals(shared_op.kind().to_string()), + ]) + .order_by(shared_operation::timestamp::order(SortOrder::Desc)) + .exec() + .await + .unwrap(); + + newer_op.map(|newer_op| newer_op.timestamp) + } + CRDTOperationType::Relation(relation_op) => { + let newer_op = self + .db + .relation_operation() + .find_first(vec![ + relation_operation::timestamp::gte(op.timestamp.as_u64() as i64), + relation_operation::relation::equals(relation_op.relation.to_string()), + relation_operation::item_id::equals( + serde_json::to_vec(&relation_op.relation_item).unwrap(), + ), + relation_operation::kind::equals(relation_op.kind().to_string()), + ]) + .order_by(relation_operation::timestamp::order(SortOrder::Desc)) + .exec() + .await + .unwrap(); + + newer_op.map(|newer_op| newer_op.timestamp) + } + }; + + old_timestamp + .map(|old| old != op.timestamp.as_u64() as i64) + .unwrap_or_default() + } +} + +impl Deref for Actor { + type Target = SharedState; + + fn deref(&self) -> &Self::Target { + &self.shared + } +} + +pub struct Handler { + pub event_tx: mpsc::Sender, +} + #[derive(Debug)] pub struct MessagesEvent { pub instance_id: Uuid, pub messages: Vec, + pub has_more: bool, + pub tunnel: Tunnel, } #[derive(Debug)] @@ -36,91 +263,39 @@ pub struct NotificationEvent { pub tunnel: Tunnel, } -#[derive(Debug)] -pub enum State { - WaitingForNotification, - ExecutingMessagesRequest, - Ingesting, +impl ActorTypes for Actor { + type Event = Event; + type Request = Request; + type Handler = Handler; } -#[macro_export] -macro_rules! wait { - ($rx:ident, $pattern:pat $(=> $expr:expr)?) => { - loop { - match $rx.recv().await { - Some($pattern) => break $($expr)?, - _ => continue - } - } - }; -} - -impl Actor { - pub fn spawn(timestamps: Timestamps) -> (Self, mpsc::Receiver) { - let (req_tx, req_rx) = mpsc::channel(4); - let (events_tx, mut events_rx) = mpsc::channel(4); - - tokio::spawn(async move { - let mut state = State::WaitingForNotification; - - loop { - dbg!(&state); - - state = match state { - State::WaitingForNotification => { - let notification = wait!(events_rx, Event::Notification(n) => n); - - // req_tx.send(Request::Messages(tunnel, peer_id, 69)); - - // let notification = wait!( - // events_rx, - // Incoming::Notification(notification) => notification - // ); - - req_tx - .send(Request::Messages { - tunnel: notification.tunnel, - timestamps: timestamps - .read() - .await - .iter() - .map(|(&k, &v)| (k, v)) - .collect(), - }) - .await - .ok(); - - State::ExecutingMessagesRequest - } - State::ExecutingMessagesRequest => { - let event = wait!(events_rx, Event::Messages(event) => event); - - req_tx - .send(Request::Ingest(event.messages.clone())) - .await - .ok(); - - dbg!(&event.messages); - - State::Ingesting - } - State::Ingesting => { - println!("Ingested!"); - - State::WaitingForNotification - } - }; - } - }); - - (Self { events: events_tx }, req_rx) +fn shared_op_db(op: &CRDTOperation, shared_op: &SharedOperation) -> shared_operation::Create { + shared_operation::Create { + id: op.id.as_bytes().to_vec(), + timestamp: op.timestamp.0 as i64, + instance: instance::pub_id::equals(op.instance.as_bytes().to_vec()), + kind: shared_op.kind().to_string(), + data: to_vec(&shared_op.data).unwrap(), + model: shared_op.model.to_string(), + record_id: to_vec(&shared_op.record_id).unwrap(), + _params: vec![], } +} - pub async fn notify(&self, tunnel: Tunnel, _peer_id: PeerId) { - self.events - .send(Event::Notification(NotificationEvent { tunnel })) - .await - .ok(); +fn relation_op_db( + op: &CRDTOperation, + relation_op: &RelationOperation, +) -> relation_operation::Create { + relation_operation::Create { + id: op.id.as_bytes().to_vec(), + timestamp: op.timestamp.0 as i64, + instance: instance::pub_id::equals(op.instance.as_bytes().to_vec()), + kind: relation_op.kind().to_string(), + data: to_vec(&relation_op.data).unwrap(), + relation: relation_op.relation.to_string(), + item_id: to_vec(&relation_op.relation_item).unwrap(), + group_id: to_vec(&relation_op.relation_group).unwrap(), + _params: vec![], } } diff --git a/core/crates/sync/src/lib.rs b/core/crates/sync/src/lib.rs index 3da4d12a2..cb29a617f 100644 --- a/core/crates/sync/src/lib.rs +++ b/core/crates/sync/src/lib.rs @@ -1,10 +1,12 @@ #![allow(clippy::unwrap_used, clippy::panic)] // TODO: Brendan remove this once you've got error handling here +mod actor; +mod db_operation; pub mod ingest; +mod manager; -use sd_prisma::{prisma::*, prisma_sync::ModelSyncData}; +use sd_prisma::prisma::*; use sd_sync::*; -use sd_utils::uuid_to_bytes; use std::{ collections::{BTreeMap, HashMap}, @@ -12,15 +14,8 @@ use std::{ sync::Arc, }; -use serde_json::to_vec; -use tokio::sync::{ - broadcast::{self}, - mpsc, RwLock, -}; -use uhlc::{HLCBuilder, Timestamp, HLC}; -use uuid::Uuid; - -pub use sd_prisma::prisma_sync; +pub use ingest::*; +pub use manager::*; pub use uhlc::NTP64; #[derive(Clone)] @@ -29,367 +24,29 @@ pub enum SyncMessage { Created, } -pub type Timestamps = Arc>>; +pub type Timestamps = Arc>>; -pub struct SyncManager { - db: Arc, - pub instance: Uuid, - // TODO: Remove `Mutex` and store this on `ingest` actor - timestamps: Timestamps, - clock: HLC, - pub tx: broadcast::Sender, - pub ingest: ingest::Actor, +pub struct SharedState { + pub db: Arc, + pub instance: uuid::Uuid, + pub timestamps: Timestamps, + pub clock: uhlc::HLC, } -impl fmt::Debug for SyncManager { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("SyncManager").finish() - } -} - -pub struct SyncManagerNew { - pub manager: SyncManager, - pub rx: broadcast::Receiver, - pub ingest_rx: mpsc::Receiver, -} - -impl SyncManager { - #[allow(clippy::new_ret_no_self)] - pub fn new(db: &Arc, instance: Uuid) -> SyncManagerNew { - let (tx, rx) = broadcast::channel(64); - - let timestamps: Timestamps = Default::default(); - - let (ingest, ingest_rx) = ingest::Actor::spawn(timestamps.clone()); - - SyncManagerNew { - manager: Self { - db: db.clone(), - instance, - clock: HLCBuilder::new().with_id(instance.into()).build(), - timestamps, - tx, - ingest, - }, - rx, - ingest_rx, - } - } - - pub async fn write_ops<'item, I: prisma_client_rust::BatchItem<'item>>( - &self, - tx: &PrismaClient, - (_ops, queries): (Vec, I), - ) -> prisma_client_rust::Result<::ReturnValue> { - #[cfg(feature = "emit-messages")] - let res = { - macro_rules! variant { - ($var:ident, $variant:ident, $fn:ident) => { - let $var = _ops - .iter() - .filter_map(|op| match &op.typ { - CRDTOperationType::$variant(inner) => { - Some($fn(&op, &inner).to_query(tx)) - } - _ => None, - }) - .collect::>(); - }; - } - - variant!(shared, Shared, shared_op_db); - variant!(relation, Relation, relation_op_db); - - let (res, _) = tx._batch((queries, (shared, relation))).await?; - - self.tx.send(SyncMessage::Created).ok(); - - res - }; - #[cfg(not(feature = "emit-messages"))] - let res = tx._batch([queries]).await?.remove(0); - - Ok(res) - } - - #[allow(unused_variables)] - pub async fn write_op<'item, Q: prisma_client_rust::BatchItem<'item>>( - &self, - tx: &PrismaClient, - op: CRDTOperation, - query: Q, - ) -> prisma_client_rust::Result<::ReturnValue> { - #[cfg(feature = "emit-messages")] - let ret = { - macro_rules! exec { - ($fn:ident, $inner:ident) => { - tx._batch(($fn(&op, $inner).to_query(tx), query)).await?.1 - }; - } - - let ret = match &op.typ { - CRDTOperationType::Shared(inner) => exec!(shared_op_db, inner), - CRDTOperationType::Relation(inner) => exec!(relation_op_db, inner), - }; - - self.tx.send(SyncMessage::Created).ok(); - - ret - }; - #[cfg(not(feature = "emit-messages"))] - let ret = tx._batch(vec![query]).await?.remove(0); - - Ok(ret) - } - - pub async fn get_ops( - &self, - args: GetOpsArgs, - ) -> prisma_client_rust::Result> { - let Self { db, .. } = self; - - shared_operation::include!(shared_include { - instance: select { pub_id } - }); - relation_operation::include!(relation_include { - instance: select { pub_id } - }); - - enum DbOperation { - Shared(shared_include::Data), - Relation(relation_include::Data), - } - - impl DbOperation { - fn timestamp(&self) -> NTP64 { - NTP64(match self { - Self::Shared(op) => op.timestamp, - Self::Relation(op) => op.timestamp, - } as u64) - } - - fn id(&self) -> Uuid { - Uuid::from_slice(match self { - Self::Shared(op) => &op.id, - Self::Relation(op) => &op.id, - }) - .unwrap() - } - - fn instance(&self) -> Uuid { - Uuid::from_slice(match self { - Self::Shared(op) => &op.instance.pub_id, - Self::Relation(op) => &op.instance.pub_id, - }) - .unwrap() - } - - fn into_operation(self) -> CRDTOperation { - CRDTOperation { - id: self.id(), - instance: self.instance(), - timestamp: self.timestamp(), - typ: match self { - Self::Shared(op) => CRDTOperationType::Shared(SharedOperation { - record_id: serde_json::from_slice(&op.record_id).unwrap(), - model: op.model, - data: serde_json::from_slice(&op.data).unwrap(), - }), - Self::Relation(op) => CRDTOperationType::Relation(RelationOperation { - relation: op.relation, - data: serde_json::from_slice(&op.data).unwrap(), - relation_item: serde_json::from_slice(&op.item_id).unwrap(), - relation_group: serde_json::from_slice(&op.group_id).unwrap(), - }), - }, - } - } - } - - macro_rules! db_args { - ($op:ident) => { - vec![prisma_client_rust::operator::or( - args.clocks - .iter() - .map(|(instance_id, timestamp)| { - prisma_client_rust::and![ - $op::instance::is(vec![instance::pub_id::equals(uuid_to_bytes( - *instance_id - ))]), - $op::timestamp::gte(timestamp.as_u64() as i64) - ] - }) - .collect(), - )] - }; - } - - let (shared, relation) = db - ._batch(( - db.shared_operation() - .find_many(db_args!(shared_operation)) - .take(args.count as i64) - .include(shared_include::include()), - db.relation_operation() - .find_many(db_args!(relation_operation)) - .take(args.count as i64) - .include(relation_include::include()), - )) - .await?; - - let mut ops = BTreeMap::new(); - - ops.extend( - shared - .into_iter() - .map(DbOperation::Shared) - .map(|op| (op.timestamp(), op)), - ); - ops.extend( - relation - .into_iter() - .map(DbOperation::Relation) - .map(|op| (op.timestamp(), op)), - ); - - Ok(ops - .into_values() - .rev() - .take(args.count as usize) - .map(DbOperation::into_operation) - .collect()) - } - - pub async fn apply_op(&self, op: CRDTOperation) -> prisma_client_rust::Result<()> { - ModelSyncData::from_op(op.typ.clone()) - .unwrap() - .exec(&self.db) - .await?; - - match &op.typ { - CRDTOperationType::Shared(shared_op) => { - shared_op_db(&op, shared_op) - .to_query(&self.db) - .exec() - .await?; - } - CRDTOperationType::Relation(relation_op) => { - relation_op_db(&op, relation_op) - .to_query(&self.db) - .exec() - .await?; - } - } - - self.tx.send(SyncMessage::Ingested).ok(); - - Ok(()) - } - - async fn compare_message(&self, op: &CRDTOperation) -> bool { - let old_timestamp = match &op.typ { - CRDTOperationType::Shared(shared_op) => { - let newer_op = self - .db - .shared_operation() - .find_first(vec![ - shared_operation::timestamp::gte(op.timestamp.as_u64() as i64), - shared_operation::model::equals(shared_op.model.to_string()), - shared_operation::record_id::equals( - serde_json::to_vec(&shared_op.record_id).unwrap(), - ), - shared_operation::kind::equals(shared_op.kind().to_string()), - ]) - .order_by(shared_operation::timestamp::order(SortOrder::Desc)) - .exec() - .await - .unwrap(); - - newer_op.map(|newer_op| newer_op.timestamp) - } - CRDTOperationType::Relation(relation_op) => { - let newer_op = self - .db - .relation_operation() - .find_first(vec![ - relation_operation::timestamp::gte(op.timestamp.as_u64() as i64), - relation_operation::relation::equals(relation_op.relation.to_string()), - relation_operation::item_id::equals( - serde_json::to_vec(&relation_op.relation_item).unwrap(), - ), - relation_operation::kind::equals(relation_op.kind().to_string()), - ]) - .order_by(relation_operation::timestamp::order(SortOrder::Desc)) - .exec() - .await - .unwrap(); - - newer_op.map(|newer_op| newer_op.timestamp) - } - }; - - old_timestamp - .map(|old| old != op.timestamp.as_u64() as i64) - .unwrap_or_default() - } - - pub async fn receive_crdt_operation(&self, op: CRDTOperation) { - self.clock - .update_with_timestamp(&Timestamp::new(op.timestamp, op.instance.into())) - .ok(); - - let mut clocks = self.timestamps.write().await; - let timestamp = clocks.entry(op.instance).or_insert_with(|| op.timestamp); - - if *timestamp < op.timestamp { - *timestamp = op.timestamp; - } - - let op_timestamp = op.timestamp; - let op_instance = op.instance; - - let is_old = self.compare_message(&op).await; - - if !is_old { - self.apply_op(op).await.ok(); - } - - self.db - .instance() - .update( - instance::pub_id::equals(uuid_to_bytes(op_instance)), - vec![instance::timestamp::set(Some(op_timestamp.as_u64() as i64))], - ) - .exec() - .await - .ok(); - } - - pub async fn register_instance(&self, instance_id: Uuid) { - self.timestamps.write().await.insert(instance_id, NTP64(0)); - } -} - -#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq, Eq)] -pub struct GetOpsArgs { - pub clocks: Vec<(Uuid, NTP64)>, - pub count: u32, -} - -fn shared_op_db(op: &CRDTOperation, shared_op: &SharedOperation) -> shared_operation::Create { +pub fn shared_op_db(op: &CRDTOperation, shared_op: &SharedOperation) -> shared_operation::Create { shared_operation::Create { id: op.id.as_bytes().to_vec(), timestamp: op.timestamp.0 as i64, instance: instance::pub_id::equals(op.instance.as_bytes().to_vec()), kind: shared_op.kind().to_string(), - data: to_vec(&shared_op.data).unwrap(), + data: serde_json::to_vec(&shared_op.data).unwrap(), model: shared_op.model.to_string(), - record_id: to_vec(&shared_op.record_id).unwrap(), + record_id: serde_json::to_vec(&shared_op.record_id).unwrap(), _params: vec![], } } -fn relation_op_db( +pub fn relation_op_db( op: &CRDTOperation, relation_op: &RelationOperation, ) -> relation_operation::Create { @@ -398,20 +55,10 @@ fn relation_op_db( timestamp: op.timestamp.0 as i64, instance: instance::pub_id::equals(op.instance.as_bytes().to_vec()), kind: relation_op.kind().to_string(), - data: to_vec(&relation_op.data).unwrap(), + data: serde_json::to_vec(&relation_op.data).unwrap(), relation: relation_op.relation.to_string(), - item_id: to_vec(&relation_op.relation_item).unwrap(), - group_id: to_vec(&relation_op.relation_group).unwrap(), + item_id: serde_json::to_vec(&relation_op.relation_item).unwrap(), + group_id: serde_json::to_vec(&relation_op.relation_group).unwrap(), _params: vec![], } } - -impl OperationFactory for SyncManager { - fn get_clock(&self) -> &HLC { - &self.clock - } - - fn get_instance(&self) -> Uuid { - self.instance - } -} diff --git a/core/crates/sync/src/manager.rs b/core/crates/sync/src/manager.rs new file mode 100644 index 000000000..3df8a726a --- /dev/null +++ b/core/crates/sync/src/manager.rs @@ -0,0 +1,206 @@ +use sd_prisma::prisma::*; +use sd_sync::*; +use sd_utils::uuid_to_bytes; + +use crate::{db_operation::*, *}; +use std::{cmp::Ordering, ops::Deref, sync::Arc}; +use tokio::sync::{broadcast, mpsc}; +use uhlc::{HLCBuilder, HLC}; +use uuid::Uuid; + +pub struct Manager { + pub tx: broadcast::Sender, + pub ingest: ingest::Handler, + shared: Arc, +} + +pub struct SyncManagerNew { + pub manager: Manager, + pub rx: broadcast::Receiver, + pub ingest_rx: mpsc::Receiver, +} + +#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq, Eq)] +pub struct GetOpsArgs { + pub clocks: Vec<(Uuid, NTP64)>, + pub count: u32, +} + +impl Manager { + pub fn new(db: &Arc, instance: Uuid) -> SyncManagerNew { + let (tx, rx) = broadcast::channel(64); + + let timestamps: Timestamps = Default::default(); + let clock = HLCBuilder::new().with_id(instance.into()).build(); + + let shared = Arc::new(SharedState { + db: db.clone(), + instance, + timestamps, + clock, + }); + + let (ingest, ingest_rx) = ingest::Actor::spawn(shared.clone()); + + SyncManagerNew { + manager: Self { shared, tx, ingest }, + rx, + ingest_rx, + } + } + + pub async fn write_ops<'item, I: prisma_client_rust::BatchItem<'item>>( + &self, + tx: &PrismaClient, + (_ops, queries): (Vec, I), + ) -> prisma_client_rust::Result<::ReturnValue> { + #[cfg(feature = "emit-messages")] + let res = { + macro_rules! variant { + ($var:ident, $variant:ident, $fn:ident) => { + let $var = _ops + .iter() + .filter_map(|op| match &op.typ { + CRDTOperationType::$variant(inner) => { + Some($fn(&op, &inner).to_query(tx)) + } + _ => None, + }) + .collect::>(); + }; + } + + variant!(shared, Shared, shared_op_db); + variant!(relation, Relation, relation_op_db); + + let (res, _) = tx._batch((queries, (shared, relation))).await?; + + self.tx.send(SyncMessage::Created).ok(); + + res + }; + #[cfg(not(feature = "emit-messages"))] + let res = tx._batch([queries]).await?.remove(0); + + Ok(res) + } + + #[allow(unused_variables)] + pub async fn write_op<'item, Q: prisma_client_rust::BatchItem<'item>>( + &self, + tx: &PrismaClient, + op: CRDTOperation, + query: Q, + ) -> prisma_client_rust::Result<::ReturnValue> { + #[cfg(feature = "emit-messages")] + let ret = { + macro_rules! exec { + ($fn:ident, $inner:ident) => { + tx._batch(($fn(&op, $inner).to_query(tx), query)).await?.1 + }; + } + + let ret = match &op.typ { + CRDTOperationType::Shared(inner) => exec!(shared_op_db, inner), + CRDTOperationType::Relation(inner) => exec!(relation_op_db, inner), + }; + + self.tx.send(SyncMessage::Created).ok(); + + ret + }; + #[cfg(not(feature = "emit-messages"))] + let ret = tx._batch(vec![query]).await?.remove(0); + + Ok(ret) + } + + pub async fn get_ops( + &self, + args: GetOpsArgs, + ) -> prisma_client_rust::Result> { + let db = &self.db; + + macro_rules! db_args { + ($args:ident, $op:ident) => { + vec![prisma_client_rust::operator::or( + $args + .clocks + .iter() + .map(|(instance_id, timestamp)| { + prisma_client_rust::and![ + $op::instance::is(vec![instance::pub_id::equals(uuid_to_bytes( + *instance_id + ))]), + $op::timestamp::gt(timestamp.as_u64() as i64) + ] + }) + .chain([ + $op::instance::is_not(vec![ + instance::pub_id::in_vec( + $args + .clocks + .iter() + .map(|(instance_id, _)| { + uuid_to_bytes(*instance_id) + }) + .collect() + ) + ]) + ]) + .collect(), + )] + }; + } + + let (shared, relation) = db + ._batch(( + db.shared_operation() + .find_many(db_args!(args, shared_operation)) + .take(args.count as i64) + .order_by(shared_operation::timestamp::order(SortOrder::Asc)) + .include(shared_include::include()), + db.relation_operation() + .find_many(db_args!(args, relation_operation)) + .take(args.count as i64) + .order_by(relation_operation::timestamp::order(SortOrder::Asc)) + .include(relation_include::include()), + )) + .await?; + + let mut ops: Vec<_> = [] + .into_iter() + .chain(shared.into_iter().map(DbOperation::Shared)) + .chain(relation.into_iter().map(DbOperation::Relation)) + .collect(); + + ops.sort_by(|a, b| match a.timestamp().cmp(&b.timestamp()) { + Ordering::Equal => a.instance().cmp(&b.instance()), + o => o, + }); + + Ok(ops + .into_iter() + .take(args.count as usize) + .map(DbOperation::into_operation) + .collect()) + } +} + +impl OperationFactory for Manager { + fn get_clock(&self) -> &HLC { + &self.clock + } + + fn get_instance(&self) -> Uuid { + self.instance + } +} + +impl Deref for Manager { + type Target = SharedState; + + fn deref(&self) -> &Self::Target { + &self.shared + } +} diff --git a/core/crates/sync/tests/lib.rs b/core/crates/sync/tests/lib.rs index d25b8c812..194eb3572 100644 --- a/core/crates/sync/tests/lib.rs +++ b/core/crates/sync/tests/lib.rs @@ -18,7 +18,7 @@ struct Instance { id: Uuid, _peer_id: sd_p2p::PeerId, db: Arc, - sync: Arc, + sync: Arc, } impl Instance { @@ -54,7 +54,7 @@ impl Instance { .await .unwrap(); - let sync = sd_core_sync::SyncManager::new(&db, id); + let sync = sd_core_sync::Manager::new(&db, id); ( Arc::new(Self { @@ -105,9 +105,6 @@ impl Instance { .exec() .await .unwrap(); - - left.sync.register_instance(right.id).await; - right.sync.register_instance(left.id).await; } } @@ -125,15 +122,9 @@ async fn bruh() -> Result<(), Box> { async move { while let Ok(msg) = sync_rx1.recv().await { match msg { - SyncMessage::Created => instance2 - .sync - .ingest - .events - .send(ingest::Event::Notification(ingest::NotificationEvent { - tunnel: todo!(), - })) - .await - .unwrap(), + SyncMessage::Created => { + instance2.sync.ingest.event_tx.send(todo!()).await.unwrap() + } _ => {} } } @@ -160,9 +151,11 @@ async fn bruh() -> Result<(), Box> { instance2 .sync .ingest - .events + .event_tx .send(ingest::Event::Messages(ingest::MessagesEvent { + tunnel: todo!(), messages, + has_more: false, instance_id: instance1.id, })) .await diff --git a/core/prisma/schema.prisma b/core/prisma/schema.prisma index 0df6ccbca..a4f7c0a23 100644 --- a/core/prisma/schema.prisma +++ b/core/prisma/schema.prisma @@ -6,7 +6,7 @@ datasource db { generator client { provider = "cargo prisma" output = "../../crates/prisma/src/prisma" - module_path = "sd_prisma::prisma" + module_path = "prisma" client_format = "folder" } @@ -91,6 +91,7 @@ model Instance { SharedOperation SharedOperation[] RelationOperation RelationOperation[] + locations Location[] @@map("instance") } @@ -141,7 +142,7 @@ model Location { date_created DateTime? instance_id Int? - // instance Instance? @relation(fields: [instance_id], references: [id]) // TODO: Enabling this breaks migration 7's `update_many` in `library/config.rs` + instance Instance? @relation(fields: [instance_id], references: [id], onDelete: SetNull) file_paths FilePath[] indexer_rules IndexerRulesInLocation[] diff --git a/core/src/lib.rs b/core/src/lib.rs index b6f56c6a8..2398ebe17 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -46,6 +46,8 @@ pub(crate) mod preferences; pub mod util; pub(crate) mod volume; +pub(crate) use sd_core_sync as sync; + /// Holds references to all the services that make up the Spacedrive core. /// This can easily be passed around as a context to the rest of the core. pub struct NodeServices { diff --git a/core/src/library/library.rs b/core/src/library/library.rs index f6aa9d1c6..bf85b91a6 100644 --- a/core/src/library/library.rs +++ b/core/src/library/library.rs @@ -6,6 +6,7 @@ use crate::{ location::file_path_helper::{file_path_to_full_path, IsolatedFilePathData}, object::{orphan_remover::OrphanRemoverActor, preview::get_thumbnail_path}, prisma::{file_path, location, PrismaClient}, + sync, util::{db::maybe_missing, error::FileIOError}, Node, NotificationManager, }; @@ -18,7 +19,7 @@ use std::{ }; use chrono::{DateTime, Utc}; -use sd_core_sync::{SyncManager, SyncMessage}; +use sd_core_sync::SyncMessage; use sd_p2p::spacetunnel::Identity; use sd_prisma::prisma::notification; use tokio::{fs, io, sync::broadcast}; @@ -42,7 +43,7 @@ pub struct LoadedLibrary { pub config: LibraryConfig, /// db holds the database client for the current library. pub db: Arc, - pub sync: Arc, + pub sync: Arc, /// key manager that provides encryption keys to functions that require them // pub key_manager: Arc, /// p2p identity @@ -78,7 +79,7 @@ impl LoadedLibrary { manager: Arc, node: &Arc, ) -> Arc { - let mut sync = SyncManager::new(&db, instance_id); + let mut sync = sync::Manager::new(&db, instance_id); let library = Arc::new(Self { id, @@ -106,24 +107,31 @@ impl LoadedLibrary { loop { tokio::select! { req = sync.ingest_rx.recv() => { - use sd_core_sync::ingest::Request; + use sd_core_sync::ingest; let Some(req) = req else { continue; }; + const OPS_PER_REQUEST: u32 = 100; + match req { - Request::Messages { tunnel, timestamps } => { - node.nlm.request_and_ingest_ops( - tunnel, - sd_core_sync::GetOpsArgs { clocks: timestamps, count: 100 }, - &library.sync, - library.id + ingest::Request::Messages { mut tunnel, timestamps } => { + let ops = library.node().nlm.request_ops( + &mut tunnel, + sd_core_sync::GetOpsArgs { clocks: timestamps, count: OPS_PER_REQUEST }, ).await; + + library.sync.ingest + .event_tx + .send(ingest::Event::Messages(ingest::MessagesEvent { + tunnel, + instance_id: library.sync.instance, + has_more: ops.len() == OPS_PER_REQUEST as usize, + messages: ops, + })) + .await + .expect("TODO: Handle ingest channel closed, so we don't loose ops"); }, - Request::Ingest(ops) => { - for op in ops.into_iter() { - library.sync.receive_crdt_operation(op).await; - } - } + _ => {} } }, msg = sync.rx.recv() => { diff --git a/core/src/location/indexer/mod.rs b/core/src/location/indexer/mod.rs index 851869dca..c7161df58 100644 --- a/core/src/location/indexer/mod.rs +++ b/core/src/location/indexer/mod.rs @@ -110,7 +110,7 @@ async fn execute_indexer_save_step( ( location::NAME, json!(prisma_sync::location::SyncId { - pub_id: pub_id.clone() + pub_id: location.pub_id.clone() }), ), location_id::set(Some(location.id)), diff --git a/core/src/location/mod.rs b/core/src/location/mod.rs index c202cb3d8..8eb4def27 100644 --- a/core/src/location/mod.rs +++ b/core/src/location/mod.rs @@ -24,6 +24,7 @@ use normpath::PathExt; use prisma_client_rust::{operator::and, or, QueryError}; use sd_prisma::prisma_sync; use sd_sync::*; +use sd_utils::uuid_to_bytes; use serde::Deserialize; use serde_json::json; use specta::Type; @@ -600,10 +601,9 @@ async fn create_location( (location::path::NAME, json!(&location_path)), (location::date_created::NAME, json!(date_created)), ( - location::instance_id::NAME, + location::instance::NAME, json!(prisma_sync::instance::SyncId { - pub_id: vec![], - // id: library.config.instance_id, + pub_id: uuid_to_bytes(library.sync.instance) }), ), ], @@ -741,7 +741,7 @@ impl From for location::Data { date_created: data.date_created, file_paths: None, indexer_rules: None, - // instance: None, + instance: None, } } } @@ -763,7 +763,7 @@ impl From<&location_with_indexer_rules::Data> for location::Data { date_created: data.date_created, file_paths: None, indexer_rules: None, - // instance: None, + instance: None, } } } diff --git a/core/src/object/file_identifier/mod.rs b/core/src/object/file_identifier/mod.rs index 301a89e0e..d21000e8d 100644 --- a/core/src/object/file_identifier/mod.rs +++ b/core/src/object/file_identifier/mod.rs @@ -9,7 +9,6 @@ use crate::{ util::{db::maybe_missing, error::FileIOError}, }; -use sd_core_sync::SyncManager; use sd_file_ext::{extensions::Extension, kind::ObjectKind}; use sd_prisma::prisma_sync; use sd_sync::{CRDTOperation, OperationFactory}; @@ -308,7 +307,7 @@ async fn identifier_job_step( fn file_path_object_connect_ops<'db>( file_path_id: Uuid, object_id: Uuid, - sync: &SyncManager, + sync: &crate::sync::Manager, db: &'db PrismaClient, ) -> (CRDTOperation, file_path::UpdateQuery<'db>) { #[cfg(debug_assertions)] diff --git a/core/src/p2p/p2p_manager.rs b/core/src/p2p/p2p_manager.rs index 111ce82fa..e0a617ba7 100644 --- a/core/src/p2p/p2p_manager.rs +++ b/core/src/p2p/p2p_manager.rs @@ -259,6 +259,7 @@ impl P2PManager { } Header::Sync(library_id) => { // Header -> Tunnel -> SyncMessage + use sd_core_sync::ingest; let mut tunnel = Tunnel::responder(stream).await.unwrap(); @@ -276,16 +277,16 @@ impl P2PManager { SyncMessage::NewOperations => { // The ends up in `NetworkedLibraryManager::request_and_ingest_ops`. // TODO: Throw tunnel around like this makes it soooo confusing. - ingest.notify(tunnel, event.peer_id).await; + ingest + .event_tx + .send(ingest::Event::Notification( + ingest::NotificationEvent { tunnel }, + )) + .await + .ok(); } SyncMessage::OperationsRequest(_) => { - nlm.exchange_sync_ops( - tunnel, - &event.peer_id, - library_id, - &library.sync, - ) - .await; + todo!("this should be received somewhere else!"); } SyncMessage::OperationsRequestResponse(_) => { todo!("unreachable but add proper error handling") diff --git a/core/src/p2p/pairing/initial_sync.rs b/core/src/p2p/pairing/initial_sync.rs deleted file mode 100644 index 7f87d0c20..000000000 --- a/core/src/p2p/pairing/initial_sync.rs +++ /dev/null @@ -1,376 +0,0 @@ -use sd_prisma::prisma::*; - -// TODO: Turn this entire file into a Prisma generator cause it could be way more maintainable - -// Pairing will fail if the two clients aren't on versions with identical DB models so it's safe to send them and ignore migrations. - -const ITEMS_PER_BATCH: i64 = 1000; - -macro_rules! impl_for_models { - ($($variant:ident($model:ident)),* $(,)+) => { - /// Represents any DB model to be ingested into the database as part of the initial sync - #[derive(Debug, serde::Serialize, serde::Deserialize)] - pub enum ModelData { - $( - $variant(Vec<$model::Data>), - )* - } - - impl ModelData { - /// Length of data - pub fn len(&self) -> usize { - match self { - $( - Self::$variant(data) => data.len(), - )* - } - } - - /// Get count of all of the rows in the database - pub async fn total_count(db: &PrismaClient) -> Result { - let mut total_count = 0; - - let ($( $model ),*) = tokio::join!( - $( - db.$model().count(vec![]).exec(), - )* - ); - - $(total_count += $model?;)* - Ok(total_count) - } - - /// Insert the data into the database - pub async fn insert(self, db: &PrismaClient) -> Result<(), prisma_client_rust::QueryError> { - match self { - $( - Self::$variant(data) => { - // TODO: Prisma Client Rust is broke - // db.$model().create_many(data.into_iter().map(|v| FromData(v).into()).collect()).exec().await?; - - for i in data { - $model::CreateUnchecked::from(FromData(i)).to_query(db).exec().await?; - } - } - )* - } - - Ok(()) - } - - pub fn model_name(&self) -> &'static str { - match self { - $( - Self::$variant(_) => stringify!($model), - )* - } - } - } - - /// This exists to determine the next model to sync. - /// It emulates `.window()` functionality but for a `macro_rules` - // TODO: When replacing with a generator this can be removed and done at compile time - #[derive(Debug)] - enum ModelSyncCursorIterator { - Done = 0, - $( - $variant, - )* - } - - impl<'a> From<&'a ModelSyncCursor> for ModelSyncCursorIterator { - fn from(cursor: &'a ModelSyncCursor) -> Self { - match cursor { - $( - ModelSyncCursor::$variant(_) => Self::$variant, - )* - ModelSyncCursor::Done => Self::Done, - } - } - } - - impl ModelSyncCursorIterator { - pub fn next(self) -> ModelSyncCursor { - let i = self as i32; - match i + 1 { - $( - v if v == Self::$variant as i32 => ModelSyncCursor::$variant(0), - )* - _ => ModelSyncCursor::Done, - } - } - } - - /// Represent where we ar eup to with the sync - #[derive(Debug, serde::Serialize, serde::Deserialize)] - pub enum ModelSyncCursor { - $( - $variant(i64), - )* - Done, - } - - impl ModelSyncCursor { - pub fn new() -> Self { - new_impl!($( $variant ),*) - } - - pub async fn next(&mut self, db: &PrismaClient) -> Option> { - match self { - $( - Self::$variant(cursor) => { - match db.$model() - .find_many(vec![]) - .skip(*cursor) - .take(ITEMS_PER_BATCH + 1) - .exec() - .await { - Ok(data) => { - if data.len() <= ITEMS_PER_BATCH as usize { - *self = ModelSyncCursorIterator::from(&*self).next(); - } else { - *self = Self::$variant(*cursor + ITEMS_PER_BATCH); - } - - Some(Ok(ModelData::$variant(data))) - }, - Err(e) => return Some(Err(e)), - } - }, - )* - Self::Done => None - } - } - } - }; -} - -macro_rules! new_impl { - ($x:ident, $($y:ident),+) => { - Self::$x(0) - }; -} - -impl PartialEq for ModelData { - // Crude EQ impl based only on ID's not struct content. - // It's super annoying PCR does have this impl but it kinda makes sense with relation fetching. - fn eq(&self, other: &Self) -> bool { - match (self, other) { - (ModelData::SharedOperation(a), ModelData::SharedOperation(b)) => a - .iter() - .map(|x| x.id.clone()) - .eq(b.iter().map(|x| x.id.clone())), - (ModelData::Volume(a), ModelData::Volume(b)) => { - a.iter().map(|x| x.id).eq(b.iter().map(|x| x.id)) - } - (ModelData::Location(a), ModelData::Location(b)) => { - a.iter().map(|x| x.id).eq(b.iter().map(|x| x.id)) - } - (ModelData::FilePath(a), ModelData::FilePath(b)) => { - a.iter().map(|x| x.id).eq(b.iter().map(|x| x.id)) - } - (ModelData::Object(a), ModelData::Object(b)) => { - a.iter().map(|x| x.id).eq(b.iter().map(|x| x.id)) - } - (ModelData::Tag(a), ModelData::Tag(b)) => { - a.iter().map(|x| x.id).eq(b.iter().map(|x| x.id)) - } - (ModelData::TagOnObject(a), ModelData::TagOnObject(b)) => a - .iter() - .map(|x| (x.tag_id, x.object_id)) - .eq(b.iter().map(|x| (x.tag_id, x.object_id))), - (ModelData::IndexerRule(a), ModelData::IndexerRule(b)) => { - a.iter().map(|x| x.id).eq(b.iter().map(|x| x.id)) - } - (ModelData::IndexerRulesInLocation(a), ModelData::IndexerRulesInLocation(b)) => a - .iter() - .map(|x| (x.location_id, x.indexer_rule_id)) - .eq(b.iter().map(|x| (x.location_id, x.indexer_rule_id))), - (ModelData::Preference(a), ModelData::Preference(b)) => a - .iter() - .map(|x| (x.key.clone(), x.value.clone())) - .eq(b.iter().map(|x| (x.key.clone(), x.value.clone()))), - _ => false, - } - } -} - -/// Meaningless wrapper to avoid Rust's orphan rule -struct FromData(T); - -impl From> for shared_operation::CreateUnchecked { - fn from(FromData(data): FromData) -> Self { - Self { - id: data.id, - timestamp: data.timestamp, - model: data.model, - record_id: data.record_id, - kind: data.kind, - data: data.data, - instance_id: data.instance_id, - _params: vec![], - } - } -} - -impl From> for volume::CreateUnchecked { - fn from(FromData(data): FromData) -> Self { - Self { - name: data.name, - mount_point: data.mount_point, - _params: vec![ - volume::id::set(data.id), - volume::total_bytes_capacity::set(data.total_bytes_capacity), - volume::total_bytes_available::set(data.total_bytes_available), - volume::disk_type::set(data.disk_type), - volume::filesystem::set(data.filesystem), - volume::is_system::set(data.is_system), - volume::date_modified::set(data.date_modified), - ], - } - } -} - -impl From> for location::CreateUnchecked { - fn from(FromData(data): FromData) -> Self { - Self { - pub_id: data.pub_id, - _params: vec![ - location::id::set(data.id), - location::name::set(data.name), - location::path::set(data.path), - location::total_capacity::set(data.total_capacity), - location::available_capacity::set(data.available_capacity), - location::is_archived::set(data.is_archived), - location::generate_preview_media::set(data.generate_preview_media), - location::sync_preview_media::set(data.sync_preview_media), - location::hidden::set(data.hidden), - location::date_created::set(data.date_created), - location::instance_id::set(data.instance_id), - ], - } - } -} - -impl From> for file_path::CreateUnchecked { - fn from(FromData(data): FromData) -> Self { - Self { - pub_id: data.pub_id, - _params: vec![ - file_path::id::set(data.id), - file_path::is_dir::set(data.is_dir), - file_path::cas_id::set(data.cas_id), - file_path::integrity_checksum::set(data.integrity_checksum), - file_path::location_id::set(data.location_id), - file_path::materialized_path::set(data.materialized_path), - file_path::name::set(data.name), - file_path::extension::set(data.extension), - file_path::size_in_bytes::set(data.size_in_bytes), - file_path::size_in_bytes_bytes::set(data.size_in_bytes_bytes), - file_path::inode::set(data.inode), - file_path::device::set(data.device), - file_path::object_id::set(data.object_id), - file_path::key_id::set(data.key_id), - file_path::date_created::set(data.date_created), - file_path::date_modified::set(data.date_modified), - file_path::date_indexed::set(data.date_indexed), - ], - } - } -} - -impl From> for object::CreateUnchecked { - fn from(FromData(data): FromData) -> Self { - Self { - pub_id: data.pub_id, - _params: vec![ - object::id::set(data.id), - object::kind::set(data.kind), - object::key_id::set(data.key_id), - object::hidden::set(data.hidden), - object::favorite::set(data.favorite), - object::important::set(data.important), - object::note::set(data.note), - object::date_created::set(data.date_created), - object::date_accessed::set(data.date_accessed), - ], - } - } -} - -impl From> for tag::CreateUnchecked { - fn from(FromData(data): FromData) -> Self { - Self { - pub_id: data.pub_id, - _params: vec![ - tag::id::set(data.id), - tag::name::set(data.name), - tag::color::set(data.color), - tag::redundancy_goal::set(data.redundancy_goal), - tag::date_created::set(data.date_created), - tag::date_modified::set(data.date_modified), - ], - } - } -} - -impl From> for tag_on_object::CreateUnchecked { - fn from(FromData(data): FromData) -> Self { - Self { - tag_id: data.tag_id, - object_id: data.object_id, - _params: vec![], - } - } -} - -impl From> for indexer_rule::CreateUnchecked { - fn from(FromData(data): FromData) -> Self { - Self { - pub_id: data.pub_id, - _params: vec![ - indexer_rule::id::set(data.id), - indexer_rule::name::set(data.name), - indexer_rule::default::set(data.default), - indexer_rule::rules_per_kind::set(data.rules_per_kind), - indexer_rule::date_created::set(data.date_created), - indexer_rule::date_modified::set(data.date_modified), - ], - } - } -} - -impl From> - for indexer_rules_in_location::CreateUnchecked -{ - fn from(FromData(data): FromData) -> Self { - Self { - location_id: data.location_id, - indexer_rule_id: data.indexer_rule_id, - _params: vec![], - } - } -} - -impl From> for preference::CreateUnchecked { - fn from(FromData(data): FromData) -> Self { - Self { - key: data.key, - _params: vec![preference::value::set(data.value)], - } - } -} - -// Ensure you order the models to Foreign Keys are created before the models that reference them. -impl_for_models! { - Object(object), - SharedOperation(shared_operation), - Volume(volume), - Location(location), - FilePath(file_path), - Tag(tag), - TagOnObject(tag_on_object), - IndexerRule(indexer_rule), - IndexerRulesInLocation(indexer_rules_in_location), - Preference(preference), -} diff --git a/core/src/p2p/pairing/mod.rs b/core/src/p2p/pairing/mod.rs index dc66c0439..62f0e68cd 100644 --- a/core/src/p2p/pairing/mod.rs +++ b/core/src/p2p/pairing/mod.rs @@ -22,10 +22,8 @@ use tokio::{ use tracing::{error, info}; use uuid::Uuid; -mod initial_sync; mod proto; -pub use initial_sync::*; use proto::*; use crate::{ @@ -79,7 +77,157 @@ impl PairingManager { node_config: NodeConfig, library_manager: Arc, ) -> u16 { - todo!(); + // TODO: Timeout for max number of pairings in a time period + + let pairing_id = self.id.fetch_add(1, Ordering::SeqCst); + self.emit_progress(pairing_id, PairingStatus::EstablishingConnection); + + info!("Beginning pairing '{pairing_id}' as originator to remote peer '{peer_id}'"); + + tokio::spawn(async move { + let mut stream = self.manager.stream(peer_id).await.unwrap(); + stream.write_all(&Header::Pair.to_bytes()).await.unwrap(); + + // TODO: Ensure both clients are on a compatible version cause Prisma model changes will cause issues + + // 1. Create new instance for originator and send it to the responder + self.emit_progress(pairing_id, PairingStatus::PairingRequested); + let now = Utc::now(); + let identity = Identity::new(); + let self_instance_id = Uuid::new_v4(); + let req = PairingRequest(Instance { + id: self_instance_id, + identity: identity.to_remote_identity(), + node_id: node_config.id, + node_name: node_config.name.clone(), + node_platform: Platform::current(), + last_seen: now, + date_created: now, + }); + stream.write_all(&req.to_bytes()).await.unwrap(); + + // 2. + match PairingResponse::from_stream(&mut stream).await.unwrap() { + PairingResponse::Accepted { + library_id, + library_name, + library_description, + instances, + } => { + info!("Pairing '{pairing_id}' accepted by remote into library '{library_id}'"); + // TODO: Log all instances and library info + self.emit_progress( + pairing_id, + PairingStatus::PairingInProgress { + library_name: library_name.clone(), + library_description: library_description.clone(), + }, + ); + + // TODO: Future - Library in pairing state + // TODO: Create library + + if library_manager + .get_all_libraries() + .await + .into_iter() + .find(|i| i.id == library_id) + .is_some() + { + self.emit_progress(pairing_id, PairingStatus::LibraryAlreadyExists); + + // TODO: Properly handle this at a protocol level so the error is on both sides + + return; + } + + let (this, instances): (Vec<_>, Vec<_>) = instances + .into_iter() + .partition(|i| i.id == self_instance_id); + + if this.len() != 1 { + todo!("error handling"); + } + let this = this.first().expect("unreachable"); + if this.identity != identity.to_remote_identity() { + todo!("error handling. Something went really wrong!"); + } + + let library = library_manager + .create_with_uuid( + library_id, + LibraryName::new(library_name).unwrap(), + library_description, + node_config, + false, // We will sync everything which will conflict with the seeded stuff + Some(instance::Create { + pub_id: this.id.as_bytes().to_vec(), + identity: IdentityOrRemoteIdentity::Identity(identity).to_bytes(), + node_id: this.node_id.as_bytes().to_vec(), + node_name: this.node_name.clone(), // TODO: Remove `clone` + node_platform: this.node_platform as i32, + last_seen: this.last_seen.into(), + date_created: this.date_created.into(), + // timestamp: Default::default(), // TODO: Source this properly! + _params: vec![], + }), + ) + .await + .unwrap(); + + let library = library_manager.get_library(&library.id).await.unwrap(); + + library + .db + .instance() + .create_many( + instances + .into_iter() + .map(|i| { + instance::CreateUnchecked { + pub_id: i.id.as_bytes().to_vec(), + identity: IdentityOrRemoteIdentity::RemoteIdentity( + i.identity, + ) + .to_bytes(), + node_id: i.node_id.as_bytes().to_vec(), + node_name: i.node_name, + node_platform: i.node_platform as i32, + last_seen: i.last_seen.into(), + date_created: i.date_created.into(), + // timestamp: Default::default(), // TODO: Source this properly! + _params: vec![], + } + }) + .collect(), + ) + .exec() + .await + .unwrap(); + + // Called again so the new instances are picked up + library_manager.node.nlm.load_library(&library).await; + + P2PManager::resync( + library_manager.node.nlm.clone(), + &mut stream, + peer_id, + self.metadata_manager.get().instances, + ) + .await; + + // TODO: Done message to frontend + self.emit_progress(pairing_id, PairingStatus::PairingComplete(library_id)); + stream.flush().await.unwrap(); + } + PairingResponse::Rejected => { + info!("Pairing '{pairing_id}' rejected by remote"); + self.emit_progress(pairing_id, PairingStatus::PairingRejected); + } + } + }); + + pairing_id } pub async fn responder( @@ -88,7 +236,116 @@ impl PairingManager { mut stream: impl AsyncRead + AsyncWrite + Unpin, library_manager: &LibraryManager, ) { - todo!(); + let pairing_id = self.id.fetch_add(1, Ordering::SeqCst); + self.emit_progress(pairing_id, PairingStatus::EstablishingConnection); + + info!("Beginning pairing '{pairing_id}' as responder to remote peer '{peer_id}'"); + + let remote_instance = PairingRequest::from_stream(&mut stream).await.unwrap().0; + self.emit_progress(pairing_id, PairingStatus::PairingDecisionRequest); + self.events_tx + .send(P2PEvent::PairingRequest { + id: pairing_id, + name: remote_instance.node_name.clone(), + os: remote_instance.node_platform.clone().into(), + }) + .ok(); + + // Prompt the user and wait + // TODO: After 1 minute remove channel from map and assume it was rejected + let (tx, rx) = oneshot::channel(); + self.pairing_response + .write() + .unwrap() + .insert(pairing_id, tx); + let PairingDecision::Accept(library_id) = rx.await.unwrap() else { + info!("The user rejected pairing '{pairing_id}'!"); + // self.emit_progress(pairing_id, PairingStatus::PairingRejected); // TODO: Event to remove from frontend index + stream.write_all(&PairingResponse::Rejected.to_bytes()).await.unwrap(); + return; + }; + info!("The user accepted pairing '{pairing_id}' for library '{library_id}'!"); + + let library = library_manager.get_library(&library_id).await.unwrap(); + + // TODO: Rollback this on pairing failure + instance::Create { + pub_id: remote_instance.id.as_bytes().to_vec(), + identity: IdentityOrRemoteIdentity::RemoteIdentity(remote_instance.identity.clone()) + .to_bytes(), + node_id: remote_instance.node_id.as_bytes().to_vec(), + node_name: remote_instance.node_name, + node_platform: remote_instance.node_platform as i32, + last_seen: remote_instance.last_seen.into(), + date_created: remote_instance.date_created.into(), + // timestamp: Default::default(), // TODO: Source this properly! + _params: vec![], + } + .to_query(&library.db) + .exec() + .await + .unwrap(); + + stream + .write_all( + &PairingResponse::Accepted { + library_id: library.id, + library_name: library.config.name.clone().into(), + library_description: library.config.description.clone(), + instances: library + .db + .instance() + .find_many(vec![]) + .exec() + .await + .unwrap() + .into_iter() + .map(|i| Instance { + id: Uuid::from_slice(&i.pub_id).unwrap(), + identity: IdentityOrRemoteIdentity::from_bytes(&i.identity) + .unwrap() + .remote_identity(), + node_id: Uuid::from_slice(&i.node_id).unwrap(), + node_name: i.node_name, + node_platform: Platform::try_from(i.node_platform as u8) + .unwrap_or(Platform::Unknown), + last_seen: i.last_seen.into(), + date_created: i.date_created.into(), + }) + .collect(), + } + .to_bytes(), + ) + .await + .unwrap(); + + // TODO: Pairing confirmation + rollback + + // Called again so the new instances are picked up + library_manager.node.nlm.load_library(&library).await; + + let Header::Connected(remote_identities) = Header::from_stream(&mut stream).await.unwrap() else { + todo!("unreachable; todo error handling"); + }; + + P2PManager::resync_handler( + library_manager.node.nlm.clone(), + &mut stream, + peer_id, + self.metadata_manager.get().instances, + remote_identities, + ) + .await; + + self.emit_progress(pairing_id, PairingStatus::PairingComplete(library_id)); + + library_manager + .node + .nlm + .alert_new_ops(library_id, library.sync.clone()) + .await; + + stream.flush().await.unwrap(); } } diff --git a/core/src/p2p/pairing/proto.rs b/core/src/p2p/pairing/proto.rs index c46270c18..27313d86d 100644 --- a/core/src/p2p/pairing/proto.rs +++ b/core/src/p2p/pairing/proto.rs @@ -10,8 +10,6 @@ use uuid::Uuid; use crate::node::Platform; -use super::ModelData; - /// Terminology: /// Instance - DB model which represents a single `.db` file. /// Originator - begins the pairing process and is asking to join a library that will be selected by the responder. @@ -62,19 +60,6 @@ pub enum PairingConfirmation { Error, } -/// 4. Sync the data in the database with the originator. -/// Sent `Responder` -> `Originator`. -#[derive(Debug, PartialEq)] -pub enum SyncData { - Data { - /// Only included in first request and is an **estimate** of how many models will be sent. - /// It will likely be wrong so should be constrained to being used for UI purposes only. - total_models: Option, - data: ModelData, - }, - Finished, -} - impl Instance { pub async fn from_stream( stream: &mut (impl AsyncRead + Unpin), @@ -222,50 +207,6 @@ impl PairingConfirmation { } } -impl SyncData { - pub async fn from_stream( - stream: &mut (impl AsyncRead + Unpin), - ) -> Result { - let discriminator = stream - .read_u8() - .await - .map_err(|e| ("discriminator", e.into()))?; - - match discriminator { - 0 => Ok(Self::Data { - total_models: match stream - .read_i64_le() - .await - .map_err(|e| ("total_models", e.into()))? - { - 0 => None, - n => Some(n), - }, - data: rmp_serde::from_slice(&decode::buf(stream).await.map_err(|e| ("data", e))?) - .unwrap(), // TODO: Error handling - }), - 1 => Ok(Self::Finished), - _ => todo!(), // TODO: Error handling - } - } - - pub fn to_bytes(&self) -> Result, rmp_serde::encode::Error> { - let mut buf = Vec::new(); - match self { - Self::Data { total_models, data } => { - buf.push(0); - buf.extend((total_models.unwrap_or(0) as i64).to_le_bytes()); - encode::buf(&mut buf, &rmp_serde::to_vec_named(data)?); - } - Self::Finished => { - buf.push(1); - } - } - - Ok(buf) - } -} - #[cfg(test)] mod tests { use sd_p2p::spacetunnel::Identity; @@ -342,24 +283,5 @@ mod tests { let result = PairingConfirmation::from_stream(&mut cursor).await.unwrap(); assert_eq!(original, result); } - - { - let original = SyncData::Data { - total_models: Some(123), - data: ModelData::Location(vec![]), - }; - - let mut cursor = std::io::Cursor::new(original.to_bytes().unwrap()); - let result = SyncData::from_stream(&mut cursor).await.unwrap(); - assert_eq!(original, result); - } - - { - let original = SyncData::Finished; - - let mut cursor = std::io::Cursor::new(original.to_bytes().unwrap()); - let result = SyncData::from_stream(&mut cursor).await.unwrap(); - assert_eq!(original, result); - } } } diff --git a/core/src/p2p/sync/mod.rs b/core/src/p2p/sync/mod.rs index db741be06..6a94d3a15 100644 --- a/core/src/p2p/sync/mod.rs +++ b/core/src/p2p/sync/mod.rs @@ -1,7 +1,5 @@ use std::{collections::HashMap, sync::Arc}; -use futures::future::join_all; -use sd_core_sync::{ingest, GetOpsArgs, SyncManager}; use sd_p2p::{ spacetunnel::{RemoteIdentity, Tunnel}, DiscoveredPeer, PeerId, @@ -17,6 +15,7 @@ use super::{Header, IdentityOrRemoteIdentity, P2PManager, PeerMetadata}; mod proto; pub use proto::*; +#[derive(Debug, Clone, Copy)] pub enum InstanceState { Unavailable, Discovered(PeerId), @@ -201,85 +200,93 @@ impl NetworkedLibraryManager { } // TODO: Error handling - pub async fn alert_new_ops(&self, library_id: Uuid, sync: &Arc) { + pub async fn alert_new_ops(&self, library_id: Uuid, sync: Arc) { debug!("NetworkedLibraryManager::alert_new_ops({library_id})"); - join_all( - self.libraries - .read() - .await - .get(&library_id) - .unwrap() - .instances - .iter() - .filter_map(|(_, i)| match i { - InstanceState::Connected(peer_id) => Some(peer_id), - _ => None, - }) - // TODO: Deduplicate any duplicate peer ids -> This is an edge case but still - .map(|peer_id| { - let p2p = self.p2p.clone(); - async move { - debug!("Alerting peer '{peer_id:?}' of new sync events for library '{library_id:?}'"); + let libraries = self.libraries.read().await; + let library = libraries.get(&library_id).unwrap(); - let mut stream = - p2p.manager.stream(*peer_id).await.map_err(|_| ()).unwrap(); // TODO: handle providing incorrect peer id + // libraries only connecting one-way atm + dbg!(&library.instances); - stream - .write_all(&Header::Sync(library_id).to_bytes()) - .await - .unwrap(); + // TODO: Deduplicate any duplicate peer ids -> This is an edge case but still + for (_, instance) in &library.instances { + let InstanceState::Connected(peer_id) = *instance else { + continue; + }; - let mut tunnel = Tunnel::initiator(stream).await.unwrap(); + let sync = sync.clone(); + let p2p = self.p2p.clone(); - tunnel - .write_all(&SyncMessage::NewOperations.to_bytes()) - .await - .unwrap(); - tunnel.flush().await.unwrap(); + tokio::spawn(async move { + debug!( + "Alerting peer '{peer_id:?}' of new sync events for library '{library_id:?}'" + ); - let id = match SyncMessage::from_stream(&mut tunnel).await.unwrap() { - SyncMessage::OperationsRequest(resp) => resp, - _ => todo!("unreachable but proper error handling"), - }; + let mut stream = p2p + .manager + .stream(peer_id.clone()) + .await + .map_err(|_| ()) + .unwrap(); // TODO: handle providing incorrect peer id - self.exchange_sync_ops(tunnel, peer_id, library_id, sync) - .await; - } - }), - ) - .await; + stream + .write_all(&Header::Sync(library_id).to_bytes()) + .await + .unwrap(); + + let mut tunnel = Tunnel::initiator(stream).await.unwrap(); + + tunnel + .write_all(&SyncMessage::NewOperations.to_bytes()) + .await + .unwrap(); + tunnel.flush().await.unwrap(); + + while let Ok(SyncMessage::OperationsRequest(args)) = + SyncMessage::from_stream(&mut tunnel).await + { + // let args = match .unwrap() { + // => resp, + // _ => todo!("unreachable but proper error handling"), + // }; + + let ops = sync.get_ops(args).await.unwrap(); + + debug!( + "Sending '{}' sync ops from peer '{peer_id:?}' for library '{library_id:?}'", + ops.len() + ); + + tunnel + .write_all(&SyncMessage::OperationsRequestResponse(ops).to_bytes()) + .await + .unwrap(); + tunnel.flush().await.unwrap(); + } + }); + } } // Ask the remote for operations and then ingest them - pub async fn request_and_ingest_ops( + pub async fn request_ops( &self, - mut tunnel: Tunnel, + tunnel: &mut Tunnel, args: GetOpsArgs, - sync: &SyncManager, - library_id: Uuid, - ) { + ) -> Vec { tunnel .write_all(&SyncMessage::OperationsRequest(args).to_bytes()) .await .unwrap(); tunnel.flush().await.unwrap(); - let SyncMessage::OperationsRequestResponse(ops) = SyncMessage::from_stream(&mut tunnel).await.unwrap() else { + let SyncMessage::OperationsRequestResponse(ops) = SyncMessage::from_stream(tunnel).await.unwrap() else { todo!("unreachable but proper error handling") }; // debug!("Received sync events response w/ id '{id}' from peer '{peer_id:?}' for library '{library_id:?}'"); - sync.ingest - .events - .send(ingest::Event::Messages(ingest::MessagesEvent { - instance_id: sync.instance, - messages: ops, - })) - .await - .map_err(|_| "TODO: Handle ingest channel closed, so we don't loose ops") - .unwrap(); + ops } // TODO: Error handling @@ -288,15 +295,10 @@ impl NetworkedLibraryManager { mut tunnel: Tunnel, peer_id: &PeerId, library_id: Uuid, - sync: &SyncManager, + sync: &sync::Manager, + args: GetOpsArgs, ) { - let ops = sync - .get_ops(sd_core_sync::GetOpsArgs { - clocks: vec![], - count: 100, - }) - .await - .unwrap(); + let ops = sync.get_ops(args).await.unwrap(); debug!( "Sending '{}' sync ops from peer '{peer_id:?}' for library '{library_id:?}'", diff --git a/interface/app/p2p/pairing.tsx b/interface/app/p2p/pairing.tsx index d890a8252..287aff645 100644 --- a/interface/app/p2p/pairing.tsx +++ b/interface/app/p2p/pairing.tsx @@ -47,7 +47,6 @@ function OriginatorDialog({ ctaLabel="Done" // closeLabel="Cancel" onSubmit={async () => { - alert('TODO'); // TODO: Change into the new library }} // onCancelled={() => acceptSpacedrop.mutate([props.dropId, null])}