diff --git a/apps/tauri/src-tauri/gen/schemas/capabilities.json b/apps/tauri/src-tauri/gen/schemas/capabilities.json index 566898008..4e430b752 100644 --- a/apps/tauri/src-tauri/gen/schemas/capabilities.json +++ b/apps/tauri/src-tauri/gen/schemas/capabilities.json @@ -1 +1 @@ -{"default":{"identifier":"default","description":"Default permissions for Spacedrive","local":true,"windows":["main"],"permissions":["core:default","core:event:allow-listen","core:event:allow-emit","core:window:allow-create","core:window:allow-close","core:window:allow-get-all-windows","core:webview:allow-create-webview-window","core:path:default","dialog:allow-open","dialog:allow-save","shell:allow-open","fs:allow-home-read-recursive"]}} \ No newline at end of file +{"default":{"identifier":"default","description":"Default permissions for Spacedrive","local":true,"windows":["main"],"permissions":["core:default","core:event:allow-listen","core:event:allow-emit","core:window:allow-create","core:window:allow-close","core:window:allow-get-all-windows","core:webview:allow-create-webview-window","core:path:default","dialog:allow-open","dialog:allow-save","shell:allow-open","fs:allow-home-read-recursive","clipboard-manager:allow-read-text","clipboard-manager:allow-write-text"]}} \ No newline at end of file diff --git a/core/src/library/manager.rs b/core/src/library/manager.rs index d578f2230..36d968d99 100644 --- a/core/src/library/manager.rs +++ b/core/src/library/manager.rs @@ -464,6 +464,14 @@ impl LibraryManager { libraries.insert(config.id, library.clone()); } + // Spawn statistics recalculation listener + // This listens for ResourceChanged events and recalculates statistics every 5 seconds + // while events are flowing + super::statistics_listener::spawn_statistics_listener( + library.clone(), + self.event_bus.clone(), + ); + // Initialize sidecar manager before resuming jobs if let Some(sidecar_manager) = context.get_sidecar_manager().await { if let Err(e) = sidecar_manager.init_library(&library).await { diff --git a/core/src/library/mod.rs b/core/src/library/mod.rs index ae67cf2cf..d4621ebb4 100644 --- a/core/src/library/mod.rs +++ b/core/src/library/mod.rs @@ -8,6 +8,7 @@ pub(crate) mod config; mod error; mod lock; mod manager; +mod statistics_listener; mod sync_helpers; pub use config::{LibraryConfig, LibrarySettings, LibraryStatistics}; diff --git a/core/src/library/statistics_listener.rs b/core/src/library/statistics_listener.rs new file mode 100644 index 000000000..1e51446db --- /dev/null +++ b/core/src/library/statistics_listener.rs @@ -0,0 +1,247 @@ +//! Background listener that recalculates library statistics while ResourceEvents flow + +use super::Library; +use crate::infra::event::{Event, EventBus, EventSubscriber}; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::broadcast::error::RecvError; +use tokio::time::{interval, sleep}; +use tracing::{debug, error, info, warn}; + +/// Duration to wait between recalculations while events are flowing +const RECALCULATION_INTERVAL: Duration = Duration::from_secs(5); + +/// Duration to wait for new events before stopping the listener +/// If no events arrive within this window, we consider the "activity burst" finished +const IDLE_TIMEOUT: Duration = Duration::from_secs(10); + +/// Spawn a background task that recalculates library statistics periodically +/// while ResourceChanged events are flowing through the event bus +/// +/// The listener uses a throttling mechanism: +/// - Recalculates at most every 5 seconds while events are flowing +/// - Stops recalculating after 10 seconds of no events +/// - Automatically restarts when new events arrive +pub fn spawn_statistics_listener(library: Arc, event_bus: Arc) { + let library_id = library.id(); + let library_name = tokio::task::block_in_place(|| { + tokio::runtime::Handle::current().block_on(async { library.name().await }) + }); + + info!( + library_id = %library_id, + library_name = %library_name, + "Spawning statistics recalculation listener" + ); + + tokio::spawn(async move { + let mut subscriber = event_bus.subscribe(); + + // Wait for first ResourceChanged event to start + loop { + match subscriber.recv().await { + Ok(event) => { + if is_resource_changed_event(&event) { + debug!( + library_id = %library_id, + library_name = %library_name, + "First ResourceChanged event detected, starting active recalculation mode" + ); + break; + } + } + Err(RecvError::Lagged(skipped)) => { + warn!( + library_id = %library_id, + library_name = %library_name, + skipped = skipped, + "Event subscriber lagged, some events were skipped" + ); + // Continue listening + } + Err(RecvError::Closed) => { + info!( + library_id = %library_id, + library_name = %library_name, + "Event bus closed, statistics listener shutting down" + ); + return; + } + } + } + + // Main loop: active recalculation while events are flowing + loop { + if let Err(e) = + run_active_recalculation_cycle(&library, &mut subscriber, library_id, &library_name) + .await + { + error!( + library_id = %library_id, + library_name = %library_name, + error = %e, + "Error in statistics recalculation cycle" + ); + } + + // After an active cycle ends (idle timeout), wait for next ResourceChanged event + debug!( + library_id = %library_id, + library_name = %library_name, + "Active recalculation cycle ended, waiting for next ResourceChanged event" + ); + + loop { + match subscriber.recv().await { + Ok(event) => { + if is_resource_changed_event(&event) { + debug!( + library_id = %library_id, + library_name = %library_name, + "New ResourceChanged event detected, restarting active recalculation" + ); + break; // Restart active recalculation + } + } + Err(RecvError::Lagged(skipped)) => { + warn!( + library_id = %library_id, + library_name = %library_name, + skipped = skipped, + "Event subscriber lagged during idle wait" + ); + } + Err(RecvError::Closed) => { + info!( + library_id = %library_id, + library_name = %library_name, + "Event bus closed, statistics listener shutting down" + ); + return; + } + } + } + } + }); +} + +/// Run one active recalculation cycle while events are flowing +/// +/// Recalculates statistics every 5 seconds while ResourceChanged events arrive. +/// Returns when no events have been received for 10 seconds (idle timeout). +async fn run_active_recalculation_cycle( + library: &Arc, + subscriber: &mut EventSubscriber, + library_id: uuid::Uuid, + library_name: &str, +) -> Result<(), Box> { + let mut recalc_interval = interval(RECALCULATION_INTERVAL); + recalc_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + // Trigger immediate recalculation at start of active cycle + if let Err(e) = library.recalculate_statistics().await { + warn!( + library_id = %library_id, + library_name = %library_name, + error = %e, + "Failed to trigger statistics recalculation" + ); + } else { + debug!( + library_id = %library_id, + library_name = %library_name, + "Triggered statistics recalculation at start of active cycle" + ); + } + + let mut last_event_time = tokio::time::Instant::now(); + let mut event_count = 0u64; + + loop { + tokio::select! { + // Check for idle timeout + _ = sleep(IDLE_TIMEOUT) => { + let elapsed_since_last_event = tokio::time::Instant::now() - last_event_time; + if elapsed_since_last_event >= IDLE_TIMEOUT { + info!( + library_id = %library_id, + library_name = %library_name, + event_count = event_count, + "No events for {} seconds, ending active recalculation cycle", + IDLE_TIMEOUT.as_secs() + ); + break; // End active cycle + } + } + + // Recalculation interval tick + _ = recalc_interval.tick() => { + if let Err(e) = library.recalculate_statistics().await { + warn!( + library_id = %library_id, + library_name = %library_name, + error = %e, + "Failed to trigger periodic statistics recalculation" + ); + } else { + debug!( + library_id = %library_id, + library_name = %library_name, + event_count = event_count, + "Triggered periodic statistics recalculation" + ); + } + } + + // Listen for events + result = subscriber.recv() => { + match result { + Ok(event) => { + if is_resource_changed_event(&event) { + last_event_time = tokio::time::Instant::now(); + event_count += 1; + + // Log every 100 events to show activity without spam + if event_count % 100 == 0 { + debug!( + library_id = %library_id, + library_name = %library_name, + event_count = event_count, + "Processed {} ResourceChanged events in this cycle", + event_count + ); + } + } + } + Err(RecvError::Lagged(skipped)) => { + warn!( + library_id = %library_id, + library_name = %library_name, + skipped = skipped, + "Event subscriber lagged during active recalculation" + ); + last_event_time = tokio::time::Instant::now(); + } + Err(RecvError::Closed) => { + info!( + library_id = %library_id, + library_name = %library_name, + "Event bus closed during active recalculation" + ); + return Err("Event bus closed".into()); + } + } + } + } + } + + Ok(()) +} + +/// Check if an event is a ResourceChanged event +fn is_resource_changed_event(event: &Event) -> bool { + matches!( + event, + Event::ResourceChanged { .. } | Event::ResourceChangedBatch { .. } + ) +} diff --git a/core/src/ops/libraries/info/output.rs b/core/src/ops/libraries/info/output.rs index 0a781c3f9..5344ef58b 100644 --- a/core/src/ops/libraries/info/output.rs +++ b/core/src/ops/libraries/info/output.rs @@ -1,6 +1,9 @@ //! Library information output types -use crate::library::config::{LibrarySettings, LibraryStatistics}; +use crate::{ + domain::resource::Identifiable, + library::config::{LibrarySettings, LibraryStatistics}, +}; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use specta::Type; @@ -34,3 +37,16 @@ pub struct LibraryInfoOutput { /// Library statistics pub statistics: LibraryStatistics, } + +impl Identifiable for LibraryInfoOutput { + fn id(&self) -> Uuid { + self.id + } + + fn resource_type() -> &'static str { + "library" + } + + // Simple resource with no sync dependencies or special merge behavior + // All default implementations are sufficient +}