restructuring backend for traffic previews (WIP)

This commit is contained in:
GyulyVGC
2025-12-18 01:10:48 +01:00
parent 36b63021e5
commit 71d0cb4b77
6 changed files with 77 additions and 87 deletions

View File

@@ -50,7 +50,7 @@ pub fn update_charts_data(&mut self, packets: u128) {
}
pub fn view(&self) -> Element<'_, Message, StyleType> {
Column::new().height(50).push(ChartWidget::new(self)).into()
Column::new().height(40).push(ChartWidget::new(self)).into()
}
pub fn change_style(&mut self, style: StyleType) {

View File

@@ -4,7 +4,6 @@
use std::fmt::Write;
use crate::chart::types::preview_chart::PreviewChart;
use crate::gui::components::button::button_open_file;
use crate::gui::sniffer::Sniffer;
use crate::gui::styles::button::ButtonType;
@@ -178,7 +177,8 @@ fn get_col_adapter(
language: Language,
) -> Column<'_, Message, StyleType> {
let mut dev_str_list = vec![];
for my_dev in &sniffer.my_devices {
// TODO: do not iterate here
for (_, (my_dev, _)) in &sniffer.preview_charts {
let mut title = String::new();
#[allow(unused_mut)]
let mut subtitle: Option<&String> = None;
@@ -233,7 +233,7 @@ fn get_col_adapter(
} else {
Some(Text::new(addrs).font(font))
};
let preview_chart = sniffer.preview_charts.get(name);
let my_device_chart = sniffer.preview_charts.get(name);
scroll_adapters.push(
Button::new(
Column::new()
@@ -246,7 +246,7 @@ fn get_col_adapter(
)
.push_maybe(subtitle.map(|sub| Text::new(sub).font(font)))
.push_maybe(addrs_text)
.push_maybe(preview_chart.map(PreviewChart::view)),
.push_maybe(my_device_chart.map(|(_, c)| c.view())),
)
.padding([20, 30])
.width(Length::Fill)

View File

@@ -9,7 +9,6 @@
use iced::widget::Column;
use iced::window::{Id, Level};
use iced::{Element, Point, Size, Subscription, Task, window};
use pcap::Device;
use rfd::FileHandle;
use std::collections::{HashMap, HashSet, VecDeque};
use std::net::IpAddr;
@@ -96,8 +95,6 @@ pub struct Sniffer {
pub newer_release_available: Option<bool>,
/// Network device to be analyzed, or PCAP file to be imported
pub capture_source: CaptureSource,
/// List of network devices
pub my_devices: Vec<MyDevice>,
/// Signals if a pcap error occurred
pub pcap_error: Option<String>,
/// Messages status
@@ -105,7 +102,8 @@ pub struct Sniffer {
/// Traffic chart displayed in the Overview page
pub traffic_chart: TrafficChart,
/// Traffic preview charts displayed in the initial page
pub preview_charts: HashMap<String, PreviewChart>,
// TODO: make this a Vec<(MyDevice, PreviewChart)> to keep the order of the devices consistent
pub preview_charts: HashMap<String, (MyDevice, PreviewChart)>,
/// Currently displayed modal; None if no modal is displayed
pub modal: Option<MyModal>,
/// Currently displayed settings page; None if settings is closed
@@ -157,7 +155,6 @@ pub fn new(conf: Conf) -> Self {
logged_notifications: (VecDeque::new(), 0),
newer_release_available: None,
capture_source,
my_devices: Vec::new(),
pcap_error: None,
dots_pulse: (".".to_string(), 0),
traffic_chart: TrafficChart::new(style, language, data_repr),
@@ -603,7 +600,7 @@ pub fn update(&mut self, message: Message) -> Task<Message> {
}
Message::Periodic => {
self.update_waiting_dots();
self.fetch_devices();
self.capture_source.set_addresses();
self.update_threshold();
}
Message::ExpandNotification(id, expand) => {
@@ -637,16 +634,18 @@ pub fn update(&mut self, message: Message) -> Task<Message> {
}
}
Message::TrafficPreview(msg) => {
// TODO: remove or hide inactive devices
for (dev, packets) in msg.data {
self.preview_charts
.entry(dev)
.and_modify(|chart| {
.entry(dev.get_name().clone())
.and_modify(|(my_dev, chart)| {
*my_dev = dev.clone();
chart.update_charts_data(packets);
})
.or_insert({
let mut chart = PreviewChart::new(self.conf.settings.style);
chart.update_charts_data(packets);
chart
(dev, chart)
});
}
}
@@ -911,7 +910,7 @@ fn reset(&mut self) {
}
fn set_device(&mut self, name: &str) {
for my_dev in &self.my_devices {
for (_, (my_dev, _)) in &self.preview_charts {
if my_dev.get_name().eq(&name) {
self.conf.device.device_name = name.to_string();
self.capture_source = CaptureSource::Device(my_dev.clone());
@@ -920,15 +919,6 @@ fn set_device(&mut self, name: &str) {
}
}
fn fetch_devices(&mut self) {
self.my_devices.clear();
self.capture_source.set_addresses();
for dev in Device::list().log_err(location!()).unwrap_or_default() {
let my_dev = MyDevice::from_pcap_device(dev);
self.my_devices.push(my_dev);
}
}
fn update_waiting_dots(&mut self) {
if self.dots_pulse.0.len() > 2 {
self.dots_pulse.0 = String::new();
@@ -1187,7 +1177,7 @@ pub fn is_capture_source_consistent(&self) -> bool {
fn change_charts_style(&mut self) {
let style = self.conf.settings.style;
self.traffic_chart.change_style(style);
for chart in self.preview_charts.values_mut() {
for (_, chart) in self.preview_charts.values_mut() {
chart.change_style(style);
}
}

View File

@@ -1,30 +1,82 @@
use crate::gui::types::filters::Filters;
use crate::location;
use crate::networking::parse_packets::{get_sniffable_headers, packet_stream};
use crate::networking::parse_packets::{PacketOwned, get_sniffable_headers, packet_stream};
use crate::networking::types::capture_context::{CaptureContext, CaptureSource};
use crate::networking::types::my_device::MyDevice;
use crate::networking::types::my_link_type::MyLinkType;
use crate::utils::error_logger::{ErrorLogger, Location};
use async_channel::Sender;
use pcap::Device;
use pcap::{Device, Stat};
use std::collections::HashMap;
use std::thread;
use std::time::{Duration, Instant};
use tokio::sync::broadcast::Receiver;
#[derive(Default, Debug, Clone)]
#[derive(Default, Clone, Debug)]
pub struct TrafficPreview {
pub data: HashMap<String, u128>,
pub data: Vec<(MyDevice, u128)>,
}
pub fn traffic_preview(tx: &Sender<TrafficPreview>, freeze_rxs: (Receiver<()>, Receiver<()>)) {
let mut ticks = Instant::now();
let (freeze_tx, mut freeze_rx) = tokio::sync::broadcast::channel(1_048_575);
let (pcap_tx, pcap_rx) = std::sync::mpsc::sync_channel(10_000);
let mut data = HashMap::new();
handle_devices_and_previews(&mut data, tx, freeze_tx.clone(), pcap_tx.clone());
loop {
// check if we need to freeze the parsing
if freeze_rx.try_recv().is_ok() {
// wait until unfreeze
let _ = freeze_rx.blocking_recv();
// reset the first packet ticks
// first_packet_ticks = Instant::now();
}
let (packet_res, _) = pcap_rx
.recv_timeout(Duration::from_millis(150))
.unwrap_or((Err(pcap::Error::TimeoutExpired), None));
if ticks.elapsed() >= Duration::from_millis(1000) {
ticks = ticks
.checked_add(Duration::from_millis(1000))
.unwrap_or(Instant::now());
handle_devices_and_previews(&mut data, tx, freeze_tx.clone(), pcap_tx.clone());
}
if let Ok(packet) = packet_res {
let my_link_type = packet.dev_info.as_ref().unwrap().my_link_type;
if get_sniffable_headers(&packet.data, my_link_type).is_some() {
let Some(dev_info) = packet.dev_info else {
continue;
};
data.entry(dev_info.name)
.and_modify(|p| *p += 1)
.or_insert(1);
}
}
}
}
fn handle_devices_and_previews(
data: &mut HashMap<String, u128>,
tx: &Sender<TrafficPreview>,
freeze_tx: tokio::sync::broadcast::Sender<()>,
pcap_tx: std::sync::mpsc::SyncSender<(Result<PacketOwned, pcap::Error>, Option<Stat>)>,
) {
let mut traffic_preview = TrafficPreview::default();
for dev in Device::list().unwrap_or_default() {
let mut freeze_rx = freeze_tx.subscribe();
let pcap_tx = pcap_tx.clone();
let dev_name = dev.name.clone();
let my_dev = MyDevice::from_pcap_device(dev);
if let Some(n) = data.get(&dev_name) {
traffic_preview.data.push((my_dev, *n));
continue;
}
data.insert(dev_name.clone(), 0);
let mut freeze_rx = freeze_tx.subscribe();
let pcap_tx = pcap_tx.clone();
let capture_source = CaptureSource::Device(my_dev);
let capture_context = CaptureContext::new(&capture_source, None, &Filters::default());
let my_link_type = capture_context.my_link_type();
@@ -46,48 +98,8 @@ pub fn traffic_preview(tx: &Sender<TrafficPreview>, freeze_rxs: (Receiver<()>, R
})
.log_err(location!());
}
let mut traffic_preview = TrafficPreview::default();
let mut first_packet_ticks = None;
loop {
// check if we need to freeze the parsing
if freeze_rx.try_recv().is_ok() {
// wait until unfreeze
let _ = freeze_rx.blocking_recv();
// reset the first packet ticks
first_packet_ticks = Some(Instant::now());
}
let (packet_res, _) = pcap_rx
.recv_timeout(Duration::from_millis(150))
.unwrap_or((Err(pcap::Error::TimeoutExpired), None));
// if tx.is_closed() {
// return;
// }
maybe_send_traffic_preview(&mut traffic_preview, &mut first_packet_ticks, tx);
if let Ok(packet) = packet_res {
let my_link_type = packet.dev_info.as_ref().unwrap().my_link_type;
if get_sniffable_headers(&packet.data, my_link_type).is_some() {
let Some(dev_info) = packet.dev_info else {
continue;
};
if first_packet_ticks.is_none() {
first_packet_ticks = Some(Instant::now());
}
traffic_preview
.data
.entry(dev_info.name)
.and_modify(|p| *p += 1)
.or_insert(1);
}
}
}
let _ = tx.send_blocking(traffic_preview);
data.iter_mut().for_each(|(_, v)| *v = 0);
}
#[derive(Clone)]
@@ -95,15 +107,3 @@ pub(super) struct DevInfo {
name: String,
my_link_type: MyLinkType,
}
fn maybe_send_traffic_preview(
traffic_preview: &mut TrafficPreview,
first_packet_ticks: &mut Option<Instant>,
tx: &Sender<TrafficPreview>,
) {
if first_packet_ticks.is_some_and(|i| i.elapsed() >= Duration::from_millis(1000)) {
*first_packet_ticks =
first_packet_ticks.and_then(|i| i.checked_add(Duration::from_millis(1000)));
let _ = tx.send_blocking(std::mem::take(traffic_preview));
}
}

View File

@@ -5,7 +5,7 @@
/// Represents the current inspected device.
/// Used to keep in sync the device addresses in case of changes
/// (e.g., device not connected to the internet acquires new IP address)
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct MyDevice {
name: String,
desc: Option<String>,

View File

@@ -4,7 +4,7 @@
use crate::translations::translations_3::link_type_translation;
/// Currently supported link types
#[derive(Copy, Clone, Default)]
#[derive(Copy, Clone, Default, Debug)]
pub enum MyLinkType {
Null(Linktype),
Ethernet(Linktype),