Isolate sync p2p logic (#1198)

* remove p2p from sd-core-sync and isolate p2p handling

* re-enable tests and stop being silly

* move sync protocol to modules

* don't emit messages

* cleanup

* formatting
This commit is contained in:
Brendan Allan
2023-08-11 01:32:20 -07:00
committed by GitHub
parent d930b22265
commit 262a0dd922
11 changed files with 192 additions and 215 deletions

BIN
Cargo.lock generated
View File

Binary file not shown.

View File

@@ -11,7 +11,6 @@ emit-messages = []
sd-prisma = { path = "../../../crates/prisma" }
sd-sync = { path = "../../../crates/sync" }
sd-utils = { path = "../../../crates/utils" }
sd-p2p = { path = "../../../crates/p2p" }
prisma-client-rust = { workspace = true }
serde = { workspace = true }

View File

@@ -18,34 +18,15 @@ impl<T: ActorTypes> ActorIO<T> {
}
pub struct HandlerIO<T: ActorTypes> {
handler: T::Handler,
req_rx: mpsc::Receiver<T::Request>,
pub event_tx: mpsc::Sender<T::Event>,
pub req_rx: mpsc::Receiver<T::Request>,
}
pub type SplitHandlerIO<T> = (
<T as ActorTypes>::Handler,
mpsc::Receiver<<T as ActorTypes>::Request>,
);
impl<T: ActorTypes> HandlerIO<T> {
pub fn split(self) -> SplitHandlerIO<T> {
(self.handler, self.req_rx)
}
}
pub fn create_actor_io<T: ActorTypes>(
make_handler: fn(mpsc::Sender<T::Event>) -> T::Handler,
) -> (ActorIO<T>, HandlerIO<T>) {
pub fn create_actor_io<T: ActorTypes>() -> (ActorIO<T>, HandlerIO<T>) {
let (req_tx, req_rx) = mpsc::channel(20);
let (event_tx, event_rx) = mpsc::channel(20);
(
ActorIO { event_rx, req_tx },
HandlerIO {
handler: make_handler(event_tx),
req_rx,
},
)
(ActorIO { event_rx, req_tx }, HandlerIO { event_tx, req_rx })
}
#[macro_export]

View File

@@ -1,11 +1,10 @@
use std::{ops::Deref, sync::Arc};
use sd_p2p::spacetunnel::Tunnel;
use sd_prisma::{prisma::*, prisma_sync::ModelSyncData};
use sd_sync::*;
use sd_utils::uuid_to_bytes;
use serde_json::to_vec;
use tokio::sync::mpsc;
use tokio::sync::{mpsc, Mutex};
use uhlc::{Timestamp, NTP64};
use uuid::Uuid;
@@ -14,17 +13,14 @@ use crate::{actor::*, wait, SharedState};
#[must_use]
/// Stuff that can be handled outside the actor
pub enum Request {
Messages {
tunnel: Tunnel,
timestamps: Vec<(Uuid, NTP64)>,
},
Messages { timestamps: Vec<(Uuid, NTP64)> },
Ingested,
}
/// Stuff that the actor consumes
#[derive(Debug)]
pub enum Event {
Notification(NotificationEvent),
Notification,
Messages(MessagesEvent),
}
@@ -32,7 +28,7 @@ pub enum Event {
pub enum State {
#[default]
WaitingForNotification,
RetrievingMessages(Tunnel),
RetrievingMessages,
Ingesting(MessagesEvent),
}
@@ -46,14 +42,13 @@ impl Actor {
async fn tick(mut self) -> Option<Self> {
let state = match self.state.take()? {
State::WaitingForNotification => {
let notification = wait!(self.io.event_rx, Event::Notification(n) => n);
wait!(self.io.event_rx, Event::Notification);
State::RetrievingMessages(notification.tunnel)
State::RetrievingMessages
}
State::RetrievingMessages(tunnel) => {
State::RetrievingMessages => {
self.io
.send(Request::Messages {
tunnel,
timestamps: self
.timestamps
.read()
@@ -80,7 +75,7 @@ impl Actor {
println!("Ingested {count} messages!");
match event.has_more {
true => State::RetrievingMessages(event.tunnel),
true => State::RetrievingMessages,
false => State::WaitingForNotification,
}
}
@@ -92,8 +87,8 @@ impl Actor {
})
}
pub fn spawn(shared: Arc<SharedState>) -> SplitHandlerIO<Self> {
let (actor_io, handler_io) = create_actor_io::<Self>(|event_tx| Handler { event_tx });
pub fn spawn(shared: Arc<SharedState>) -> Handler {
let (actor_io, handler_io) = create_actor_io::<Self>();
tokio::spawn(async move {
let mut this = Self {
@@ -110,7 +105,10 @@ impl Actor {
}
});
handler_io.split()
Handler {
event_tx: handler_io.event_tx,
req_rx: Arc::new(Mutex::new(handler_io.req_rx)),
}
}
async fn receive_crdt_operation(&mut self, op: CRDTOperation) {
@@ -248,6 +246,7 @@ impl Deref for Actor {
pub struct Handler {
pub event_tx: mpsc::Sender<Event>,
pub req_rx: Arc<Mutex<mpsc::Receiver<Request>>>,
}
#[derive(Debug)]
@@ -255,12 +254,6 @@ pub struct MessagesEvent {
pub instance_id: Uuid,
pub messages: Vec<CRDTOperation>,
pub has_more: bool,
pub tunnel: Tunnel,
}
#[derive(Debug)]
pub struct NotificationEvent {
pub tunnel: Tunnel,
}
impl ActorTypes for Actor {

View File

@@ -4,7 +4,7 @@ use sd_utils::uuid_to_bytes;
use crate::{db_operation::*, *};
use std::{cmp::Ordering, ops::Deref, sync::Arc};
use tokio::sync::{broadcast, mpsc};
use tokio::sync::broadcast;
use uhlc::{HLCBuilder, HLC};
use uuid::Uuid;
@@ -17,7 +17,6 @@ pub struct Manager {
pub struct SyncManagerNew {
pub manager: Manager,
pub rx: broadcast::Receiver<SyncMessage>,
pub ingest_rx: mpsc::Receiver<ingest::Request>,
}
#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq, Eq)]
@@ -40,12 +39,11 @@ impl Manager {
clock,
});
let (ingest, ingest_rx) = ingest::Actor::spawn(shared.clone());
let ingest = ingest::Actor::spawn(shared.clone());
SyncManagerNew {
manager: Self { shared, tx, ingest },
rx,
ingest_rx,
}
}

View File

@@ -5,30 +5,23 @@ use sd_utils::uuid_to_bytes;
use prisma_client_rust::chrono::Utc;
use serde_json::json;
use std::{sync::Arc, time::Duration};
use tokio::sync::{broadcast, mpsc};
use std::sync::Arc;
use tokio::sync::broadcast;
use uuid::Uuid;
fn db_path(id: Uuid) -> String {
format!("./tests/test-{id}.db")
format!("/tmp/test-{id}.db")
}
#[derive(Clone)]
struct Instance {
id: Uuid,
_peer_id: sd_p2p::PeerId,
db: Arc<prisma::PrismaClient>,
sync: Arc<sd_core_sync::Manager>,
}
impl Instance {
async fn new(
id: Uuid,
) -> (
Arc<Self>,
broadcast::Receiver<SyncMessage>,
mpsc::Receiver<ingest::Request>,
) {
async fn new(id: Uuid) -> (Arc<Self>, broadcast::Receiver<SyncMessage>) {
let db = Arc::new(
prisma::PrismaClient::_builder()
.with_url(format!("file:{}", db_path(id)))
@@ -60,11 +53,9 @@ impl Instance {
Arc::new(Self {
id,
db,
_peer_id: sd_p2p::PeerId::random(),
sync: Arc::new(sync.manager),
}),
sync.rx,
sync.ingest_rx,
)
}
@@ -110,8 +101,8 @@ impl Instance {
#[tokio::test]
async fn bruh() -> Result<(), Box<dyn std::error::Error>> {
let (instance1, mut sync_rx1, _) = Instance::new(Uuid::new_v4()).await;
let (instance2, _, mut ingest_rx2) = Instance::new(Uuid::new_v4()).await;
let (instance1, mut sync_rx1) = Instance::new(Uuid::new_v4()).await;
let (instance2, mut sync_rx2) = Instance::new(Uuid::new_v4()).await;
Instance::pair(&instance1, &instance2).await;
@@ -122,9 +113,13 @@ async fn bruh() -> Result<(), Box<dyn std::error::Error>> {
async move {
while let Ok(msg) = sync_rx1.recv().await {
match msg {
SyncMessage::Created => {
instance2.sync.ingest.event_tx.send(todo!()).await.unwrap()
}
SyncMessage::Created => instance2
.sync
.ingest
.event_tx
.send(ingest::Event::Notification)
.await
.unwrap(),
_ => {}
}
}
@@ -136,7 +131,7 @@ async fn bruh() -> Result<(), Box<dyn std::error::Error>> {
let instance2 = instance2.clone();
async move {
while let Some(msg) = ingest_rx2.recv().await {
while let Some(msg) = instance2.sync.ingest.req_rx.lock().await.recv().await {
match msg {
ingest::Request::Messages { timestamps, .. } => {
let messages = instance1
@@ -148,12 +143,10 @@ async fn bruh() -> Result<(), Box<dyn std::error::Error>> {
.await
.unwrap();
instance2
.sync
.ingest
let ingest = &instance2.sync.ingest;
ingest
.event_tx
.send(ingest::Event::Messages(ingest::MessagesEvent {
tunnel: todo!(),
messages,
has_more: false,
instance_id: instance1.id,
@@ -161,7 +154,9 @@ async fn bruh() -> Result<(), Box<dyn std::error::Error>> {
.await
.unwrap();
}
_ => {}
ingest::Request::Ingested => {
instance2.sync.tx.send(SyncMessage::Ingested).ok();
}
}
}
}
@@ -202,10 +197,18 @@ async fn bruh() -> Result<(), Box<dyn std::error::Error>> {
})
.await?;
tokio::time::sleep(Duration::from_millis(10)).await;
assert!(matches!(sync_rx2.recv().await?, SyncMessage::Ingested));
// assert_eq!(out.len(), 3);
// assert!(matches!(out[0].typ, CRDTOperationType::Shared(_)));
let out = instance2
.sync
.get_ops(GetOpsArgs {
clocks: vec![],
count: 100,
})
.await?;
assert_eq!(out.len(), 3);
assert!(matches!(out[0].typ, CRDTOperationType::Shared(_)));
instance1.teardown().await;
instance2.teardown().await;

View File

@@ -3,7 +3,7 @@ use crate::{
location::indexer,
node::Platform,
object::tag,
p2p::IdentityOrRemoteIdentity,
p2p::{self, IdentityOrRemoteIdentity},
prisma::location,
sync,
util::{
@@ -399,43 +399,9 @@ impl Libraries {
async move {
loop {
tokio::select! {
req = sync.ingest_rx.recv() => {
use sd_core_sync::ingest;
let Ok(SyncMessage::Created) = sync.rx.recv().await else { continue };
let Some(req) = req else { continue; };
const OPS_PER_REQUEST: u32 = 100;
match req {
ingest::Request::Messages { mut tunnel, timestamps } => {
let ops = node.nlm.request_ops(
&mut tunnel,
sd_core_sync::GetOpsArgs { clocks: timestamps, count: OPS_PER_REQUEST },
).await;
library.sync.ingest
.event_tx
.send(ingest::Event::Messages(ingest::MessagesEvent {
tunnel,
instance_id: library.sync.instance,
has_more: ops.len() == OPS_PER_REQUEST as usize,
messages: ops,
}))
.await
.expect("TODO: Handle ingest channel closed, so we don't loose ops");
},
_ => {}
}
},
msg = sync.rx.recv() => {
if let Ok(op) = msg {
let SyncMessage::Created = op else { continue; };
node.nlm.alert_new_ops(id, &library.sync).await;
}
},
}
p2p::sync::originator(id, &library.sync, &node.nlm, &node.p2p).await;
}
}
});

View File

@@ -87,8 +87,12 @@ impl P2PManager {
// TODO: Delay building this until the libraries are loaded
let metadata_manager = MetadataManager::new(config);
let (manager, stream) =
Manager::new(SPACEDRIVE_APP_ID, &keypair, metadata_manager.clone()).await?;
let (manager, stream) = sd_p2p::Manager::<PeerMetadata>::new(
SPACEDRIVE_APP_ID,
&keypair,
metadata_manager.clone(),
)
.await?;
info!(
"Node '{}' is now online listening at addresses: {:?}",
@@ -257,9 +261,6 @@ impl P2PManager {
.await;
}
Header::Sync(library_id) => {
// Header -> Tunnel -> SyncMessage
use sd_core_sync::ingest;
let mut tunnel = Tunnel::responder(stream).await.unwrap();
let msg =
@@ -270,31 +271,15 @@ impl P2PManager {
dbg!(&msg);
let ingest = &library.sync.ingest;
match msg {
SyncMessage::NewOperations => {
// The ends up in `NetworkedLibraryManager::request_and_ingest_ops`.
// TODO: Throw tunnel around like this makes it soooo confusing.
ingest
.event_tx
.send(ingest::Event::Notification(
ingest::NotificationEvent { tunnel },
))
.await
.ok();
}
SyncMessage::OperationsRequest(_) => {
todo!("this should be received somewhere else!");
}
SyncMessage::OperationsRequestResponse(_) => {
todo!("unreachable but add proper error handling")
super::sync::responder(tunnel, library).await;
}
};
}
Header::Connected(identities) => {
Self::resync_handler(
node.nlm.clone(),
&node.nlm,
&mut stream,
event.peer_id,
metadata_manager.get().instances,
@@ -368,7 +353,7 @@ impl P2PManager {
}
pub async fn resync_handler(
nlm: Arc<NetworkedLibraries>,
nlm: &NetworkedLibraries,
stream: &mut (impl AsyncRead + AsyncWrite + Unpin),
peer_id: PeerId,
local_identities: Vec<RemoteIdentity>,

View File

@@ -329,7 +329,7 @@ impl PairingManager {
};
P2PManager::resync_handler(
node.nlm.clone(),
&node.nlm,
&mut stream,
peer_id,
self.metadata_manager.get().instances,
@@ -339,9 +339,7 @@ impl PairingManager {
self.emit_progress(pairing_id, PairingStatus::PairingComplete(library_id));
node.nlm
.alert_new_ops(library_id, &library.sync.clone())
.await;
super::sync::originator(library_id, &library.sync, &node.nlm, &node.p2p).await;
stream.flush().await.unwrap();
}

View File

@@ -1,11 +1,18 @@
use std::{collections::HashMap, sync::Arc};
use sd_core_sync::ingest;
use sd_p2p::{
proto::{decode, encode},
spacetunnel::{RemoteIdentity, Tunnel},
DiscoveredPeer, PeerId,
};
use sd_sync::CRDTOperation;
use sync::GetOpsArgs;
use tokio::{io::AsyncWriteExt, sync::RwLock};
use tokio::{
io::{AsyncRead, AsyncWriteExt},
sync::RwLock,
};
use tracing::{debug, error};
use uuid::Uuid;
@@ -32,7 +39,7 @@ pub struct LibraryData {
pub struct NetworkedLibraries {
p2p: Arc<P2PManager>,
libraries: RwLock<HashMap<Uuid /* Library ID */, LibraryData>>,
pub(crate) libraries: RwLock<HashMap<Uuid /* Library ID */, LibraryData>>,
}
impl NetworkedLibraries {
@@ -205,12 +212,51 @@ impl NetworkedLibraries {
}
}
}
}
// TODO: Error handling
pub async fn alert_new_ops(&self, library_id: Uuid, sync: &Arc<sync::Manager>) {
debug!("NetworkedLibraryManager::alert_new_ops({library_id})");
// These functions could be moved to some separate protocol abstraction
// which would be pretty cool.
//
// TODO: Error handling
let libraries = self.libraries.read().await;
pub use originator::run as originator;
mod originator {
use super::*;
use responder::tx as rx;
pub mod tx {
use super::*;
pub struct Operations(pub Vec<CRDTOperation>);
impl Operations {
// TODO: Per field errors for better error handling
pub async fn from_stream(
stream: &mut (impl AsyncRead + Unpin),
) -> std::io::Result<Self> {
Ok(Self(
rmp_serde::from_slice(&decode::buf(stream).await.unwrap()).unwrap(),
))
}
pub fn to_bytes(&self) -> Vec<u8> {
let Self(args) = self;
let mut buf = vec![];
// TODO: Error handling
encode::buf(&mut buf, &rmp_serde::to_vec_named(&args).unwrap());
buf
}
}
}
pub async fn run(
library_id: Uuid,
sync: &Arc<sync::Manager>,
nlm: &NetworkedLibraries,
p2p: &Arc<super::P2PManager>,
) {
let libraries = nlm.libraries.read().await;
let library = libraries.get(&library_id).unwrap();
// libraries only connecting one-way atm
@@ -223,7 +269,7 @@ impl NetworkedLibraries {
};
let sync = sync.clone();
let p2p = self.p2p.clone();
let p2p = p2p.clone();
tokio::spawn(async move {
debug!(
@@ -250,14 +296,9 @@ impl NetworkedLibraries {
.unwrap();
tunnel.flush().await.unwrap();
while let Ok(SyncMessage::OperationsRequest(args)) =
SyncMessage::from_stream(&mut tunnel).await
while let Ok(rx::GetOperations(args)) =
rx::GetOperations::from_stream(&mut tunnel).await
{
// let args = match .unwrap() {
// => resp,
// _ => todo!("unreachable but proper error handling"),
// };
let ops = sync.get_ops(args).await.unwrap();
debug!(
@@ -266,7 +307,7 @@ impl NetworkedLibraries {
);
tunnel
.write_all(&SyncMessage::OperationsRequestResponse(ops).to_bytes())
.write_all(&tx::Operations(ops).to_bytes())
.await
.unwrap();
tunnel.flush().await.unwrap();
@@ -274,47 +315,85 @@ impl NetworkedLibraries {
});
}
}
}
// Ask the remote for operations and then ingest them
pub async fn request_ops(
&self,
tunnel: &mut Tunnel,
args: GetOpsArgs,
) -> Vec<sd_sync::CRDTOperation> {
tunnel
.write_all(&SyncMessage::OperationsRequest(args).to_bytes())
.await
.unwrap();
tunnel.flush().await.unwrap();
pub use responder::run as responder;
mod responder {
use super::*;
use originator::tx as rx;
let SyncMessage::OperationsRequestResponse(ops) = SyncMessage::from_stream(tunnel).await.unwrap() else {
todo!("unreachable but proper error handling")
};
pub mod tx {
use super::*;
// debug!("Received sync events response w/ id '{id}' from peer '{peer_id:?}' for library '{library_id:?}'");
pub struct GetOperations(pub GetOpsArgs);
ops
impl GetOperations {
// TODO: Per field errors for better error handling
pub async fn from_stream(
stream: &mut (impl AsyncRead + Unpin),
) -> std::io::Result<Self> {
Ok(Self(
// TODO: Error handling
rmp_serde::from_slice(&decode::buf(stream).await.unwrap()).unwrap(),
))
}
pub fn to_bytes(&self) -> Vec<u8> {
let Self(ops) = self;
let mut buf = vec![];
// TODO: Error handling
encode::buf(&mut buf, &rmp_serde::to_vec_named(&ops).unwrap());
buf
}
}
}
// TODO: Error handling
pub async fn exchange_sync_ops(
&self,
mut tunnel: Tunnel,
peer_id: &PeerId,
library_id: Uuid,
sync: &sync::Manager,
args: GetOpsArgs,
) {
let ops = sync.get_ops(args).await.unwrap();
pub async fn run(mut tunnel: Tunnel, library: Arc<Library>) {
let ingest = &library.sync.ingest;
debug!(
"Sending '{}' sync ops from peer '{peer_id:?}' for library '{library_id:?}'",
ops.len()
);
let Ok(mut rx) =
ingest.req_rx.try_lock() else {
return;
};
tunnel
.write_all(&SyncMessage::OperationsRequestResponse(ops).to_bytes())
ingest
.event_tx
.send(ingest::Event::Notification)
.await
.unwrap();
while let Some(req) = rx.recv().await {
use sync::ingest::*;
const OPS_PER_REQUEST: u32 = 100;
let Request::Messages { timestamps } = req else { continue };
tunnel
.write_all(
&tx::GetOperations(sync::GetOpsArgs {
clocks: timestamps,
count: OPS_PER_REQUEST,
})
.to_bytes(),
)
.await
.unwrap();
tunnel.flush().await.unwrap();
let rx::Operations(ops) = rx::Operations::from_stream(&mut tunnel).await.unwrap();
ingest
.event_tx
.send(Event::Messages(MessagesEvent {
instance_id: library.sync.instance,
has_more: ops.len() == OPS_PER_REQUEST as usize,
messages: ops,
}))
.await
.expect("TODO: Handle ingest channel closed, so we don't loose ops");
}
}
}

View File

@@ -1,13 +1,9 @@
use sd_core_sync::GetOpsArgs;
use sd_p2p::proto::{decode, encode};
use sd_sync::CRDTOperation;
use tokio::io::{AsyncRead, AsyncReadExt};
// will probs have more variants in future
#[derive(Debug, PartialEq, Eq)]
pub enum SyncMessage {
NewOperations,
OperationsRequest(GetOpsArgs),
OperationsRequestResponse(Vec<CRDTOperation>),
}
impl SyncMessage {
@@ -15,13 +11,6 @@ impl SyncMessage {
pub async fn from_stream(stream: &mut (impl AsyncRead + Unpin)) -> std::io::Result<Self> {
match stream.read_u8().await? {
b'N' => Ok(Self::NewOperations),
b'R' => Ok(Self::OperationsRequest(
rmp_serde::from_slice(&decode::buf(stream).await.unwrap()).unwrap(),
)),
b'P' => Ok(Self::OperationsRequestResponse(
// TODO: Error handling
rmp_serde::from_slice(&decode::buf(stream).await.unwrap()).unwrap(),
)),
header => Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("Invalid sync message header: {}", (header as char)),
@@ -32,20 +21,6 @@ impl SyncMessage {
pub fn to_bytes(&self) -> Vec<u8> {
match self {
Self::NewOperations => vec![b'N'],
Self::OperationsRequest(args) => {
let mut buf = vec![b'R'];
// TODO: Error handling
encode::buf(&mut buf, &rmp_serde::to_vec_named(&args).unwrap());
buf
}
Self::OperationsRequestResponse(ops) => {
let mut buf = vec![b'P'];
// TODO: Error handling
encode::buf(&mut buf, &rmp_serde::to_vec_named(&ops).unwrap());
buf
}
}
}
}