send traffic preview messages and update previews in Sniffer struct

This commit is contained in:
GyulyVGC
2025-12-13 11:47:16 +01:00
parent 9cfd5f1853
commit 60a3e9af8e
5 changed files with 89 additions and 14 deletions

View File

@@ -48,7 +48,7 @@
use crate::mmdb::types::mmdb_reader::{MmdbReader, MmdbReaders};
use crate::networking::parse_packets::BackendTrafficMessage;
use crate::networking::parse_packets::parse_packets;
use crate::networking::traffic_preview::traffic_preview;
use crate::networking::traffic_preview::{TrafficPreviews, traffic_preview};
use crate::networking::types::capture_context::{
CaptureContext, CaptureSource, CaptureSourcePicklist, MyPcapImport,
};
@@ -128,7 +128,11 @@ pub struct Sniffer {
/// Flag reporting whether the packet capture is frozen
pub frozen: bool,
/// Sender to freeze the packet capture
freeze_tx: Option<tokio::sync::broadcast::Sender<()>>,
pub freeze_tx: Option<tokio::sync::broadcast::Sender<()>>,
/// Sender to freeze the traffic preview
pub freeze_preview_tx: Option<tokio::sync::broadcast::Sender<()>>,
/// Traffic previews for charts
pub traffic_previews: TrafficPreviews,
}
impl Sniffer {
@@ -172,6 +176,8 @@ pub fn new(conf: Conf) -> Self {
host_data_states: HostDataStates::default(),
frozen: false,
freeze_tx: None,
freeze_preview_tx: None,
traffic_previews: TrafficPreviews::default(),
}
}
@@ -271,15 +277,22 @@ pub fn update(&mut self, message: Message) -> Task<Message> {
match message {
Message::StartApp(id) => {
self.id = id;
let (tx, rx) = async_channel::unbounded();
let (freeze_tx, freeze_rx) = tokio::sync::broadcast::channel(1_048_575);
let freeze_rx2 = freeze_tx.subscribe();
let _ = thread::Builder::new()
.name("thread_traffic_preview".to_string())
.spawn(move || {
traffic_preview();
traffic_preview(&tx, (freeze_rx, freeze_rx2));
})
.log_err(location!());
self.freeze_preview_tx = Some(freeze_tx);
return Task::batch([
Sniffer::register_sigint_handler(),
Task::perform(set_newer_release_status(), Message::SetNewerReleaseStatus),
Task::run(rx, |traffic_preview| {
Message::TrafficPreview(traffic_preview)
}),
]);
}
Message::TickRun(cap_id, msg, host_msgs, no_more_packets) => {
@@ -622,6 +635,9 @@ pub fn update(&mut self, message: Message) -> Task<Message> {
let _ = tx.send(());
}
}
Message::TrafficPreview(preview) => {
self.traffic_previews.refresh(preview);
}
}
Task::none()
}

View File

@@ -5,6 +5,7 @@
use crate::gui::pages::types::running_page::RunningPage;
use crate::gui::pages::types::settings_page::SettingsPage;
use crate::gui::styles::types::gradient_type::GradientType;
use crate::networking::traffic_preview::TrafficPreview;
use crate::networking::types::capture_context::CaptureSourcePicklist;
use crate::networking::types::data_representation::DataRepr;
use crate::networking::types::host::{Host, HostMessage};
@@ -145,4 +146,6 @@ pub enum Message {
RemoteNotificationsUrl(String),
/// Pause or resume live capture
Freeze,
/// Traffic preview
TrafficPreview(TrafficPreview),
}

View File

@@ -507,7 +507,7 @@ pub(super) fn packet_stream(
tx: &std::sync::mpsc::SyncSender<(Result<PacketOwned, pcap::Error>, Option<pcap::Stat>)>,
freeze_rx: &mut Receiver<()>,
filters: &Filters,
dev_info: Option<DevInfo>,
dev_info: Option<&DevInfo>,
) {
loop {
// check if we need to freeze the parsing
@@ -524,7 +524,7 @@ pub(super) fn packet_stream(
let packet_owned = packet_res.map(|p| PacketOwned {
header: *p.header,
data: p.data.into(),
dev_info: dev_info.clone(),
dev_info: dev_info.cloned(),
});
if tx.send((packet_owned, cap.stats().ok())).is_err() {
return;

View File

@@ -5,11 +5,35 @@
use crate::networking::types::my_device::MyDevice;
use crate::networking::types::my_link_type::MyLinkType;
use crate::utils::error_logger::{ErrorLogger, Location};
use async_channel::Sender;
use pcap::Device;
use std::collections::{HashMap, VecDeque};
use std::thread;
use std::time::{Duration, Instant};
use tokio::sync::broadcast::Receiver;
pub fn traffic_preview() {
#[derive(Default, Debug, Clone)]
pub struct TrafficPreview {
pub data: HashMap<String, u128>,
}
#[derive(Default, Debug, Clone)]
pub struct TrafficPreviews {
pub data: HashMap<String, VecDeque<u128>>,
}
impl TrafficPreviews {
pub fn refresh(&mut self, msg: TrafficPreview) {
for (dev, pkts) in msg.data {
self.data
.entry(dev)
.and_modify(|v| v.push_back(pkts))
.or_insert(VecDeque::from([pkts]));
}
}
}
pub fn traffic_preview(tx: &Sender<TrafficPreview>, freeze_rxs: (Receiver<()>, Receiver<()>)) {
let (freeze_tx, mut freeze_rx) = tokio::sync::broadcast::channel(1_048_575);
let (pcap_tx, pcap_rx) = std::sync::mpsc::sync_channel(10_000);
for dev in Device::list().unwrap_or_default() {
@@ -28,21 +52,30 @@ pub fn traffic_preview() {
let _ = thread::Builder::new()
.name("thread_device_traffic_preview".to_string())
.spawn(move || {
packet_stream(cap, &pcap_tx, &mut freeze_rx, &Filters::default(), dev_info);
packet_stream(
cap,
&pcap_tx,
&mut freeze_rx,
&Filters::default(),
dev_info.as_ref(),
);
})
.log_err(location!());
}
let mut traffic_preview = TrafficPreview::default();
let mut first_packet_ticks = None;
loop {
// check if we need to freeze the parsing
if freeze_rx.try_recv().is_ok() {
// wait until unfreeze
let _ = freeze_rx.blocking_recv();
// reset the first packet ticks
// first_packet_ticks = Some(Instant::now());
first_packet_ticks = Some(Instant::now());
}
let (packet_res, cap_stats) = pcap_rx
let (packet_res, _) = pcap_rx
.recv_timeout(Duration::from_millis(150))
.unwrap_or((Err(pcap::Error::TimeoutExpired), None));
@@ -50,13 +83,24 @@ pub fn traffic_preview() {
// return;
// }
maybe_send_traffic_preview(&mut traffic_preview, &mut first_packet_ticks, tx);
if let Ok(packet) = packet_res {
let my_link_type = packet.dev_info.as_ref().unwrap().my_link_type;
if get_sniffable_headers(&packet.data, my_link_type).is_some() {
println!(
"Received packet on {}",
packet.dev_info.as_ref().map_or("?", |d| d.name.as_str()),
);
let Some(dev_info) = packet.dev_info else {
continue;
};
if first_packet_ticks.is_none() {
first_packet_ticks = Some(Instant::now());
}
traffic_preview
.data
.entry(dev_info.name)
.and_modify(|p| *p += 1)
.or_insert(1);
}
}
}
@@ -67,3 +111,15 @@ pub(super) struct DevInfo {
name: String,
my_link_type: MyLinkType,
}
fn maybe_send_traffic_preview(
traffic_preview: &mut TrafficPreview,
first_packet_ticks: &mut Option<Instant>,
tx: &Sender<TrafficPreview>,
) {
if first_packet_ticks.is_some_and(|i| i.elapsed() >= Duration::from_millis(1000)) {
*first_packet_ticks =
first_packet_ticks.and_then(|i| i.checked_add(Duration::from_millis(1000)));
let _ = tx.send_blocking(std::mem::take(traffic_preview));
}
}

View File

@@ -26,7 +26,7 @@ pub struct InfoTraffic {
}
impl InfoTraffic {
pub fn refresh(&mut self, msg: &mut InfoTraffic) {
pub fn refresh(&mut self, msg: &mut Self) {
self.tot_data_info.refresh(msg.tot_data_info);
self.dropped_packets = msg.dropped_packets;