mirror of
https://github.com/spacedriveapp/spacedrive.git
synced 2026-04-21 15:07:54 -04:00
chore: update submodule and add statistics listener for library management
- Updated the submodule reference in the landing app. - Added a statistics recalculation listener in the LibraryManager to monitor resource changes. - Included clipboard permissions in the capabilities configuration for enhanced functionality. - Improved the LibraryInfoOutput struct to implement the Identifiable trait for better resource management.
This commit is contained in:
@@ -1 +1 @@
|
||||
{"default":{"identifier":"default","description":"Default permissions for Spacedrive","local":true,"windows":["main"],"permissions":["core:default","core:event:allow-listen","core:event:allow-emit","core:window:allow-create","core:window:allow-close","core:window:allow-get-all-windows","core:webview:allow-create-webview-window","core:path:default","dialog:allow-open","dialog:allow-save","shell:allow-open","fs:allow-home-read-recursive"]}}
|
||||
{"default":{"identifier":"default","description":"Default permissions for Spacedrive","local":true,"windows":["main"],"permissions":["core:default","core:event:allow-listen","core:event:allow-emit","core:window:allow-create","core:window:allow-close","core:window:allow-get-all-windows","core:webview:allow-create-webview-window","core:path:default","dialog:allow-open","dialog:allow-save","shell:allow-open","fs:allow-home-read-recursive","clipboard-manager:allow-read-text","clipboard-manager:allow-write-text"]}}
|
||||
@@ -464,6 +464,14 @@ impl LibraryManager {
|
||||
libraries.insert(config.id, library.clone());
|
||||
}
|
||||
|
||||
// Spawn statistics recalculation listener
|
||||
// This listens for ResourceChanged events and recalculates statistics every 5 seconds
|
||||
// while events are flowing
|
||||
super::statistics_listener::spawn_statistics_listener(
|
||||
library.clone(),
|
||||
self.event_bus.clone(),
|
||||
);
|
||||
|
||||
// Initialize sidecar manager before resuming jobs
|
||||
if let Some(sidecar_manager) = context.get_sidecar_manager().await {
|
||||
if let Err(e) = sidecar_manager.init_library(&library).await {
|
||||
|
||||
@@ -8,6 +8,7 @@ pub(crate) mod config;
|
||||
mod error;
|
||||
mod lock;
|
||||
mod manager;
|
||||
mod statistics_listener;
|
||||
mod sync_helpers;
|
||||
|
||||
pub use config::{LibraryConfig, LibrarySettings, LibraryStatistics};
|
||||
|
||||
247
core/src/library/statistics_listener.rs
Normal file
247
core/src/library/statistics_listener.rs
Normal file
@@ -0,0 +1,247 @@
|
||||
//! Background listener that recalculates library statistics while ResourceEvents flow
|
||||
|
||||
use super::Library;
|
||||
use crate::infra::event::{Event, EventBus, EventSubscriber};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
use tokio::sync::broadcast::error::RecvError;
|
||||
use tokio::time::{interval, sleep};
|
||||
use tracing::{debug, error, info, warn};
|
||||
|
||||
/// Duration to wait between recalculations while events are flowing
|
||||
const RECALCULATION_INTERVAL: Duration = Duration::from_secs(5);
|
||||
|
||||
/// Duration to wait for new events before stopping the listener
|
||||
/// If no events arrive within this window, we consider the "activity burst" finished
|
||||
const IDLE_TIMEOUT: Duration = Duration::from_secs(10);
|
||||
|
||||
/// Spawn a background task that recalculates library statistics periodically
|
||||
/// while ResourceChanged events are flowing through the event bus
|
||||
///
|
||||
/// The listener uses a throttling mechanism:
|
||||
/// - Recalculates at most every 5 seconds while events are flowing
|
||||
/// - Stops recalculating after 10 seconds of no events
|
||||
/// - Automatically restarts when new events arrive
|
||||
pub fn spawn_statistics_listener(library: Arc<Library>, event_bus: Arc<EventBus>) {
|
||||
let library_id = library.id();
|
||||
let library_name = tokio::task::block_in_place(|| {
|
||||
tokio::runtime::Handle::current().block_on(async { library.name().await })
|
||||
});
|
||||
|
||||
info!(
|
||||
library_id = %library_id,
|
||||
library_name = %library_name,
|
||||
"Spawning statistics recalculation listener"
|
||||
);
|
||||
|
||||
tokio::spawn(async move {
|
||||
let mut subscriber = event_bus.subscribe();
|
||||
|
||||
// Wait for first ResourceChanged event to start
|
||||
loop {
|
||||
match subscriber.recv().await {
|
||||
Ok(event) => {
|
||||
if is_resource_changed_event(&event) {
|
||||
debug!(
|
||||
library_id = %library_id,
|
||||
library_name = %library_name,
|
||||
"First ResourceChanged event detected, starting active recalculation mode"
|
||||
);
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(RecvError::Lagged(skipped)) => {
|
||||
warn!(
|
||||
library_id = %library_id,
|
||||
library_name = %library_name,
|
||||
skipped = skipped,
|
||||
"Event subscriber lagged, some events were skipped"
|
||||
);
|
||||
// Continue listening
|
||||
}
|
||||
Err(RecvError::Closed) => {
|
||||
info!(
|
||||
library_id = %library_id,
|
||||
library_name = %library_name,
|
||||
"Event bus closed, statistics listener shutting down"
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Main loop: active recalculation while events are flowing
|
||||
loop {
|
||||
if let Err(e) =
|
||||
run_active_recalculation_cycle(&library, &mut subscriber, library_id, &library_name)
|
||||
.await
|
||||
{
|
||||
error!(
|
||||
library_id = %library_id,
|
||||
library_name = %library_name,
|
||||
error = %e,
|
||||
"Error in statistics recalculation cycle"
|
||||
);
|
||||
}
|
||||
|
||||
// After an active cycle ends (idle timeout), wait for next ResourceChanged event
|
||||
debug!(
|
||||
library_id = %library_id,
|
||||
library_name = %library_name,
|
||||
"Active recalculation cycle ended, waiting for next ResourceChanged event"
|
||||
);
|
||||
|
||||
loop {
|
||||
match subscriber.recv().await {
|
||||
Ok(event) => {
|
||||
if is_resource_changed_event(&event) {
|
||||
debug!(
|
||||
library_id = %library_id,
|
||||
library_name = %library_name,
|
||||
"New ResourceChanged event detected, restarting active recalculation"
|
||||
);
|
||||
break; // Restart active recalculation
|
||||
}
|
||||
}
|
||||
Err(RecvError::Lagged(skipped)) => {
|
||||
warn!(
|
||||
library_id = %library_id,
|
||||
library_name = %library_name,
|
||||
skipped = skipped,
|
||||
"Event subscriber lagged during idle wait"
|
||||
);
|
||||
}
|
||||
Err(RecvError::Closed) => {
|
||||
info!(
|
||||
library_id = %library_id,
|
||||
library_name = %library_name,
|
||||
"Event bus closed, statistics listener shutting down"
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// Run one active recalculation cycle while events are flowing
|
||||
///
|
||||
/// Recalculates statistics every 5 seconds while ResourceChanged events arrive.
|
||||
/// Returns when no events have been received for 10 seconds (idle timeout).
|
||||
async fn run_active_recalculation_cycle(
|
||||
library: &Arc<Library>,
|
||||
subscriber: &mut EventSubscriber,
|
||||
library_id: uuid::Uuid,
|
||||
library_name: &str,
|
||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
let mut recalc_interval = interval(RECALCULATION_INTERVAL);
|
||||
recalc_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
|
||||
|
||||
// Trigger immediate recalculation at start of active cycle
|
||||
if let Err(e) = library.recalculate_statistics().await {
|
||||
warn!(
|
||||
library_id = %library_id,
|
||||
library_name = %library_name,
|
||||
error = %e,
|
||||
"Failed to trigger statistics recalculation"
|
||||
);
|
||||
} else {
|
||||
debug!(
|
||||
library_id = %library_id,
|
||||
library_name = %library_name,
|
||||
"Triggered statistics recalculation at start of active cycle"
|
||||
);
|
||||
}
|
||||
|
||||
let mut last_event_time = tokio::time::Instant::now();
|
||||
let mut event_count = 0u64;
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
// Check for idle timeout
|
||||
_ = sleep(IDLE_TIMEOUT) => {
|
||||
let elapsed_since_last_event = tokio::time::Instant::now() - last_event_time;
|
||||
if elapsed_since_last_event >= IDLE_TIMEOUT {
|
||||
info!(
|
||||
library_id = %library_id,
|
||||
library_name = %library_name,
|
||||
event_count = event_count,
|
||||
"No events for {} seconds, ending active recalculation cycle",
|
||||
IDLE_TIMEOUT.as_secs()
|
||||
);
|
||||
break; // End active cycle
|
||||
}
|
||||
}
|
||||
|
||||
// Recalculation interval tick
|
||||
_ = recalc_interval.tick() => {
|
||||
if let Err(e) = library.recalculate_statistics().await {
|
||||
warn!(
|
||||
library_id = %library_id,
|
||||
library_name = %library_name,
|
||||
error = %e,
|
||||
"Failed to trigger periodic statistics recalculation"
|
||||
);
|
||||
} else {
|
||||
debug!(
|
||||
library_id = %library_id,
|
||||
library_name = %library_name,
|
||||
event_count = event_count,
|
||||
"Triggered periodic statistics recalculation"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// Listen for events
|
||||
result = subscriber.recv() => {
|
||||
match result {
|
||||
Ok(event) => {
|
||||
if is_resource_changed_event(&event) {
|
||||
last_event_time = tokio::time::Instant::now();
|
||||
event_count += 1;
|
||||
|
||||
// Log every 100 events to show activity without spam
|
||||
if event_count % 100 == 0 {
|
||||
debug!(
|
||||
library_id = %library_id,
|
||||
library_name = %library_name,
|
||||
event_count = event_count,
|
||||
"Processed {} ResourceChanged events in this cycle",
|
||||
event_count
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(RecvError::Lagged(skipped)) => {
|
||||
warn!(
|
||||
library_id = %library_id,
|
||||
library_name = %library_name,
|
||||
skipped = skipped,
|
||||
"Event subscriber lagged during active recalculation"
|
||||
);
|
||||
last_event_time = tokio::time::Instant::now();
|
||||
}
|
||||
Err(RecvError::Closed) => {
|
||||
info!(
|
||||
library_id = %library_id,
|
||||
library_name = %library_name,
|
||||
"Event bus closed during active recalculation"
|
||||
);
|
||||
return Err("Event bus closed".into());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Check if an event is a ResourceChanged event
|
||||
fn is_resource_changed_event(event: &Event) -> bool {
|
||||
matches!(
|
||||
event,
|
||||
Event::ResourceChanged { .. } | Event::ResourceChangedBatch { .. }
|
||||
)
|
||||
}
|
||||
@@ -1,6 +1,9 @@
|
||||
//! Library information output types
|
||||
|
||||
use crate::library::config::{LibrarySettings, LibraryStatistics};
|
||||
use crate::{
|
||||
domain::resource::Identifiable,
|
||||
library::config::{LibrarySettings, LibraryStatistics},
|
||||
};
|
||||
use chrono::{DateTime, Utc};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use specta::Type;
|
||||
@@ -34,3 +37,16 @@ pub struct LibraryInfoOutput {
|
||||
/// Library statistics
|
||||
pub statistics: LibraryStatistics,
|
||||
}
|
||||
|
||||
impl Identifiable for LibraryInfoOutput {
|
||||
fn id(&self) -> Uuid {
|
||||
self.id
|
||||
}
|
||||
|
||||
fn resource_type() -> &'static str {
|
||||
"library"
|
||||
}
|
||||
|
||||
// Simple resource with no sync dependencies or special merge behavior
|
||||
// All default implementations are sufficient
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user