diff --git a/src/client.rs b/src/client.rs index 76369ca..6bb2440 100644 --- a/src/client.rs +++ b/src/client.rs @@ -18,6 +18,7 @@ use std::{io, result::Result}; use crate::dbg_msg; use crate::oauth::{force_refresh_token, token_daemon, Oauth}; +use crate::p2p::ONLINE; use crate::server::RequestExt; use crate::utils::{format_url, Post}; @@ -466,6 +467,7 @@ pub async fn json(path: String, quarantine: bool) -> Result { if status.is_server_error() { Err("Reddit is having issues, check if there's an outage".to_string()) } else { + ONLINE.store(false, Ordering::SeqCst); err("Failed to parse page JSON data", e.to_string(), path) } } diff --git a/src/lib.rs b/src/lib.rs index 32ab892..1aa22c6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,6 +4,7 @@ pub mod duplicates; pub mod instance_info; pub mod oauth; pub mod oauth_resources; +pub mod p2p; pub mod post; pub mod search; pub mod server; @@ -11,4 +12,3 @@ pub mod settings; pub mod subreddit; pub mod user; pub mod utils; -pub mod p2p; \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 47c4f96..f869d08 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,7 +4,9 @@ use cached::proc_macro::cached; use clap::{Arg, ArgAction, Command}; +use redlib::p2p::ONLINE; use std::str::FromStr; +use std::sync::atomic::Ordering; use futures_lite::FutureExt; use hyper::Uri; @@ -227,6 +229,16 @@ async fn main() { } } + // Manual overrides for online value + app.at("/force_offline").get(|_| { + ONLINE.store(false, Ordering::SeqCst); + resource("", "text/plain", false).boxed() + }); + app.at("/force_online").get(|_| { + ONLINE.store(true, Ordering::SeqCst); + resource("", "text/plain", false).boxed() + }); + // Read static files app.at("/style.css").get(|_| style().boxed()); app diff --git a/src/p2p.rs b/src/p2p.rs index 3457eb6..9708cb5 100644 --- a/src/p2p.rs +++ b/src/p2p.rs @@ -1,4 +1,7 @@ -use std::str::FromStr; +use std::{ + str::FromStr, + sync::atomic::{AtomicBool, Ordering}, +}; use bytes::Bytes; use dashmap::DashMap; @@ -6,19 +9,18 @@ use ed25519_dalek::Signature; use futures_lite::StreamExt; use iroh::{protocol::Router, Endpoint, NodeAddr, PublicKey, SecretKey}; use iroh_gossip::{ - net::{Event, Gossip, GossipEvent, GossipReceiver}, + net::{Event, Gossip, GossipEvent, GossipReceiver, GossipSender}, proto::TopicId, ALPN as GOSSIP_ALPN, }; use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; -use tokio::task; +use tokio::{task, time::sleep}; use crate::config; -static TICKET: &str = ""; - -static DASHMAP: Lazy> = Lazy::new(DashMap::new); +pub static DASHMAP: Lazy> = Lazy::new(DashMap::new); +pub static ONLINE: Lazy = Lazy::new(AtomicBool::default); pub async fn main() -> Result<(), Box> { let endpoint = Endpoint::builder().discovery_n0().bind().await?; @@ -27,8 +29,19 @@ pub async fn main() -> Result<(), Box> { let gossip = Gossip::builder().spawn(builder.endpoint().clone()).await?; let _router: Router = builder.accept(GOSSIP_ALPN, gossip.clone()).spawn().await?; - let (topic, peers) = { - let Ticket { topic, peers } = Ticket::from_str(TICKET)?; + // there are two ways to run the p2p chat + // 1. "bootstrap" mode - this requires REDLIB_P2P_BOOTSTRAP=true and REDLIB_P2P_TOPIC set to the topic ID you want to use + // in this mode, the node will create the topic and start a chat room + // 2. "join" mode - this requires REDLIB_P2P_BOOTSTRAP=false (or unset) and REDLIB_P2P_TICKET set to a ticket. + // in this mode, the node will join the existing chat room with the given topic ID, and connect to the peers listed + // in the ticket + let (topic, peers) = if std::env::var("REDLIB_P2P_BOOTSTRAP").unwrap_or_default() == "true" { + let topic = std::env::var("REDLIB_P2P_TOPIC").map(|s| TopicId::from_str(&s).unwrap()).unwrap(); + println!("> opening chat room for topic {topic}"); + (topic, vec![]) + } else { + let ticket_str = std::env::var("REDLIB_P2P_TICKET").expect("REDLIB_P2P_TICKET not set"); + let Ticket { topic, peers } = Ticket::from_str(&ticket_str)?; println!("> joining chat room for topic {topic}"); (topic, peers) }; @@ -53,15 +66,19 @@ pub async fn main() -> Result<(), Box> { let (sender, receiver) = gossip.subscribe_and_join(topic, peer_ids).await?.split(); println!("> connected!"); + let secret_key = endpoint.secret_key().clone(); + let message = Message { hostname: config::get_setting("REDLIB_FULL_URL").unwrap_or_default(), online: true, }; - let encoded_message = SignedMessage::sign_and_encode(endpoint.secret_key(), &message)?; + let encoded_message = SignedMessage::sign_and_encode(&secret_key, &message)?; sender.broadcast(encoded_message).await?; task::spawn(subscribe_loop(receiver)); + task::spawn(sender_loop(sender, secret_key)); + Ok(()) } @@ -81,6 +98,19 @@ async fn subscribe_loop(mut receiver: GossipReceiver) { } } +async fn sender_loop(sender: GossipSender, secret_key: SecretKey) { + loop { + let message = Message { + hostname: config::get_setting("REDLIB_FULL_URL").unwrap_or_default(), + online: ONLINE.load(Ordering::SeqCst), + }; + let encoded_message = SignedMessage::sign_and_encode(&secret_key, &message).unwrap(); + let _ = sender.broadcast(encoded_message).await; + + sleep(std::time::Duration::from_secs(10)).await; + } +} + #[derive(Debug, Serialize, Deserialize)] struct Ticket { topic: TopicId,