diff --git a/src/gui/sniffer.rs b/src/gui/sniffer.rs index fa36a44c..b1fe7d42 100644 --- a/src/gui/sniffer.rs +++ b/src/gui/sniffer.rs @@ -48,7 +48,7 @@ use crate::mmdb::types::mmdb_reader::{MmdbReader, MmdbReaders}; use crate::networking::parse_packets::BackendTrafficMessage; use crate::networking::parse_packets::parse_packets; -use crate::networking::traffic_preview::traffic_preview; +use crate::networking::traffic_preview::{TrafficPreviews, traffic_preview}; use crate::networking::types::capture_context::{ CaptureContext, CaptureSource, CaptureSourcePicklist, MyPcapImport, }; @@ -128,7 +128,11 @@ pub struct Sniffer { /// Flag reporting whether the packet capture is frozen pub frozen: bool, /// Sender to freeze the packet capture - freeze_tx: Option>, + pub freeze_tx: Option>, + /// Sender to freeze the traffic preview + pub freeze_preview_tx: Option>, + /// Traffic previews for charts + pub traffic_previews: TrafficPreviews, } impl Sniffer { @@ -172,6 +176,8 @@ pub fn new(conf: Conf) -> Self { host_data_states: HostDataStates::default(), frozen: false, freeze_tx: None, + freeze_preview_tx: None, + traffic_previews: TrafficPreviews::default(), } } @@ -271,15 +277,22 @@ pub fn update(&mut self, message: Message) -> Task { match message { Message::StartApp(id) => { self.id = id; + let (tx, rx) = async_channel::unbounded(); + let (freeze_tx, freeze_rx) = tokio::sync::broadcast::channel(1_048_575); + let freeze_rx2 = freeze_tx.subscribe(); let _ = thread::Builder::new() .name("thread_traffic_preview".to_string()) .spawn(move || { - traffic_preview(); + traffic_preview(&tx, (freeze_rx, freeze_rx2)); }) .log_err(location!()); + self.freeze_preview_tx = Some(freeze_tx); return Task::batch([ Sniffer::register_sigint_handler(), Task::perform(set_newer_release_status(), Message::SetNewerReleaseStatus), + Task::run(rx, |traffic_preview| { + Message::TrafficPreview(traffic_preview) + }), ]); } Message::TickRun(cap_id, msg, host_msgs, no_more_packets) => { @@ -622,6 +635,9 @@ pub fn update(&mut self, message: Message) -> Task { let _ = tx.send(()); } } + Message::TrafficPreview(preview) => { + self.traffic_previews.refresh(preview); + } } Task::none() } diff --git a/src/gui/types/message.rs b/src/gui/types/message.rs index d54b8a2e..61ca326f 100644 --- a/src/gui/types/message.rs +++ b/src/gui/types/message.rs @@ -5,6 +5,7 @@ use crate::gui::pages::types::running_page::RunningPage; use crate::gui::pages::types::settings_page::SettingsPage; use crate::gui::styles::types::gradient_type::GradientType; +use crate::networking::traffic_preview::TrafficPreview; use crate::networking::types::capture_context::CaptureSourcePicklist; use crate::networking::types::data_representation::DataRepr; use crate::networking::types::host::{Host, HostMessage}; @@ -145,4 +146,6 @@ pub enum Message { RemoteNotificationsUrl(String), /// Pause or resume live capture Freeze, + /// Traffic preview + TrafficPreview(TrafficPreview), } diff --git a/src/networking/parse_packets.rs b/src/networking/parse_packets.rs index 8ab1b577..19ba6ef9 100644 --- a/src/networking/parse_packets.rs +++ b/src/networking/parse_packets.rs @@ -507,7 +507,7 @@ pub(super) fn packet_stream( tx: &std::sync::mpsc::SyncSender<(Result, Option)>, freeze_rx: &mut Receiver<()>, filters: &Filters, - dev_info: Option, + dev_info: Option<&DevInfo>, ) { loop { // check if we need to freeze the parsing @@ -524,7 +524,7 @@ pub(super) fn packet_stream( let packet_owned = packet_res.map(|p| PacketOwned { header: *p.header, data: p.data.into(), - dev_info: dev_info.clone(), + dev_info: dev_info.cloned(), }); if tx.send((packet_owned, cap.stats().ok())).is_err() { return; diff --git a/src/networking/traffic_preview.rs b/src/networking/traffic_preview.rs index f651df0f..3cb47b21 100644 --- a/src/networking/traffic_preview.rs +++ b/src/networking/traffic_preview.rs @@ -5,11 +5,35 @@ use crate::networking::types::my_device::MyDevice; use crate::networking::types::my_link_type::MyLinkType; use crate::utils::error_logger::{ErrorLogger, Location}; +use async_channel::Sender; use pcap::Device; +use std::collections::{HashMap, VecDeque}; use std::thread; use std::time::{Duration, Instant}; +use tokio::sync::broadcast::Receiver; -pub fn traffic_preview() { +#[derive(Default, Debug, Clone)] +pub struct TrafficPreview { + pub data: HashMap, +} + +#[derive(Default, Debug, Clone)] +pub struct TrafficPreviews { + pub data: HashMap>, +} + +impl TrafficPreviews { + pub fn refresh(&mut self, msg: TrafficPreview) { + for (dev, pkts) in msg.data { + self.data + .entry(dev) + .and_modify(|v| v.push_back(pkts)) + .or_insert(VecDeque::from([pkts])); + } + } +} + +pub fn traffic_preview(tx: &Sender, freeze_rxs: (Receiver<()>, Receiver<()>)) { let (freeze_tx, mut freeze_rx) = tokio::sync::broadcast::channel(1_048_575); let (pcap_tx, pcap_rx) = std::sync::mpsc::sync_channel(10_000); for dev in Device::list().unwrap_or_default() { @@ -28,21 +52,30 @@ pub fn traffic_preview() { let _ = thread::Builder::new() .name("thread_device_traffic_preview".to_string()) .spawn(move || { - packet_stream(cap, &pcap_tx, &mut freeze_rx, &Filters::default(), dev_info); + packet_stream( + cap, + &pcap_tx, + &mut freeze_rx, + &Filters::default(), + dev_info.as_ref(), + ); }) .log_err(location!()); } + let mut traffic_preview = TrafficPreview::default(); + let mut first_packet_ticks = None; + loop { // check if we need to freeze the parsing if freeze_rx.try_recv().is_ok() { // wait until unfreeze let _ = freeze_rx.blocking_recv(); // reset the first packet ticks - // first_packet_ticks = Some(Instant::now()); + first_packet_ticks = Some(Instant::now()); } - let (packet_res, cap_stats) = pcap_rx + let (packet_res, _) = pcap_rx .recv_timeout(Duration::from_millis(150)) .unwrap_or((Err(pcap::Error::TimeoutExpired), None)); @@ -50,13 +83,24 @@ pub fn traffic_preview() { // return; // } + maybe_send_traffic_preview(&mut traffic_preview, &mut first_packet_ticks, tx); + if let Ok(packet) = packet_res { let my_link_type = packet.dev_info.as_ref().unwrap().my_link_type; if get_sniffable_headers(&packet.data, my_link_type).is_some() { - println!( - "Received packet on {}", - packet.dev_info.as_ref().map_or("?", |d| d.name.as_str()), - ); + let Some(dev_info) = packet.dev_info else { + continue; + }; + + if first_packet_ticks.is_none() { + first_packet_ticks = Some(Instant::now()); + } + + traffic_preview + .data + .entry(dev_info.name) + .and_modify(|p| *p += 1) + .or_insert(1); } } } @@ -67,3 +111,15 @@ pub(super) struct DevInfo { name: String, my_link_type: MyLinkType, } + +fn maybe_send_traffic_preview( + traffic_preview: &mut TrafficPreview, + first_packet_ticks: &mut Option, + tx: &Sender, +) { + if first_packet_ticks.is_some_and(|i| i.elapsed() >= Duration::from_millis(1000)) { + *first_packet_ticks = + first_packet_ticks.and_then(|i| i.checked_add(Duration::from_millis(1000))); + let _ = tx.send_blocking(std::mem::take(traffic_preview)); + } +} diff --git a/src/networking/types/info_traffic.rs b/src/networking/types/info_traffic.rs index 4d9d16f4..896252b3 100644 --- a/src/networking/types/info_traffic.rs +++ b/src/networking/types/info_traffic.rs @@ -26,7 +26,7 @@ pub struct InfoTraffic { } impl InfoTraffic { - pub fn refresh(&mut self, msg: &mut InfoTraffic) { + pub fn refresh(&mut self, msg: &mut Self) { self.tot_data_info.refresh(msg.tot_data_info); self.dropped_packets = msg.dropped_packets;