[ENG-414] P2P Shutdown (#688)

* `Event::Shutdown` in P2P system

* Mdns shutdown broadcast + await shutdown

* Fix mdns service name & prevent poll after shutdown
This commit is contained in:
Oscar Beaumont
2023-04-11 23:05:50 +08:00
committed by GitHub
parent b1d619c42f
commit 5bc234ae87
7 changed files with 103 additions and 11 deletions

View File

@@ -225,6 +225,7 @@ impl Node {
pub async fn shutdown(&self) {
info!("Spacedrive shutting down...");
self.jobs.pause().await;
self.p2p.shutdown().await;
info!("Spacedrive Core shutdown successful!");
}

View File

@@ -71,6 +71,7 @@ impl P2PManager {
let events = tx.clone();
async move {
let mut shutdown = false;
while let Some(event) = stream.next().await {
match event {
Event::PeerDiscovered(event) => {
@@ -135,13 +136,19 @@ impl P2PManager {
}
});
}
Event::Shutdown => {
shutdown = true;
break;
}
_ => debug!("event: {:?}", event),
}
}
error!(
"Manager event stream closed! The core is unstable from this point forward!"
);
if !shutdown {
error!(
"Manager event stream closed! The core is unstable from this point forward!"
);
}
}
});
@@ -290,4 +297,8 @@ impl P2PManager {
i.elapsed()
);
}
pub async fn shutdown(&self) {
self.manager.shutdown().await;
}
}

View File

@@ -59,6 +59,7 @@ async fn main() {
);
tokio::spawn(async move {
let mut shutdown = false;
// Your application must keeping poll this stream to keep the P2P system running
while let Some(event) = stream.next().await {
match event {
@@ -93,15 +94,23 @@ async fn main() {
}
});
}
Event::Shutdown => {
info!("Manager shutdown!");
shutdown = true;
break;
}
_ => debug!("event: {:?}", event),
}
}
error!("Manager event stream closed! The core is unstable from this point forward!");
// process.exit(1); // TODO: Should I?
if !shutdown {
error!("Manager event stream closed! The core is unstable from this point forward!");
// process.exit(1); // TODO: Should I?
}
});
if env::var("PING").as_deref() != Ok("skip") {
let manager = manager.clone();
tokio::spawn(async move {
sleep(Duration::from_millis(500)).await;
@@ -125,4 +134,6 @@ async fn main() {
// https://docs.rs/system_shutdown/latest/system_shutdown/
tokio::time::sleep(Duration::from_secs(100)).await;
manager.shutdown().await; // It is super highly recommended to shutdown the manager before exiting your application so an Mdns update can be broadcasted
}

View File

@@ -32,6 +32,8 @@ pub enum Event<TMetadata: Metadata> {
/// the peer has opened a new substream
#[cfg_attr(any(feature = "serde", feature = "specta"), serde(skip))]
PeerMessage(PeerMessageEvent<TMetadata>),
/// the node is shutting down
Shutdown,
}
#[derive(Debug)]

View File

@@ -1,4 +1,8 @@
use std::{collections::HashSet, net::SocketAddr, sync::Arc};
use std::{
collections::HashSet,
net::SocketAddr,
sync::{atomic::AtomicBool, Arc},
};
use libp2p::{core::muxing::StreamMuxerBox, quic, Swarm, Transport};
use thiserror::Error;
@@ -79,6 +83,7 @@ impl<TMetadata: Metadata> Manager<TMetadata> {
swarm,
mdns,
queued_events: Default::default(),
shutdown: AtomicBool::new(false),
},
))
}
@@ -131,6 +136,17 @@ impl<TMetadata: Metadata> Manager<TMetadata> {
pub async fn broadcast(&self, data: Vec<u8>) {
self.emit(ManagerStreamAction::BroadcastData(data)).await;
}
pub async fn shutdown(&self) {
let (tx, rx) = oneshot::channel();
self.event_stream_tx
.send(ManagerStreamAction::Shutdown(tx))
.await
.unwrap();
rx.await.unwrap_or_else(|_| {
warn!("Error receiving shutdown signal to P2P Manager!");
}); // Await shutdown so we don't kill the app before the Mdns broadcast
}
}
#[derive(Error, Debug)]

View File

@@ -1,4 +1,12 @@
use std::{collections::VecDeque, fmt, net::SocketAddr, sync::Arc};
use std::{
collections::VecDeque,
fmt,
net::SocketAddr,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use libp2p::{
futures::StreamExt,
@@ -9,7 +17,7 @@ use libp2p::{
Swarm,
};
use tokio::sync::{mpsc, oneshot};
use tracing::{debug, error, warn};
use tracing::{debug, error, info, warn};
use crate::{
quic_multiaddr_to_socketaddr, socketaddr_to_quic_multiaddr,
@@ -32,6 +40,8 @@ pub enum ManagerStreamAction<TMetadata: Metadata> {
StartStream(PeerId, oneshot::Sender<UnicastStream>),
/// TODO
BroadcastData(Vec<u8>),
/// the node is shutting down. The `ManagerStream` should convert this into `Event::Shutdown`
Shutdown(oneshot::Sender<()>),
}
impl<TMetadata: Metadata> fmt::Debug for ManagerStreamAction<TMetadata> {
@@ -56,6 +66,7 @@ where
pub(crate) swarm: Swarm<SpaceTime<TMetadata>>,
pub(crate) mdns: Mdns<TMetadata>,
pub(crate) queued_events: VecDeque<Event<TMetadata>>,
pub(crate) shutdown: AtomicBool,
}
impl<TMetadata> ManagerStream<TMetadata>
@@ -66,6 +77,10 @@ where
pub async fn next(&mut self) -> Option<Event<TMetadata>> {
// We loop polling internal services until an event comes in that needs to be sent to the parent application.
loop {
if self.shutdown.load(Ordering::Relaxed) {
panic!("`ManagerStream::next` called after shutdown event. This is a mistake in your application code!");
}
if let Some(event) = self.queued_events.pop_front() {
return Some(event);
}
@@ -79,14 +94,18 @@ where
},
event = self.event_stream_rx.recv() => {
// If the sender has shut down we return `None` to also shut down too.
if let Some(event) = self.handle_manager_stream_action(event?) {
if let Some(event) = self.handle_manager_stream_action(event?).await {
return Some(event);
}
}
event = self.swarm.select_next_some() => {
match event {
SwarmEvent::Behaviour(event) => {
if let Some(event) = self.handle_manager_stream_action(event) {
if let Some(event) = self.handle_manager_stream_action(event).await {
if let Event::Shutdown { .. } = event {
self.shutdown.store(true, Ordering::Relaxed);
}
return Some(event);
}
},
@@ -149,7 +168,7 @@ where
}
}
fn handle_manager_stream_action(
async fn handle_manager_stream_action(
&mut self,
event: ManagerStreamAction<TMetadata>,
) -> Option<Event<TMetadata>> {
@@ -205,6 +224,15 @@ where
});
}
}
ManagerStreamAction::Shutdown(tx) => {
info!("Shutting down P2P Manager...");
self.mdns.shutdown().await;
tx.send(()).unwrap_or_else(|_| {
warn!("Error sending shutdown signal to P2P Manager!");
});
return Some(Event::Shutdown);
}
}
None

View File

@@ -257,4 +257,27 @@ where
Box::pin(sleep_until(Instant::now() + Duration::from_millis(200)));
}
}
pub async fn shutdown(&self) {
match self
.mdns_daemon
.unregister(&format!("{}.{}", self.peer_id, self.service_name))
.map(|chan| chan.recv())
{
Ok(Ok(_)) => {}
Ok(Err(err)) => {
warn!(
"shutdown error recieving shutdown status from mdns service: {}",
err
);
}
Err(err) => {
warn!("shutdown error unregistering mdns service: {}", err);
}
}
self.mdns_daemon.shutdown().unwrap_or_else(|err| {
error!("shutdown error shutting down mdns daemon: {}", err);
});
}
}