diff --git a/Cargo.lock b/Cargo.lock index 232f993ea..c32ada937 100644 Binary files a/Cargo.lock and b/Cargo.lock differ diff --git a/core/crates/sync/Cargo.toml b/core/crates/sync/Cargo.toml index 0c4c506b7..f68d2d10a 100644 --- a/core/crates/sync/Cargo.toml +++ b/core/crates/sync/Cargo.toml @@ -11,7 +11,6 @@ emit-messages = [] sd-prisma = { path = "../../../crates/prisma" } sd-sync = { path = "../../../crates/sync" } sd-utils = { path = "../../../crates/utils" } -sd-p2p = { path = "../../../crates/p2p" } prisma-client-rust = { workspace = true } serde = { workspace = true } diff --git a/core/crates/sync/src/actor.rs b/core/crates/sync/src/actor.rs index f59430502..ad264d9c2 100644 --- a/core/crates/sync/src/actor.rs +++ b/core/crates/sync/src/actor.rs @@ -18,34 +18,15 @@ impl ActorIO { } pub struct HandlerIO { - handler: T::Handler, - req_rx: mpsc::Receiver, + pub event_tx: mpsc::Sender, + pub req_rx: mpsc::Receiver, } -pub type SplitHandlerIO = ( - ::Handler, - mpsc::Receiver<::Request>, -); - -impl HandlerIO { - pub fn split(self) -> SplitHandlerIO { - (self.handler, self.req_rx) - } -} - -pub fn create_actor_io( - make_handler: fn(mpsc::Sender) -> T::Handler, -) -> (ActorIO, HandlerIO) { +pub fn create_actor_io() -> (ActorIO, HandlerIO) { let (req_tx, req_rx) = mpsc::channel(20); let (event_tx, event_rx) = mpsc::channel(20); - ( - ActorIO { event_rx, req_tx }, - HandlerIO { - handler: make_handler(event_tx), - req_rx, - }, - ) + (ActorIO { event_rx, req_tx }, HandlerIO { event_tx, req_rx }) } #[macro_export] diff --git a/core/crates/sync/src/ingest.rs b/core/crates/sync/src/ingest.rs index 4863c9fb7..5b5bfff42 100644 --- a/core/crates/sync/src/ingest.rs +++ b/core/crates/sync/src/ingest.rs @@ -1,11 +1,10 @@ use std::{ops::Deref, sync::Arc}; -use sd_p2p::spacetunnel::Tunnel; use sd_prisma::{prisma::*, prisma_sync::ModelSyncData}; use sd_sync::*; use sd_utils::uuid_to_bytes; use serde_json::to_vec; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, Mutex}; use uhlc::{Timestamp, NTP64}; use uuid::Uuid; @@ -14,17 +13,14 @@ use crate::{actor::*, wait, SharedState}; #[must_use] /// Stuff that can be handled outside the actor pub enum Request { - Messages { - tunnel: Tunnel, - timestamps: Vec<(Uuid, NTP64)>, - }, + Messages { timestamps: Vec<(Uuid, NTP64)> }, Ingested, } /// Stuff that the actor consumes #[derive(Debug)] pub enum Event { - Notification(NotificationEvent), + Notification, Messages(MessagesEvent), } @@ -32,7 +28,7 @@ pub enum Event { pub enum State { #[default] WaitingForNotification, - RetrievingMessages(Tunnel), + RetrievingMessages, Ingesting(MessagesEvent), } @@ -46,14 +42,13 @@ impl Actor { async fn tick(mut self) -> Option { let state = match self.state.take()? { State::WaitingForNotification => { - let notification = wait!(self.io.event_rx, Event::Notification(n) => n); + wait!(self.io.event_rx, Event::Notification); - State::RetrievingMessages(notification.tunnel) + State::RetrievingMessages } - State::RetrievingMessages(tunnel) => { + State::RetrievingMessages => { self.io .send(Request::Messages { - tunnel, timestamps: self .timestamps .read() @@ -80,7 +75,7 @@ impl Actor { println!("Ingested {count} messages!"); match event.has_more { - true => State::RetrievingMessages(event.tunnel), + true => State::RetrievingMessages, false => State::WaitingForNotification, } } @@ -92,8 +87,8 @@ impl Actor { }) } - pub fn spawn(shared: Arc) -> SplitHandlerIO { - let (actor_io, handler_io) = create_actor_io::(|event_tx| Handler { event_tx }); + pub fn spawn(shared: Arc) -> Handler { + let (actor_io, handler_io) = create_actor_io::(); tokio::spawn(async move { let mut this = Self { @@ -110,7 +105,10 @@ impl Actor { } }); - handler_io.split() + Handler { + event_tx: handler_io.event_tx, + req_rx: Arc::new(Mutex::new(handler_io.req_rx)), + } } async fn receive_crdt_operation(&mut self, op: CRDTOperation) { @@ -248,6 +246,7 @@ impl Deref for Actor { pub struct Handler { pub event_tx: mpsc::Sender, + pub req_rx: Arc>>, } #[derive(Debug)] @@ -255,12 +254,6 @@ pub struct MessagesEvent { pub instance_id: Uuid, pub messages: Vec, pub has_more: bool, - pub tunnel: Tunnel, -} - -#[derive(Debug)] -pub struct NotificationEvent { - pub tunnel: Tunnel, } impl ActorTypes for Actor { diff --git a/core/crates/sync/src/manager.rs b/core/crates/sync/src/manager.rs index 3df8a726a..42ba1b158 100644 --- a/core/crates/sync/src/manager.rs +++ b/core/crates/sync/src/manager.rs @@ -4,7 +4,7 @@ use sd_utils::uuid_to_bytes; use crate::{db_operation::*, *}; use std::{cmp::Ordering, ops::Deref, sync::Arc}; -use tokio::sync::{broadcast, mpsc}; +use tokio::sync::broadcast; use uhlc::{HLCBuilder, HLC}; use uuid::Uuid; @@ -17,7 +17,6 @@ pub struct Manager { pub struct SyncManagerNew { pub manager: Manager, pub rx: broadcast::Receiver, - pub ingest_rx: mpsc::Receiver, } #[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq, Eq)] @@ -40,12 +39,11 @@ impl Manager { clock, }); - let (ingest, ingest_rx) = ingest::Actor::spawn(shared.clone()); + let ingest = ingest::Actor::spawn(shared.clone()); SyncManagerNew { manager: Self { shared, tx, ingest }, rx, - ingest_rx, } } diff --git a/core/crates/sync/tests/lib.rs b/core/crates/sync/tests/lib.rs index 194eb3572..942bc67d0 100644 --- a/core/crates/sync/tests/lib.rs +++ b/core/crates/sync/tests/lib.rs @@ -5,30 +5,23 @@ use sd_utils::uuid_to_bytes; use prisma_client_rust::chrono::Utc; use serde_json::json; -use std::{sync::Arc, time::Duration}; -use tokio::sync::{broadcast, mpsc}; +use std::sync::Arc; +use tokio::sync::broadcast; use uuid::Uuid; fn db_path(id: Uuid) -> String { - format!("./tests/test-{id}.db") + format!("/tmp/test-{id}.db") } #[derive(Clone)] struct Instance { id: Uuid, - _peer_id: sd_p2p::PeerId, db: Arc, sync: Arc, } impl Instance { - async fn new( - id: Uuid, - ) -> ( - Arc, - broadcast::Receiver, - mpsc::Receiver, - ) { + async fn new(id: Uuid) -> (Arc, broadcast::Receiver) { let db = Arc::new( prisma::PrismaClient::_builder() .with_url(format!("file:{}", db_path(id))) @@ -60,11 +53,9 @@ impl Instance { Arc::new(Self { id, db, - _peer_id: sd_p2p::PeerId::random(), sync: Arc::new(sync.manager), }), sync.rx, - sync.ingest_rx, ) } @@ -110,8 +101,8 @@ impl Instance { #[tokio::test] async fn bruh() -> Result<(), Box> { - let (instance1, mut sync_rx1, _) = Instance::new(Uuid::new_v4()).await; - let (instance2, _, mut ingest_rx2) = Instance::new(Uuid::new_v4()).await; + let (instance1, mut sync_rx1) = Instance::new(Uuid::new_v4()).await; + let (instance2, mut sync_rx2) = Instance::new(Uuid::new_v4()).await; Instance::pair(&instance1, &instance2).await; @@ -122,9 +113,13 @@ async fn bruh() -> Result<(), Box> { async move { while let Ok(msg) = sync_rx1.recv().await { match msg { - SyncMessage::Created => { - instance2.sync.ingest.event_tx.send(todo!()).await.unwrap() - } + SyncMessage::Created => instance2 + .sync + .ingest + .event_tx + .send(ingest::Event::Notification) + .await + .unwrap(), _ => {} } } @@ -136,7 +131,7 @@ async fn bruh() -> Result<(), Box> { let instance2 = instance2.clone(); async move { - while let Some(msg) = ingest_rx2.recv().await { + while let Some(msg) = instance2.sync.ingest.req_rx.lock().await.recv().await { match msg { ingest::Request::Messages { timestamps, .. } => { let messages = instance1 @@ -148,12 +143,10 @@ async fn bruh() -> Result<(), Box> { .await .unwrap(); - instance2 - .sync - .ingest + let ingest = &instance2.sync.ingest; + ingest .event_tx .send(ingest::Event::Messages(ingest::MessagesEvent { - tunnel: todo!(), messages, has_more: false, instance_id: instance1.id, @@ -161,7 +154,9 @@ async fn bruh() -> Result<(), Box> { .await .unwrap(); } - _ => {} + ingest::Request::Ingested => { + instance2.sync.tx.send(SyncMessage::Ingested).ok(); + } } } } @@ -202,10 +197,18 @@ async fn bruh() -> Result<(), Box> { }) .await?; - tokio::time::sleep(Duration::from_millis(10)).await; + assert!(matches!(sync_rx2.recv().await?, SyncMessage::Ingested)); - // assert_eq!(out.len(), 3); - // assert!(matches!(out[0].typ, CRDTOperationType::Shared(_))); + let out = instance2 + .sync + .get_ops(GetOpsArgs { + clocks: vec![], + count: 100, + }) + .await?; + + assert_eq!(out.len(), 3); + assert!(matches!(out[0].typ, CRDTOperationType::Shared(_))); instance1.teardown().await; instance2.teardown().await; diff --git a/core/src/library/manager/mod.rs b/core/src/library/manager/mod.rs index a67f05fd9..7ae1d545a 100644 --- a/core/src/library/manager/mod.rs +++ b/core/src/library/manager/mod.rs @@ -3,7 +3,7 @@ use crate::{ location::indexer, node::Platform, object::tag, - p2p::IdentityOrRemoteIdentity, + p2p::{self, IdentityOrRemoteIdentity}, prisma::location, sync, util::{ @@ -399,43 +399,9 @@ impl Libraries { async move { loop { - tokio::select! { - req = sync.ingest_rx.recv() => { - use sd_core_sync::ingest; + let Ok(SyncMessage::Created) = sync.rx.recv().await else { continue }; - let Some(req) = req else { continue; }; - - const OPS_PER_REQUEST: u32 = 100; - - match req { - ingest::Request::Messages { mut tunnel, timestamps } => { - let ops = node.nlm.request_ops( - &mut tunnel, - sd_core_sync::GetOpsArgs { clocks: timestamps, count: OPS_PER_REQUEST }, - ).await; - - library.sync.ingest - .event_tx - .send(ingest::Event::Messages(ingest::MessagesEvent { - tunnel, - instance_id: library.sync.instance, - has_more: ops.len() == OPS_PER_REQUEST as usize, - messages: ops, - })) - .await - .expect("TODO: Handle ingest channel closed, so we don't loose ops"); - }, - _ => {} - } - }, - msg = sync.rx.recv() => { - if let Ok(op) = msg { - let SyncMessage::Created = op else { continue; }; - - node.nlm.alert_new_ops(id, &library.sync).await; - } - }, - } + p2p::sync::originator(id, &library.sync, &node.nlm, &node.p2p).await; } } }); diff --git a/core/src/p2p/p2p_manager.rs b/core/src/p2p/p2p_manager.rs index 12b1d29b8..eb7870688 100644 --- a/core/src/p2p/p2p_manager.rs +++ b/core/src/p2p/p2p_manager.rs @@ -87,8 +87,12 @@ impl P2PManager { // TODO: Delay building this until the libraries are loaded let metadata_manager = MetadataManager::new(config); - let (manager, stream) = - Manager::new(SPACEDRIVE_APP_ID, &keypair, metadata_manager.clone()).await?; + let (manager, stream) = sd_p2p::Manager::::new( + SPACEDRIVE_APP_ID, + &keypair, + metadata_manager.clone(), + ) + .await?; info!( "Node '{}' is now online listening at addresses: {:?}", @@ -257,9 +261,6 @@ impl P2PManager { .await; } Header::Sync(library_id) => { - // Header -> Tunnel -> SyncMessage - use sd_core_sync::ingest; - let mut tunnel = Tunnel::responder(stream).await.unwrap(); let msg = @@ -270,31 +271,15 @@ impl P2PManager { dbg!(&msg); - let ingest = &library.sync.ingest; - match msg { SyncMessage::NewOperations => { - // The ends up in `NetworkedLibraryManager::request_and_ingest_ops`. - // TODO: Throw tunnel around like this makes it soooo confusing. - ingest - .event_tx - .send(ingest::Event::Notification( - ingest::NotificationEvent { tunnel }, - )) - .await - .ok(); - } - SyncMessage::OperationsRequest(_) => { - todo!("this should be received somewhere else!"); - } - SyncMessage::OperationsRequestResponse(_) => { - todo!("unreachable but add proper error handling") + super::sync::responder(tunnel, library).await; } }; } Header::Connected(identities) => { Self::resync_handler( - node.nlm.clone(), + &node.nlm, &mut stream, event.peer_id, metadata_manager.get().instances, @@ -368,7 +353,7 @@ impl P2PManager { } pub async fn resync_handler( - nlm: Arc, + nlm: &NetworkedLibraries, stream: &mut (impl AsyncRead + AsyncWrite + Unpin), peer_id: PeerId, local_identities: Vec, diff --git a/core/src/p2p/pairing/mod.rs b/core/src/p2p/pairing/mod.rs index b170ba005..a33ec736a 100644 --- a/core/src/p2p/pairing/mod.rs +++ b/core/src/p2p/pairing/mod.rs @@ -329,7 +329,7 @@ impl PairingManager { }; P2PManager::resync_handler( - node.nlm.clone(), + &node.nlm, &mut stream, peer_id, self.metadata_manager.get().instances, @@ -339,9 +339,7 @@ impl PairingManager { self.emit_progress(pairing_id, PairingStatus::PairingComplete(library_id)); - node.nlm - .alert_new_ops(library_id, &library.sync.clone()) - .await; + super::sync::originator(library_id, &library.sync, &node.nlm, &node.p2p).await; stream.flush().await.unwrap(); } diff --git a/core/src/p2p/sync/mod.rs b/core/src/p2p/sync/mod.rs index dd72f1ad1..14a8b5437 100644 --- a/core/src/p2p/sync/mod.rs +++ b/core/src/p2p/sync/mod.rs @@ -1,11 +1,18 @@ use std::{collections::HashMap, sync::Arc}; +use sd_core_sync::ingest; use sd_p2p::{ + proto::{decode, encode}, spacetunnel::{RemoteIdentity, Tunnel}, DiscoveredPeer, PeerId, }; +use sd_sync::CRDTOperation; use sync::GetOpsArgs; -use tokio::{io::AsyncWriteExt, sync::RwLock}; + +use tokio::{ + io::{AsyncRead, AsyncWriteExt}, + sync::RwLock, +}; use tracing::{debug, error}; use uuid::Uuid; @@ -32,7 +39,7 @@ pub struct LibraryData { pub struct NetworkedLibraries { p2p: Arc, - libraries: RwLock>, + pub(crate) libraries: RwLock>, } impl NetworkedLibraries { @@ -205,12 +212,51 @@ impl NetworkedLibraries { } } } +} - // TODO: Error handling - pub async fn alert_new_ops(&self, library_id: Uuid, sync: &Arc) { - debug!("NetworkedLibraryManager::alert_new_ops({library_id})"); +// These functions could be moved to some separate protocol abstraction +// which would be pretty cool. +// +// TODO: Error handling - let libraries = self.libraries.read().await; +pub use originator::run as originator; +mod originator { + use super::*; + use responder::tx as rx; + + pub mod tx { + use super::*; + + pub struct Operations(pub Vec); + + impl Operations { + // TODO: Per field errors for better error handling + pub async fn from_stream( + stream: &mut (impl AsyncRead + Unpin), + ) -> std::io::Result { + Ok(Self( + rmp_serde::from_slice(&decode::buf(stream).await.unwrap()).unwrap(), + )) + } + + pub fn to_bytes(&self) -> Vec { + let Self(args) = self; + let mut buf = vec![]; + + // TODO: Error handling + encode::buf(&mut buf, &rmp_serde::to_vec_named(&args).unwrap()); + buf + } + } + } + + pub async fn run( + library_id: Uuid, + sync: &Arc, + nlm: &NetworkedLibraries, + p2p: &Arc, + ) { + let libraries = nlm.libraries.read().await; let library = libraries.get(&library_id).unwrap(); // libraries only connecting one-way atm @@ -223,7 +269,7 @@ impl NetworkedLibraries { }; let sync = sync.clone(); - let p2p = self.p2p.clone(); + let p2p = p2p.clone(); tokio::spawn(async move { debug!( @@ -250,14 +296,9 @@ impl NetworkedLibraries { .unwrap(); tunnel.flush().await.unwrap(); - while let Ok(SyncMessage::OperationsRequest(args)) = - SyncMessage::from_stream(&mut tunnel).await + while let Ok(rx::GetOperations(args)) = + rx::GetOperations::from_stream(&mut tunnel).await { - // let args = match .unwrap() { - // => resp, - // _ => todo!("unreachable but proper error handling"), - // }; - let ops = sync.get_ops(args).await.unwrap(); debug!( @@ -266,7 +307,7 @@ impl NetworkedLibraries { ); tunnel - .write_all(&SyncMessage::OperationsRequestResponse(ops).to_bytes()) + .write_all(&tx::Operations(ops).to_bytes()) .await .unwrap(); tunnel.flush().await.unwrap(); @@ -274,47 +315,85 @@ impl NetworkedLibraries { }); } } +} - // Ask the remote for operations and then ingest them - pub async fn request_ops( - &self, - tunnel: &mut Tunnel, - args: GetOpsArgs, - ) -> Vec { - tunnel - .write_all(&SyncMessage::OperationsRequest(args).to_bytes()) - .await - .unwrap(); - tunnel.flush().await.unwrap(); +pub use responder::run as responder; +mod responder { + use super::*; + use originator::tx as rx; - let SyncMessage::OperationsRequestResponse(ops) = SyncMessage::from_stream(tunnel).await.unwrap() else { - todo!("unreachable but proper error handling") - }; + pub mod tx { + use super::*; - // debug!("Received sync events response w/ id '{id}' from peer '{peer_id:?}' for library '{library_id:?}'"); + pub struct GetOperations(pub GetOpsArgs); - ops + impl GetOperations { + // TODO: Per field errors for better error handling + pub async fn from_stream( + stream: &mut (impl AsyncRead + Unpin), + ) -> std::io::Result { + Ok(Self( + // TODO: Error handling + rmp_serde::from_slice(&decode::buf(stream).await.unwrap()).unwrap(), + )) + } + + pub fn to_bytes(&self) -> Vec { + let Self(ops) = self; + + let mut buf = vec![]; + + // TODO: Error handling + encode::buf(&mut buf, &rmp_serde::to_vec_named(&ops).unwrap()); + buf + } + } } - // TODO: Error handling - pub async fn exchange_sync_ops( - &self, - mut tunnel: Tunnel, - peer_id: &PeerId, - library_id: Uuid, - sync: &sync::Manager, - args: GetOpsArgs, - ) { - let ops = sync.get_ops(args).await.unwrap(); + pub async fn run(mut tunnel: Tunnel, library: Arc) { + let ingest = &library.sync.ingest; - debug!( - "Sending '{}' sync ops from peer '{peer_id:?}' for library '{library_id:?}'", - ops.len() - ); + let Ok(mut rx) = + ingest.req_rx.try_lock() else { + return; + }; - tunnel - .write_all(&SyncMessage::OperationsRequestResponse(ops).to_bytes()) + ingest + .event_tx + .send(ingest::Event::Notification) .await .unwrap(); + + while let Some(req) = rx.recv().await { + use sync::ingest::*; + + const OPS_PER_REQUEST: u32 = 100; + + let Request::Messages { timestamps } = req else { continue }; + + tunnel + .write_all( + &tx::GetOperations(sync::GetOpsArgs { + clocks: timestamps, + count: OPS_PER_REQUEST, + }) + .to_bytes(), + ) + .await + .unwrap(); + tunnel.flush().await.unwrap(); + + let rx::Operations(ops) = rx::Operations::from_stream(&mut tunnel).await.unwrap(); + + ingest + .event_tx + .send(Event::Messages(MessagesEvent { + instance_id: library.sync.instance, + has_more: ops.len() == OPS_PER_REQUEST as usize, + messages: ops, + })) + .await + .expect("TODO: Handle ingest channel closed, so we don't loose ops"); + } } } diff --git a/core/src/p2p/sync/proto.rs b/core/src/p2p/sync/proto.rs index bb14540f3..9d4e8a6aa 100644 --- a/core/src/p2p/sync/proto.rs +++ b/core/src/p2p/sync/proto.rs @@ -1,13 +1,9 @@ -use sd_core_sync::GetOpsArgs; -use sd_p2p::proto::{decode, encode}; -use sd_sync::CRDTOperation; use tokio::io::{AsyncRead, AsyncReadExt}; +// will probs have more variants in future #[derive(Debug, PartialEq, Eq)] pub enum SyncMessage { NewOperations, - OperationsRequest(GetOpsArgs), - OperationsRequestResponse(Vec), } impl SyncMessage { @@ -15,13 +11,6 @@ impl SyncMessage { pub async fn from_stream(stream: &mut (impl AsyncRead + Unpin)) -> std::io::Result { match stream.read_u8().await? { b'N' => Ok(Self::NewOperations), - b'R' => Ok(Self::OperationsRequest( - rmp_serde::from_slice(&decode::buf(stream).await.unwrap()).unwrap(), - )), - b'P' => Ok(Self::OperationsRequestResponse( - // TODO: Error handling - rmp_serde::from_slice(&decode::buf(stream).await.unwrap()).unwrap(), - )), header => Err(std::io::Error::new( std::io::ErrorKind::InvalidData, format!("Invalid sync message header: {}", (header as char)), @@ -32,20 +21,6 @@ impl SyncMessage { pub fn to_bytes(&self) -> Vec { match self { Self::NewOperations => vec![b'N'], - Self::OperationsRequest(args) => { - let mut buf = vec![b'R']; - - // TODO: Error handling - encode::buf(&mut buf, &rmp_serde::to_vec_named(&args).unwrap()); - buf - } - Self::OperationsRequestResponse(ops) => { - let mut buf = vec![b'P']; - - // TODO: Error handling - encode::buf(&mut buf, &rmp_serde::to_vec_named(&ops).unwrap()); - buf - } } } }