use asynchronous channel to update app state from backend (WIP)

This commit is contained in:
GyulyVGC
2025-05-13 00:59:30 +02:00
parent 0043ba5e39
commit da54f97d45
19 changed files with 340 additions and 205 deletions

1
Cargo.lock generated
View File

@@ -4690,6 +4690,7 @@ dependencies = [
name = "sniffnet"
version = "1.3.2"
dependencies = [
"async-channel",
"chrono",
"clap",
"confy",

View File

@@ -56,6 +56,7 @@ phf_shared = "0.11.3"
splines = "4.4.2"
clap = { version = "4.5.37", features = ["derive"] }
tokio = "1.44.2"
async-channel = "2.3.1"
[target.'cfg(windows)'.dependencies]
gag = "1.0.0"

View File

@@ -65,22 +65,18 @@ fn page_content<'a>(sniffer: &Sniffer, key: &AddressPortPair) -> Container<'a, M
let font = style.get_extension().font;
let font_headers = style.get_extension().font_headers;
let info_traffic_lock = sniffer.info_traffic.lock().unwrap();
let val = info_traffic_lock
let info_traffic = &sniffer.info_traffic;
let val = info_traffic
.map
.get(key)
.unwrap_or(&InfoAddressPortPair::default())
.clone();
let address_to_lookup = get_address_to_lookup(key, val.traffic_direction);
let host_option = info_traffic_lock
.addresses_resolved
.get(&address_to_lookup)
.cloned();
let host_info_option = info_traffic_lock
let host_option = sniffer.addresses_resolved.get(&address_to_lookup).cloned();
let host_info_option = info_traffic
.hosts
.get(&host_option.clone().unwrap_or_default().1)
.copied();
drop(info_traffic_lock);
let header_and_content = Column::new().width(Length::Fill).push(page_header(
font,

View File

@@ -98,7 +98,7 @@ pub fn overview_page(sniffer: &Sniffer) -> Container<Message, StyleType> {
move |_| lazy_col_info(sniffer),
);
let num_favorites = sniffer.info_traffic.lock().unwrap().favorite_hosts.len();
let num_favorites = sniffer.info_traffic.favorite_hosts.len();
let container_report = lazy(
(
filtered,

View File

@@ -1,6 +1,5 @@
use std::cmp::min;
use std::net::IpAddr;
use std::sync::Mutex;
use iced::widget::{Column, Container, Row, Rule, Space, Text, lazy, vertical_space};
use iced::{Alignment, Font, Length};
@@ -40,7 +39,7 @@ pub fn thumbnail_page(sniffer: &Sniffer) -> Container<Message, StyleType> {
.align_x(Alignment::Center);
}
let info_traffic = sniffer.info_traffic.clone();
let info_traffic = &sniffer.info_traffic;
let chart_type = sniffer.traffic_chart.chart_type;
let lazy_report = lazy(filtered, move |_| {
@@ -48,9 +47,9 @@ pub fn thumbnail_page(sniffer: &Sniffer) -> Container<Message, StyleType> {
.padding([5, 0])
.height(Length::Fill)
.align_y(Alignment::Start)
.push(host_col(&info_traffic, chart_type, font))
.push(host_col(info_traffic, chart_type, font))
.push(Rule::vertical(10))
.push(service_col(&info_traffic, chart_type, font))
.push(service_col(info_traffic, chart_type, font))
});
let content = Column::new()
@@ -61,7 +60,7 @@ pub fn thumbnail_page(sniffer: &Sniffer) -> Container<Message, StyleType> {
}
fn host_col<'a>(
info_traffic: &Mutex<InfoTraffic>,
info_traffic: &InfoTraffic,
chart_type: ChartType,
font: Font,
) -> Column<'a, Message, StyleType> {
@@ -103,7 +102,7 @@ fn host_col<'a>(
}
fn service_col<'a>(
info_traffic: &Mutex<InfoTraffic>,
info_traffic: &InfoTraffic,
chart_type: ChartType,
font: Font,
) -> Column<'a, Message, StyleType> {

View File

@@ -1,6 +1,7 @@
//! Module defining the application structure: messages, updates, subscriptions.
use std::collections::{HashSet, VecDeque};
use std::collections::{HashMap, HashSet, VecDeque};
use std::net::IpAddr;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::thread;
@@ -45,8 +46,9 @@
use crate::mmdb::country::COUNTRY_MMDB;
use crate::mmdb::types::mmdb_reader::{MmdbReader, MmdbReaders};
use crate::networking::types::capture_context::{CaptureContext, CaptureSource, MyPcapImport};
use crate::networking::types::data_info_host::DataInfoHost;
use crate::networking::types::filters::Filters;
use crate::networking::types::host::Host;
use crate::networking::types::host::{Host, NewHostMessage};
use crate::networking::types::host_data_states::HostDataStates;
use crate::networking::types::ip_collection::AddressCollection;
use crate::networking::types::my_device::MyDevice;
@@ -82,7 +84,9 @@ pub struct Sniffer {
/// Capture number, incremented at every new run
pub current_capture_id: Arc<Mutex<usize>>,
/// Capture data updated by thread parsing packets
pub info_traffic: Arc<Mutex<InfoTraffic>>,
pub info_traffic: InfoTraffic,
/// Map of the resolved addresses with their full rDNS value and the corresponding host
pub addresses_resolved: HashMap<IpAddr, (String, Host)>,
/// Reports if a newer release of the software is available on GitHub
pub newer_release_available: Option<bool>,
/// Traffic data displayed in GUI
@@ -146,7 +150,8 @@ pub fn new(configs: &Arc<Mutex<Configs>>) -> Self {
Self {
configs: configs.clone(),
current_capture_id: Arc::new(Mutex::new(0)),
info_traffic: Arc::new(Mutex::new(InfoTraffic::new())),
info_traffic: InfoTraffic::default(),
addresses_resolved: HashMap::new(),
newer_release_available: None,
runtime_data: RunTimeData::new(),
capture_source: CaptureSource::Device(device),
@@ -240,7 +245,7 @@ fn time_subscription(&self) -> Subscription<Message> {
if self.running_page.eq(&RunningPage::Init) {
iced::time::every(Duration::from_millis(PERIOD_TICK)).map(|_| Message::TickInit)
} else {
iced::time::every(Duration::from_millis(PERIOD_TICK)).map(|_| Message::TickRun)
Subscription::none()
}
}
@@ -262,7 +267,7 @@ fn check_updates_stream() -> impl Stream<Item = Message> {
pub fn update(&mut self, message: Message) -> Task<Message> {
match message {
Message::TickRun => return self.refresh_data(),
Message::TickRun(info_traffic) => return self.refresh_data(info_traffic),
Message::AdapterSelection(name) => self.set_adapter(&name),
Message::IpVersionSelection(version, insert) => {
if insert {
@@ -296,7 +301,7 @@ pub fn update(&mut self, message: Message) -> Task<Message> {
self.report_sort_type = sort;
}
Message::OpenWebPage(web_page) => Self::open_web(&web_page),
Message::Start => self.start(),
Message::Start => return self.start(),
Message::Reset => return self.reset(),
Message::Style(style) => {
self.configs.lock().unwrap().settings.style = style;
@@ -370,11 +375,10 @@ pub fn update(&mut self, message: Message) -> Task<Message> {
Message::CtrlDPressed => return self.shortcut_ctrl_d(),
Message::Search(parameters) => {
// update comboboxes
let mut host_data = self.host_data_states.data.lock().unwrap();
let host_data = &mut self.host_data_states.data;
host_data.countries.1 = self.search.country != parameters.country;
host_data.asns.1 = self.search.as_name != parameters.as_name;
host_data.domains.1 = self.search.domain != parameters.domain;
drop(host_data);
self.host_data_states.update_states(&parameters);
self.page_number = 1;
@@ -549,6 +553,7 @@ pub fn update(&mut self, message: Message) -> Task<Message> {
self.capture_source = CaptureSource::File(MyPcapImport::new(path));
}
}
Message::NewHost(host_msg) => self.handle_new_host(host_msg),
Message::TickInit => {}
}
Task::none()
@@ -650,28 +655,26 @@ pub fn scale_factor(&self) -> f64 {
self.configs.lock().unwrap().settings.scale_factor
}
fn refresh_data(&mut self) -> Task<Message> {
let info_traffic_lock = self.info_traffic.lock().unwrap();
let timestamp = info_traffic_lock.latest_packet_timestamp;
self.runtime_data.all_packets = info_traffic_lock.all_packets;
if info_traffic_lock.tot_in_packets + info_traffic_lock.tot_out_packets == 0 {
drop(info_traffic_lock);
fn refresh_data(&mut self, info_traffic: InfoTraffic) -> Task<Message> {
self.info_traffic.refresh(info_traffic);
let info_traffic = &self.info_traffic;
self.runtime_data.all_packets = info_traffic.all_packets;
if info_traffic.tot_in_packets + info_traffic.tot_out_packets == 0 {
return self.update(Message::Waiting);
}
self.runtime_data.tot_out_packets = info_traffic_lock.tot_out_packets;
self.runtime_data.tot_in_packets = info_traffic_lock.tot_in_packets;
self.runtime_data.all_bytes = info_traffic_lock.all_bytes;
self.runtime_data.tot_in_bytes = info_traffic_lock.tot_in_bytes;
self.runtime_data.tot_out_bytes = info_traffic_lock.tot_out_bytes;
self.runtime_data.dropped_packets = info_traffic_lock.dropped_packets;
drop(info_traffic_lock);
// todo: remove runtime_data
self.runtime_data.tot_out_packets = info_traffic.tot_out_packets;
self.runtime_data.tot_in_packets = info_traffic.tot_in_packets;
self.runtime_data.all_bytes = info_traffic.all_bytes;
self.runtime_data.tot_in_bytes = info_traffic.tot_in_bytes;
self.runtime_data.tot_out_bytes = info_traffic.tot_out_bytes;
self.runtime_data.dropped_packets = info_traffic.dropped_packets;
let emitted_notifications = notify_and_log(
&mut self.runtime_data,
self.configs.lock().unwrap().settings.notifications,
&self.info_traffic.clone(),
timestamp,
info_traffic,
);
self.info_traffic.lock().unwrap().favorites_last_interval = HashSet::new();
self.info_traffic.favorites_last_interval = HashSet::new();
self.runtime_data.tot_emitted_notifications += emitted_notifications;
if self.thumbnail || self.running_page.ne(&RunningPage::Notifications) {
self.unread_notifications += emitted_notifications;
@@ -718,7 +721,7 @@ fn open_web(web_page: &WebPage) {
child.wait().unwrap_or_default();
}
fn start(&mut self) {
fn start(&mut self) -> Task<Message> {
if matches!(&self.capture_source, CaptureSource::Device(_)) {
let current_device_name = &self.capture_source.get_name();
self.set_adapter(current_device_name);
@@ -727,8 +730,7 @@ fn start(&mut self) {
let pcap_path = self.export_pcap.full_path();
let capture_context = CaptureContext::new(&capture_source, pcap_path.as_ref());
self.pcap_error = capture_context.error().map(ToString::to_string);
let info_traffic_mutex = self.info_traffic.clone();
*info_traffic_mutex.lock().unwrap() = InfoTraffic::new();
self.info_traffic = InfoTraffic::default();
self.runtime_data = RunTimeData::new();
let ConfigSettings {
style, language, ..
@@ -741,9 +743,9 @@ fn start(&mut self) {
let current_capture_id = self.current_capture_id.clone();
let filters = self.filters.clone();
let mmdb_readers = self.mmdb_readers.clone();
let host_data = self.host_data_states.data.clone();
self.capture_source
.set_link_type(capture_context.my_link_type());
let (tx, rx) = async_channel::unbounded();
let _ = thread::Builder::new()
.name("thread_parse_packets".to_string())
.spawn(move || {
@@ -751,14 +753,15 @@ fn start(&mut self) {
&current_capture_id,
&capture_source,
&filters,
&info_traffic_mutex,
&mmdb_readers,
capture_context,
&host_data,
&tx,
);
})
.log_err(location!());
return Task::stream(rx);
}
Task::none()
}
fn reset(&mut self) -> Task<Message> {
@@ -797,7 +800,7 @@ fn update_waiting_dots(&mut self) {
}
fn add_or_remove_favorite(&mut self, host: &Host, add: bool) {
let mut info_traffic = self.info_traffic.lock().unwrap();
let info_traffic = &mut self.info_traffic;
if add {
info_traffic.favorite_hosts.insert(host.clone());
} else {
@@ -806,7 +809,6 @@ fn add_or_remove_favorite(&mut self, host: &Host, add: bool) {
if let Some(host_info) = info_traffic.hosts.get_mut(host) {
host_info.is_favorite = add;
}
drop(info_traffic);
}
fn close_settings(&mut self) {
@@ -916,9 +918,7 @@ fn shortcut_esc(&mut self) -> Task<Message> {
// also called when the backspace shortcut is pressed
fn reset_button_pressed(&mut self) -> Task<Message> {
if self.running_page.ne(&RunningPage::Init) {
return if self.info_traffic.lock().unwrap().all_packets == 0
&& self.settings_page.is_none()
{
return if self.info_traffic.all_packets == 0 && self.settings_page.is_none() {
self.update(Message::Reset)
} else {
self.update(Message::ShowModal(MyModal::Reset))
@@ -928,9 +928,7 @@ fn reset_button_pressed(&mut self) -> Task<Message> {
}
fn quit_wrapper(&mut self) -> Task<Message> {
if self.running_page.eq(&RunningPage::Init)
|| self.info_traffic.lock().unwrap().all_packets == 0
{
if self.running_page.eq(&RunningPage::Init) || self.info_traffic.all_packets == 0 {
self.update(Message::Quit)
} else if self.thumbnail {
// TODO: uncomment once issue #653 is fixed
@@ -981,6 +979,46 @@ async fn open_file(old_file: String, file_info: FileInfo, language: Language) ->
picked.path().to_string_lossy().to_string()
}
fn handle_new_host(&mut self, host_msg: NewHostMessage) {
let NewHostMessage {
host,
other_data,
is_loopback,
is_local,
is_bogon,
traffic_type,
address_to_lookup,
rdns,
} = host_msg;
self.info_traffic
.hosts
.entry(host.clone())
.and_modify(|data_info_host| {
data_info_host.data_info += other_data;
})
.or_insert_with(|| DataInfoHost {
data_info: other_data,
is_favorite: false,
is_loopback,
is_local,
is_bogon,
traffic_type,
});
self.addresses_resolved
.insert(address_to_lookup, (rdns, host.clone()));
// update host data states including the new host
self.host_data_states.data.update(&host);
// check if the newly resolved host was featured in the favorites (possible in case of already existing host)
// todo: check if this is the correct and only place to do this
if self.info_traffic.favorite_hosts.contains(&host) {
self.info_traffic.favorites_last_interval.insert(host);
}
}
}
#[cfg(test)]

View File

@@ -5,7 +5,8 @@
use crate::gui::pages::types::running_page::RunningPage;
use crate::gui::pages::types::settings_page::SettingsPage;
use crate::gui::styles::types::gradient_type::GradientType;
use crate::networking::types::host::Host;
use crate::networking::types::host::{Host, NewHostMessage};
use crate::networking::types::info_traffic::InfoTraffic;
use crate::notifications::types::notifications::Notification;
use crate::report::types::search_parameters::SearchParameters;
use crate::report::types::sort_type::SortType;
@@ -16,10 +17,10 @@
#[derive(Debug, Clone)]
/// Messages types that permit reacting to application interactions/subscriptions
pub enum Message {
/// Every 5 seconds
/// Every 1 second on initial page
TickInit,
/// Every 1 second
TickRun,
/// Sent by the backend parsing packets
TickRun(InfoTraffic),
/// Select adapter
AdapterSelection(String),
/// Select IP filter
@@ -130,4 +131,6 @@ pub enum Message {
SetNewerReleaseStatus(Option<bool>),
/// Set the pcap import path
SetPcapImport(String),
/// A new host has been resolved
NewHost(NewHostMessage),
}

View File

@@ -1,11 +1,12 @@
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use std::sync::Mutex;
use std::sync::{Arc, Mutex};
use dns_lookup::lookup_addr;
use etherparse::{EtherType, LaxPacketHeaders, LinkHeader, NetHeaders, TransportHeader};
use pcap::{Address, Device};
use crate::gui::types::message::Message;
use crate::mmdb::asn::get_asn;
use crate::mmdb::country::get_country;
use crate::mmdb::types::mmdb_reader::MmdbReaders;
@@ -13,9 +14,7 @@
use crate::networking::types::arp_type::ArpType;
use crate::networking::types::bogon::is_bogon;
use crate::networking::types::capture_context::CaptureSource;
use crate::networking::types::data_info_host::DataInfoHost;
use crate::networking::types::host::Host;
use crate::networking::types::host_data_states::HostData;
use crate::networking::types::host::{Host, NewHostMessage};
use crate::networking::types::icmp_type::{IcmpType, IcmpTypeV4, IcmpTypeV6};
use crate::networking::types::info_address_port_pair::InfoAddressPortPair;
use crate::networking::types::packet_filters_fields::PacketFiltersFields;
@@ -23,9 +22,11 @@
use crate::networking::types::service_query::ServiceQuery;
use crate::networking::types::traffic_direction::TrafficDirection;
use crate::networking::types::traffic_type::TrafficType;
use crate::secondary_threads::parse_packets::AddressesResolutionState;
use crate::utils::error_logger::{ErrorLogger, Location};
use crate::utils::formatted_strings::get_domain_from_r_dns;
use crate::{InfoTraffic, IpVersion, Protocol, location};
use async_channel::Sender;
use std::fmt::Write;
include!(concat!(env!("OUT_DIR"), "/services.rs"));
@@ -258,7 +259,7 @@ pub fn get_service(
/// Function to insert the source and destination of a packet into the shared map containing the analyzed traffic.
pub fn modify_or_insert_in_map(
info_traffic_mutex: &Mutex<InfoTraffic>,
info_traffic: &mut InfoTraffic,
key: &AddressPortPair,
cs: &CaptureSource,
mac_addresses: (Option<String>, Option<String>),
@@ -269,7 +270,8 @@ pub fn modify_or_insert_in_map(
let mut traffic_direction = TrafficDirection::default();
let mut service = Service::Unknown;
if !info_traffic_mutex.lock().unwrap().map.contains_key(key) {
// todo: use a full map...
if !info_traffic.map.contains_key(key) {
// first occurrence of key
// update device addresses
@@ -299,9 +301,7 @@ pub fn modify_or_insert_in_map(
service = get_service(key, traffic_direction, &my_interface_addresses);
}
let mut info_traffic = info_traffic_mutex.lock().unwrap();
let timestamp = info_traffic.latest_packet_timestamp;
let timestamp = info_traffic.last_packet_timestamp;
let new_info: InfoAddressPortPair = info_traffic
.map
.entry(*key)
@@ -344,26 +344,27 @@ pub fn modify_or_insert_in_map(
})
.clone();
if let Some(host_info) = info_traffic
.addresses_resolved
.get(&get_address_to_lookup(key, new_info.traffic_direction))
.cloned()
{
if info_traffic.favorite_hosts.contains(&host_info.1) {
info_traffic.favorites_last_interval.insert(host_info.1);
}
}
// todo!
// if let Some(host_info) = info_traffic
// .addresses_resolved
// .get(&get_address_to_lookup(key, new_info.traffic_direction))
// .cloned()
// {
// if info_traffic.favorite_hosts.contains(&host_info.1) {
// info_traffic.favorites_last_interval.insert(host_info.1);
// }
// }
new_info
}
pub fn reverse_dns_lookup(
info_traffic: &Mutex<InfoTraffic>,
tx: &Sender<Message>,
resolutions_state: &Arc<Mutex<AddressesResolutionState>>,
key: &AddressPortPair,
traffic_direction: TrafficDirection,
cs: &CaptureSource,
mmdb_readers: &MmdbReaders,
host_data: &Mutex<HostData>,
) {
let address_to_lookup = get_address_to_lookup(key, traffic_direction);
let my_interface_addresses = cs.get_addresses().lock().unwrap().clone();
@@ -382,7 +383,7 @@ pub fn reverse_dns_lookup(
let is_bogon = is_bogon(&address_to_lookup);
let country = get_country(&address_to_lookup, &mmdb_readers.country);
let asn = get_asn(&address_to_lookup, &mmdb_readers.asn);
let r_dns = if let Ok(result) = lookup_result {
let rdns = if let Ok(result) = lookup_result {
if result.is_empty() {
address_to_lookup.to_string()
} else {
@@ -392,45 +393,34 @@ pub fn reverse_dns_lookup(
address_to_lookup.to_string()
};
let new_host = Host {
domain: get_domain_from_r_dns(r_dns.clone()),
domain: get_domain_from_r_dns(rdns.clone()),
asn,
country,
};
let mut info_traffic_lock = info_traffic.lock().unwrap();
// collect the data exchanged from the same address so far and remove the address from the collection of addresses waiting a rDNS
let other_data = info_traffic_lock
let mut resolutions_lock = resolutions_state.lock().unwrap();
let other_data = resolutions_lock
.addresses_waiting_resolution
.remove(&address_to_lookup)
.unwrap_or_default();
// insert the newly resolved host in the collections, with the data it exchanged so far
info_traffic_lock
resolutions_lock
.addresses_resolved
.insert(address_to_lookup, (r_dns, new_host.clone()));
info_traffic_lock
.hosts
.entry(new_host.clone())
.and_modify(|data_info_host| {
data_info_host.data_info += other_data;
})
.or_insert_with(|| DataInfoHost {
data_info: other_data,
is_favorite: false,
is_loopback,
is_local,
is_bogon,
traffic_type,
});
.insert(address_to_lookup, (rdns.clone(), new_host.clone()));
drop(resolutions_lock);
// update host data states including the new host
host_data.lock().unwrap().update(&new_host);
// check if the newly resolved host was featured in the favorites (possible in case of already existing host)
if info_traffic_lock.favorite_hosts.contains(&new_host) {
info_traffic_lock.favorites_last_interval.insert(new_host);
}
drop(info_traffic_lock);
let msg_data = NewHostMessage {
host: new_host,
other_data,
is_loopback,
is_local,
is_bogon,
traffic_type,
address_to_lookup,
rdns,
};
let _ = tx.send_blocking(Message::NewHost(msg_data));
}
/// Returns the traffic direction observed (incoming or outgoing)

View File

@@ -4,7 +4,7 @@
use etherparse::ArpOperation;
use std::fmt::Write;
#[derive(Copy, Clone, PartialEq, Eq, Hash, Default)]
#[derive(Copy, Clone, PartialEq, Eq, Hash, Default, Debug)]
pub enum ArpType {
Request,
Reply,

View File

@@ -11,7 +11,7 @@
/// Amount of exchanged data (packets and bytes) incoming and outgoing, with the timestamp of the latest occurrence
// data fields are private to make them only editable via the provided methods: needed to correctly refresh timestamps
#[derive(Clone, Default, Copy)]
#[derive(Clone, Default, Copy, Debug)]
pub struct DataInfo {
/// Incoming packets
incoming_packets: u128,

View File

@@ -4,7 +4,7 @@
use crate::networking::types::traffic_type::TrafficType;
/// Host-related information.
#[derive(Clone, Copy, Default)]
#[derive(Clone, Copy, Default, Debug)]
pub struct DataInfoHost {
/// Incoming and outgoing packets and bytes
pub data_info: DataInfo,

View File

@@ -1,5 +1,8 @@
use crate::countries::types::country::Country;
use crate::networking::types::asn::Asn;
use crate::networking::types::data_info::DataInfo;
use crate::networking::types::traffic_type::TrafficType;
use std::net::IpAddr;
/// Struct to represent a network host
#[derive(Default, PartialEq, Eq, Hash, Clone, Debug)]
@@ -23,3 +26,15 @@ pub struct ThumbnailHost {
/// Text describing the host in the thumbnail
pub text: String,
}
#[derive(Clone, Debug)]
pub struct NewHostMessage {
pub host: Host,
pub other_data: DataInfo,
pub is_loopback: bool,
pub is_local: bool,
pub is_bogon: Option<&'static str>,
pub traffic_type: TrafficType,
pub address_to_lookup: IpAddr,
pub rdns: String,
}

View File

@@ -1,6 +1,5 @@
use std::collections::BTreeSet;
use std::net::IpAddr;
use std::sync::{Arc, Mutex};
use iced::widget::combo_box;
@@ -13,14 +12,14 @@
/// It also stores combobox states for the host-related filters
#[derive(Default)]
pub struct HostDataStates {
pub data: Arc<Mutex<HostData>>,
pub data: HostData,
pub states: HostStates,
}
impl HostDataStates {
pub fn update_states(&mut self, search: &SearchParameters) {
let states = &mut self.states;
let mut data = self.data.lock().unwrap();
let data = &mut self.data;
if data.domains.1 {
states.domains = combo_box::State::with_selection(

View File

@@ -4,7 +4,7 @@
use etherparse::{Icmpv4Type, Icmpv6Type};
use std::fmt::Write;
#[derive(Copy, Clone, PartialEq, Eq, Hash)]
#[derive(Copy, Clone, PartialEq, Eq, Hash, Debug)]
pub enum IcmpType {
V4(IcmpTypeV4),
V6(IcmpTypeV6),
@@ -47,7 +47,7 @@ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
}
}
#[derive(Copy, Clone, PartialEq, Eq, Hash, Default)]
#[derive(Copy, Clone, PartialEq, Eq, Hash, Default, Debug)]
#[allow(clippy::module_name_repetitions)]
pub enum IcmpTypeV4 {
EchoReply,
@@ -163,7 +163,7 @@ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
}
}
#[derive(Copy, Clone, PartialEq, Eq, Hash, Default)]
#[derive(Copy, Clone, PartialEq, Eq, Hash, Default, Debug)]
#[allow(clippy::module_name_repetitions)]
pub enum IcmpTypeV6 {
DestinationUnreachable,

View File

@@ -11,7 +11,7 @@
/// Struct useful to format the output report file and to keep track of statistics about the sniffed traffic.
///
/// Each `InfoAddressPortPair` struct is associated to a single address:port pair.
#[derive(Clone, Default)]
#[derive(Clone, Default, Debug)]
pub struct InfoAddressPortPair {
/// Source MAC address
pub mac_address1: Option<String>,
@@ -34,3 +34,23 @@ pub struct InfoAddressPortPair {
/// Types of the ARP operations, with the relative count (this is empty if not ARP)
pub arp_types: HashMap<ArpType, usize>,
}
impl InfoAddressPortPair {
pub fn refresh(&mut self, other: &Self) {
self.transmitted_bytes += other.transmitted_bytes;
self.transmitted_packets += other.transmitted_packets;
self.final_timestamp = other.final_timestamp;
for (icmp_type, count) in &other.icmp_types {
self.icmp_types
.entry(*icmp_type)
.and_modify(|v| *v += count)
.or_insert(*count);
}
for (arp_type, count) in &other.arp_types {
self.arp_types
.entry(*arp_type)
.and_modify(|v| *v += count)
.or_insert(*count);
}
}
}

View File

@@ -9,9 +9,9 @@
use crate::networking::types::info_address_port_pair::InfoAddressPortPair;
use crate::networking::types::traffic_direction::TrafficDirection;
use std::collections::{HashMap, HashSet};
use std::net::IpAddr;
/// Struct to be shared between the threads in charge of parsing packets and update reports.
#[derive(Debug, Clone, Default)]
pub struct InfoTraffic {
/// Total amount of filtered bytes received.
pub tot_in_bytes: u128,
@@ -27,46 +27,23 @@ pub struct InfoTraffic {
pub all_bytes: u128,
/// Number of dropped packets
pub dropped_packets: u32,
/// Timestamp of the latest packet
pub last_packet_timestamp: i64,
/// Map of the filtered traffic
pub map: HashMap<AddressPortPair, InfoAddressPortPair>,
/// Map of the upper layer services with their data info
pub services: HashMap<Service, DataInfo>,
/// Map of the hosts with their data info
pub hosts: HashMap<Host, DataInfoHost>,
// todo!!!
/// Collection of the favorite hosts
pub favorite_hosts: HashSet<Host>,
/// Collection of favorite hosts that exchanged data in the last interval
pub favorites_last_interval: HashSet<Host>,
/// Map of the upper layer services with their data info
pub services: HashMap<Service, DataInfo>,
/// Map of the addresses waiting for a rDNS resolution; used to NOT send multiple rDNS for the same address
pub addresses_waiting_resolution: HashMap<IpAddr, DataInfo>,
/// Map of the resolved addresses with their full rDNS value and the corresponding host
pub addresses_resolved: HashMap<IpAddr, (String, Host)>,
/// Map of the hosts with their data info
pub hosts: HashMap<Host, DataInfoHost>,
/// Timestamp of the latest packet
pub latest_packet_timestamp: i64,
}
impl InfoTraffic {
/// Constructs a new `InfoTraffic` element.
pub fn new() -> Self {
InfoTraffic {
tot_in_bytes: 0,
tot_out_bytes: 0,
tot_in_packets: 0,
tot_out_packets: 0,
all_packets: 0,
all_bytes: 0,
dropped_packets: 0,
map: HashMap::new(),
favorite_hosts: HashSet::new(),
favorites_last_interval: HashSet::new(),
services: HashMap::new(),
addresses_waiting_resolution: HashMap::new(),
addresses_resolved: HashMap::new(),
hosts: HashMap::new(),
latest_packet_timestamp: 0,
}
}
pub fn add_packet(&mut self, bytes: u128, traffic_direction: TrafficDirection) {
if traffic_direction == TrafficDirection::Outgoing {
//increment number of sent packets and bytes
@@ -78,4 +55,51 @@ pub fn add_packet(&mut self, bytes: u128, traffic_direction: TrafficDirection) {
self.tot_in_bytes += bytes;
}
}
pub fn refresh(&mut self, other: Self) {
self.tot_in_bytes += other.tot_in_bytes;
self.tot_out_bytes += other.tot_out_bytes;
self.tot_in_packets += other.tot_in_packets;
self.tot_out_packets += other.tot_out_packets;
self.all_packets += other.all_packets;
self.all_bytes += other.all_bytes;
self.dropped_packets = other.dropped_packets;
self.last_packet_timestamp = other.last_packet_timestamp;
for (key, value) in other.map {
self.map
.entry(key)
.and_modify(|x| x.refresh(&value))
.or_insert(value);
}
for (key, value) in other.services {
self.services
.entry(key)
.and_modify(|x| *x += value)
.or_insert(value);
}
for (key, value) in other.hosts {
self.hosts
.entry(key)
.and_modify(|x| x.data_info += value.data_info)
.or_insert(value);
}
// todo: remove this
// let mut total_packets = 0;
// for (_, data_info_host) in self.hosts.iter() {
// total_packets += data_info_host.data_info.tot_packets();
// }
// println!("Total packets from all hosts: {}", total_packets);
}
pub fn take(&mut self, last_packet_timestamp: i64) -> Self {
let info_traffic = InfoTraffic {
last_packet_timestamp,
..InfoTraffic::default()
};
std::mem::replace(self, info_traffic)
}
}

View File

@@ -1,5 +1,3 @@
use std::sync::Mutex;
use crate::networking::types::data_info_host::DataInfoHost;
use crate::notifications::types::logged_notification::{
BytesThresholdExceeded, FavoriteTransmitted, LoggedNotification, PacketsThresholdExceeded,
@@ -15,11 +13,11 @@
pub fn notify_and_log(
runtime_data: &mut RunTimeData,
notifications: Notifications,
info_traffic: &Mutex<InfoTraffic>,
timestamp: i64,
info_traffic: &InfoTraffic,
) -> usize {
let mut already_emitted_sound = false;
let mut emitted_notifications = 0;
let timestamp = info_traffic.last_packet_timestamp;
// packets threshold
if let Some(threshold) = notifications.packets_notification.threshold {
let sent_packets_entry = runtime_data.tot_out_packets - runtime_data.tot_out_packets_prev;
@@ -75,21 +73,16 @@ pub fn notify_and_log(
}
// from favorites
if notifications.favorite_notification.notify_on_favorite
&& !info_traffic
.lock()
.unwrap()
.favorites_last_interval
.is_empty()
&& !info_traffic.favorites_last_interval.is_empty()
{
let info_traffic_lock = info_traffic.lock().unwrap();
for host in info_traffic_lock.favorites_last_interval.clone() {
for host in info_traffic.favorites_last_interval.clone() {
//log this notification
emitted_notifications += 1;
if runtime_data.logged_notifications.len() >= 30 {
runtime_data.logged_notifications.pop_back();
}
let data_info_host = *info_traffic_lock
let data_info_host = *info_traffic
.hosts
.get(&host)
.unwrap_or(&DataInfoHost::default());
@@ -103,7 +96,6 @@ pub fn notify_and_log(
},
));
}
drop(info_traffic_lock);
if !already_emitted_sound && notifications.favorite_notification.sound.ne(&Sound::None) {
// emit sound
play(

View File

@@ -1,5 +1,4 @@
use std::cmp::min;
use std::sync::Mutex;
use crate::networking::manage_packets::get_address_to_lookup;
use crate::networking::types::address_port_pair::AddressPortPair;
@@ -17,15 +16,15 @@ pub fn get_searched_entries(
sniffer: &Sniffer,
) -> (Vec<(AddressPortPair, InfoAddressPortPair)>, usize, DataInfo) {
let mut agglomerate = DataInfo::default();
let info_traffic_lock = sniffer.info_traffic.lock().unwrap();
let mut all_results: Vec<(&AddressPortPair, &InfoAddressPortPair)> = info_traffic_lock
let info_traffic = &sniffer.info_traffic;
let mut all_results: Vec<(&AddressPortPair, &InfoAddressPortPair)> = info_traffic
.map
.iter()
.filter(|(key, value)| {
let address_to_lookup = &get_address_to_lookup(key, value.traffic_direction);
let r_dns_host = info_traffic_lock.addresses_resolved.get(address_to_lookup);
let r_dns_host = sniffer.addresses_resolved.get(address_to_lookup);
let is_favorite = if let Some(e) = r_dns_host {
info_traffic_lock
info_traffic
.hosts
.get(&e.1)
.unwrap_or(&DataInfoHost::default())
@@ -82,12 +81,11 @@ pub fn get_searched_entries(
}
pub fn get_host_entries(
info_traffic: &Mutex<InfoTraffic>,
info_traffic: &InfoTraffic,
chart_type: ChartType,
sort_type: SortType,
) -> Vec<(Host, DataInfoHost)> {
let info_traffic_lock = info_traffic.lock().unwrap();
let mut sorted_vec: Vec<(&Host, &DataInfoHost)> = info_traffic_lock.hosts.iter().collect();
let mut sorted_vec: Vec<(&Host, &DataInfoHost)> = info_traffic.hosts.iter().collect();
sorted_vec.sort_by(|&(_, a), &(_, b)| a.data_info.compare(&b.data_info, sort_type, chart_type));
@@ -99,12 +97,11 @@ pub fn get_host_entries(
}
pub fn get_service_entries(
info_traffic: &Mutex<InfoTraffic>,
info_traffic: &InfoTraffic,
chart_type: ChartType,
sort_type: SortType,
) -> Vec<(Service, DataInfo)> {
let info_traffic_lock = info_traffic.lock().unwrap();
let mut sorted_vec: Vec<(&Service, &DataInfo)> = info_traffic_lock
let mut sorted_vec: Vec<(&Service, &DataInfo)> = info_traffic
.services
.iter()
.filter(|(service, _)| service != &&Service::NotApplicable)

View File

@@ -1,24 +1,29 @@
//! Module containing functions executed by the thread in charge of parsing sniffed packets and
//! inserting them in the shared map.
use std::sync::{Arc, Mutex};
use std::thread;
use async_channel::Sender;
use etherparse::err::ip::{HeaderError, LaxHeaderSliceError};
use etherparse::err::{Layer, LenError};
use etherparse::{LaxPacketHeaders, LenSource};
use pcap::Packet;
use std::collections::HashMap;
use std::net::IpAddr;
use std::sync::{Arc, Mutex};
use std::thread;
use crate::gui::types::message::Message;
use crate::mmdb::types::mmdb_reader::MmdbReaders;
use crate::networking::manage_packets::{
analyze_headers, get_address_to_lookup, modify_or_insert_in_map, reverse_dns_lookup,
analyze_headers, get_address_to_lookup, get_traffic_type, is_local_connection,
modify_or_insert_in_map, reverse_dns_lookup,
};
use crate::networking::types::arp_type::ArpType;
use crate::networking::types::bogon::is_bogon;
use crate::networking::types::capture_context::{CaptureContext, CaptureSource};
use crate::networking::types::data_info::DataInfo;
use crate::networking::types::data_info_host::DataInfoHost;
use crate::networking::types::filters::Filters;
use crate::networking::types::host::Host;
use crate::networking::types::host_data_states::HostData;
use crate::networking::types::icmp_type::IcmpType;
use crate::networking::types::info_address_port_pair::InfoAddressPortPair;
use crate::networking::types::my_link_type::MyLinkType;
@@ -32,15 +37,17 @@ pub fn parse_packets(
current_capture_id: &Mutex<usize>,
cs: &CaptureSource,
filters: &Filters,
info_traffic_mutex: &Arc<Mutex<InfoTraffic>>,
mmdb_readers: &MmdbReaders,
capture_context: CaptureContext,
host_data: &Arc<Mutex<HostData>>,
tx: &Sender<Message>,
) {
let my_link_type = capture_context.my_link_type();
let (mut cap, mut savefile) = capture_context.consume();
let capture_id = *current_capture_id.lock().unwrap();
let mut info_traffic = InfoTraffic::default();
let resolutions_state = Arc::new(Mutex::new(AddressesResolutionState::default()));
loop {
match cap.next_packet() {
@@ -61,10 +68,13 @@ pub fn parse_packets(
let mut icmp_type = IcmpType::default();
let mut arp_type = ArpType::default();
let mut packet_filters_fields = PacketFiltersFields::default();
#[allow(clippy::useless_conversion)]
{
info_traffic_mutex.lock().unwrap().latest_packet_timestamp =
i64::from(packet.header.ts.tv_sec);
let this_packet_timestamp = i64::from(packet.header.ts.tv_sec);
if info_traffic.last_packet_timestamp != this_packet_timestamp {
let _ = tx.send_blocking(Message::TickRun(
info_traffic.take(this_packet_timestamp),
));
}
let key_option = analyze_headers(
@@ -90,7 +100,7 @@ pub fn parse_packets(
}
// update the shared map
new_info = modify_or_insert_in_map(
info_traffic_mutex,
&mut info_traffic,
&key,
cs,
mac_addresses,
@@ -100,7 +110,6 @@ pub fn parse_packets(
);
}
let mut info_traffic = info_traffic_mutex.lock().unwrap();
//increment number of sniffed packets and bytes
info_traffic.all_packets += 1;
info_traffic.all_bytes += exchanged_bytes;
@@ -115,12 +124,16 @@ pub fn parse_packets(
// check the rDNS status of this address and act accordingly
let address_to_lookup =
get_address_to_lookup(&key, new_info.traffic_direction);
let r_dns_already_resolved = info_traffic
let r_dns_already_resolved = resolutions_state
.lock()
.unwrap()
.addresses_resolved
.contains_key(&address_to_lookup);
let mut r_dns_waiting_resolution = false;
if !r_dns_already_resolved {
r_dns_waiting_resolution = info_traffic
r_dns_waiting_resolution = resolutions_state
.lock()
.unwrap()
.addresses_waiting_resolution
.contains_key(&address_to_lookup);
}
@@ -131,30 +144,34 @@ pub fn parse_packets(
// Add this address to the map of addresses waiting for a resolution
// Useful to NOT perform again a rDNS lookup for this entry
info_traffic.addresses_waiting_resolution.insert(
address_to_lookup,
DataInfo::new_with_first_packet(
exchanged_bytes,
new_info.traffic_direction,
),
);
resolutions_state
.lock()
.unwrap()
.addresses_waiting_resolution
.insert(
address_to_lookup,
DataInfo::new_with_first_packet(
exchanged_bytes,
new_info.traffic_direction,
),
);
// launch new thread to resolve host name
let key2 = key;
let info_traffic2 = info_traffic_mutex.clone();
let tx2 = tx.clone();
let resolutions_state2 = resolutions_state.clone();
let device2 = cs.clone();
let mmdb_readers_2 = mmdb_readers.clone();
let host_data2 = host_data.clone();
let _ = thread::Builder::new()
.name("thread_reverse_dns_lookup".to_string())
.spawn(move || {
reverse_dns_lookup(
&info_traffic2,
&tx2,
&resolutions_state2,
&key2,
new_info.traffic_direction,
&device2,
&mmdb_readers_2,
&host_data2,
);
})
.log_err(location!());
@@ -162,7 +179,9 @@ pub fn parse_packets(
(true, false) => {
// waiting for a previously requested rDNS resolution
// update the corresponding waiting address data
info_traffic
resolutions_state
.lock()
.unwrap()
.addresses_waiting_resolution
.entry(address_to_lookup)
.and_modify(|data_info| {
@@ -175,17 +194,50 @@ pub fn parse_packets(
(_, true) => {
// rDNS already resolved
// update the corresponding host's data info
let host = info_traffic
let host = resolutions_state
.lock()
.unwrap()
.addresses_resolved
.get(&address_to_lookup)
.unwrap_or(&(String::new(), Host::default()))
.1
.clone();
info_traffic.hosts.entry(host).and_modify(|data_info_host| {
data_info_host
.data_info
.add_packet(exchanged_bytes, new_info.traffic_direction);
});
info_traffic
.hosts
.entry(host)
.and_modify(|data_info_host| {
data_info_host.data_info.add_packet(
exchanged_bytes,
new_info.traffic_direction,
);
})
.or_insert_with(|| {
let traffic_direction = new_info.traffic_direction;
let my_interface_addresses =
cs.get_addresses().lock().unwrap().clone();
let traffic_type = get_traffic_type(
&address_to_lookup,
&my_interface_addresses,
traffic_direction,
);
let is_loopback = address_to_lookup.is_loopback();
let is_local = is_local_connection(
&address_to_lookup,
&my_interface_addresses,
);
let is_bogon = is_bogon(&address_to_lookup);
DataInfoHost {
data_info: DataInfo::new_with_first_packet(
exchanged_bytes,
traffic_direction,
),
is_favorite: false,
is_loopback,
is_local,
is_bogon,
traffic_type,
}
});
}
}
@@ -260,3 +312,11 @@ fn matches(value: u32) -> bool {
))
}
}
#[derive(Default)]
pub struct AddressesResolutionState {
/// Map of the addresses waiting for a rDNS resolution; used to NOT send multiple rDNS for the same address
pub addresses_waiting_resolution: HashMap<IpAddr, DataInfo>,
/// Map of the resolved addresses with their full rDNS value and the corresponding host
pub addresses_resolved: HashMap<IpAddr, (String, Host)>,
}