From 5bc234ae8748979ca6bdcf80e8ee7f7fcef159bd Mon Sep 17 00:00:00 2001 From: Oscar Beaumont Date: Tue, 11 Apr 2023 23:05:50 +0800 Subject: [PATCH] [ENG-414] P2P Shutdown (#688) * `Event::Shutdown` in P2P system * Mdns shutdown broadcast + await shutdown * Fix mdns service name & prevent poll after shutdown --- core/src/lib.rs | 1 + core/src/p2p/p2p_manager.rs | 17 +++++++++++--- crates/p2p/examples/basic.rs | 15 +++++++++++-- crates/p2p/src/event.rs | 2 ++ crates/p2p/src/manager.rs | 18 ++++++++++++++- crates/p2p/src/manager_stream.rs | 38 +++++++++++++++++++++++++++----- crates/p2p/src/mdns.rs | 23 +++++++++++++++++++ 7 files changed, 103 insertions(+), 11 deletions(-) diff --git a/core/src/lib.rs b/core/src/lib.rs index 8f022122b..225f669a5 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -225,6 +225,7 @@ impl Node { pub async fn shutdown(&self) { info!("Spacedrive shutting down..."); self.jobs.pause().await; + self.p2p.shutdown().await; info!("Spacedrive Core shutdown successful!"); } diff --git a/core/src/p2p/p2p_manager.rs b/core/src/p2p/p2p_manager.rs index 54b40192d..ac5beecac 100644 --- a/core/src/p2p/p2p_manager.rs +++ b/core/src/p2p/p2p_manager.rs @@ -71,6 +71,7 @@ impl P2PManager { let events = tx.clone(); async move { + let mut shutdown = false; while let Some(event) = stream.next().await { match event { Event::PeerDiscovered(event) => { @@ -135,13 +136,19 @@ impl P2PManager { } }); } + Event::Shutdown => { + shutdown = true; + break; + } _ => debug!("event: {:?}", event), } } - error!( - "Manager event stream closed! The core is unstable from this point forward!" - ); + if !shutdown { + error!( + "Manager event stream closed! The core is unstable from this point forward!" + ); + } } }); @@ -290,4 +297,8 @@ impl P2PManager { i.elapsed() ); } + + pub async fn shutdown(&self) { + self.manager.shutdown().await; + } } diff --git a/crates/p2p/examples/basic.rs b/crates/p2p/examples/basic.rs index ff31354a8..b1fdd79b8 100644 --- a/crates/p2p/examples/basic.rs +++ b/crates/p2p/examples/basic.rs @@ -59,6 +59,7 @@ async fn main() { ); tokio::spawn(async move { + let mut shutdown = false; // Your application must keeping poll this stream to keep the P2P system running while let Some(event) = stream.next().await { match event { @@ -93,15 +94,23 @@ async fn main() { } }); } + Event::Shutdown => { + info!("Manager shutdown!"); + shutdown = true; + break; + } _ => debug!("event: {:?}", event), } } - error!("Manager event stream closed! The core is unstable from this point forward!"); - // process.exit(1); // TODO: Should I? + if !shutdown { + error!("Manager event stream closed! The core is unstable from this point forward!"); + // process.exit(1); // TODO: Should I? + } }); if env::var("PING").as_deref() != Ok("skip") { + let manager = manager.clone(); tokio::spawn(async move { sleep(Duration::from_millis(500)).await; @@ -125,4 +134,6 @@ async fn main() { // https://docs.rs/system_shutdown/latest/system_shutdown/ tokio::time::sleep(Duration::from_secs(100)).await; + + manager.shutdown().await; // It is super highly recommended to shutdown the manager before exiting your application so an Mdns update can be broadcasted } diff --git a/crates/p2p/src/event.rs b/crates/p2p/src/event.rs index 1382c0350..0255cbcda 100644 --- a/crates/p2p/src/event.rs +++ b/crates/p2p/src/event.rs @@ -32,6 +32,8 @@ pub enum Event { /// the peer has opened a new substream #[cfg_attr(any(feature = "serde", feature = "specta"), serde(skip))] PeerMessage(PeerMessageEvent), + /// the node is shutting down + Shutdown, } #[derive(Debug)] diff --git a/crates/p2p/src/manager.rs b/crates/p2p/src/manager.rs index ab9cc3c15..d0f499839 100644 --- a/crates/p2p/src/manager.rs +++ b/crates/p2p/src/manager.rs @@ -1,4 +1,8 @@ -use std::{collections::HashSet, net::SocketAddr, sync::Arc}; +use std::{ + collections::HashSet, + net::SocketAddr, + sync::{atomic::AtomicBool, Arc}, +}; use libp2p::{core::muxing::StreamMuxerBox, quic, Swarm, Transport}; use thiserror::Error; @@ -79,6 +83,7 @@ impl Manager { swarm, mdns, queued_events: Default::default(), + shutdown: AtomicBool::new(false), }, )) } @@ -131,6 +136,17 @@ impl Manager { pub async fn broadcast(&self, data: Vec) { self.emit(ManagerStreamAction::BroadcastData(data)).await; } + + pub async fn shutdown(&self) { + let (tx, rx) = oneshot::channel(); + self.event_stream_tx + .send(ManagerStreamAction::Shutdown(tx)) + .await + .unwrap(); + rx.await.unwrap_or_else(|_| { + warn!("Error receiving shutdown signal to P2P Manager!"); + }); // Await shutdown so we don't kill the app before the Mdns broadcast + } } #[derive(Error, Debug)] diff --git a/crates/p2p/src/manager_stream.rs b/crates/p2p/src/manager_stream.rs index 1c45c74c2..1eb3b48a1 100644 --- a/crates/p2p/src/manager_stream.rs +++ b/crates/p2p/src/manager_stream.rs @@ -1,4 +1,12 @@ -use std::{collections::VecDeque, fmt, net::SocketAddr, sync::Arc}; +use std::{ + collections::VecDeque, + fmt, + net::SocketAddr, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, +}; use libp2p::{ futures::StreamExt, @@ -9,7 +17,7 @@ use libp2p::{ Swarm, }; use tokio::sync::{mpsc, oneshot}; -use tracing::{debug, error, warn}; +use tracing::{debug, error, info, warn}; use crate::{ quic_multiaddr_to_socketaddr, socketaddr_to_quic_multiaddr, @@ -32,6 +40,8 @@ pub enum ManagerStreamAction { StartStream(PeerId, oneshot::Sender), /// TODO BroadcastData(Vec), + /// the node is shutting down. The `ManagerStream` should convert this into `Event::Shutdown` + Shutdown(oneshot::Sender<()>), } impl fmt::Debug for ManagerStreamAction { @@ -56,6 +66,7 @@ where pub(crate) swarm: Swarm>, pub(crate) mdns: Mdns, pub(crate) queued_events: VecDeque>, + pub(crate) shutdown: AtomicBool, } impl ManagerStream @@ -66,6 +77,10 @@ where pub async fn next(&mut self) -> Option> { // We loop polling internal services until an event comes in that needs to be sent to the parent application. loop { + if self.shutdown.load(Ordering::Relaxed) { + panic!("`ManagerStream::next` called after shutdown event. This is a mistake in your application code!"); + } + if let Some(event) = self.queued_events.pop_front() { return Some(event); } @@ -79,14 +94,18 @@ where }, event = self.event_stream_rx.recv() => { // If the sender has shut down we return `None` to also shut down too. - if let Some(event) = self.handle_manager_stream_action(event?) { + if let Some(event) = self.handle_manager_stream_action(event?).await { return Some(event); } } event = self.swarm.select_next_some() => { match event { SwarmEvent::Behaviour(event) => { - if let Some(event) = self.handle_manager_stream_action(event) { + if let Some(event) = self.handle_manager_stream_action(event).await { + if let Event::Shutdown { .. } = event { + self.shutdown.store(true, Ordering::Relaxed); + } + return Some(event); } }, @@ -149,7 +168,7 @@ where } } - fn handle_manager_stream_action( + async fn handle_manager_stream_action( &mut self, event: ManagerStreamAction, ) -> Option> { @@ -205,6 +224,15 @@ where }); } } + ManagerStreamAction::Shutdown(tx) => { + info!("Shutting down P2P Manager..."); + self.mdns.shutdown().await; + tx.send(()).unwrap_or_else(|_| { + warn!("Error sending shutdown signal to P2P Manager!"); + }); + + return Some(Event::Shutdown); + } } None diff --git a/crates/p2p/src/mdns.rs b/crates/p2p/src/mdns.rs index 0c1891bf3..53661bcf8 100644 --- a/crates/p2p/src/mdns.rs +++ b/crates/p2p/src/mdns.rs @@ -257,4 +257,27 @@ where Box::pin(sleep_until(Instant::now() + Duration::from_millis(200))); } } + + pub async fn shutdown(&self) { + match self + .mdns_daemon + .unregister(&format!("{}.{}", self.peer_id, self.service_name)) + .map(|chan| chan.recv()) + { + Ok(Ok(_)) => {} + Ok(Err(err)) => { + warn!( + "shutdown error recieving shutdown status from mdns service: {}", + err + ); + } + Err(err) => { + warn!("shutdown error unregistering mdns service: {}", err); + } + } + + self.mdns_daemon.shutdown().unwrap_or_else(|err| { + error!("shutdown error shutting down mdns daemon: {}", err); + }); + } }