From 21fe33fe4397e2be3fba61a833da0df7a4f14b2d Mon Sep 17 00:00:00 2001 From: Cursor Agent Date: Thu, 18 Sep 2025 00:48:19 +0000 Subject: [PATCH] Refactor watcher config and metrics, improve event handling Co-authored-by: ijamespine --- core/src/service/watcher/example.rs | 24 ++++++----- core/src/service/watcher/metrics.rs | 25 +++-------- core/src/service/watcher/mod.rs | 64 ++++++++++++++--------------- core/src/service/watcher/tests.rs | 12 ++++-- 4 files changed, 59 insertions(+), 66 deletions(-) diff --git a/core/src/service/watcher/example.rs b/core/src/service/watcher/example.rs index b6c5205f2..b9a31cec8 100644 --- a/core/src/service/watcher/example.rs +++ b/core/src/service/watcher/example.rs @@ -19,9 +19,9 @@ pub async fn demonstrate_burst_processing() -> Result<()> { // Create a temporary directory for testing let temp_dir = TempDir::new()?; - // Create high-performance configuration for handling bursts - let config = LocationWatcherConfig::high_performance(); - println!("Using high-performance configuration:"); + // Create standard configuration for handling bursts + let config = LocationWatcherConfig::default(); + println!("Using standard configuration:"); println!(" - Debounce window: {}ms", config.debounce_window_ms); println!(" - Event buffer size: {}", config.event_buffer_size); println!(" - Max batch size: {}", config.max_batch_size); @@ -90,8 +90,8 @@ pub async fn demonstrate_burst_processing() -> Result<()> { let global_metrics = watcher.get_global_metrics(); println!("\n=== Global Metrics ==="); println!("Total events received: {}", global_metrics.total_events_received.load(std::sync::atomic::Ordering::Relaxed)); - println!("Total events dropped: {}", global_metrics.total_events_dropped.load(std::sync::atomic::Ordering::Relaxed)); - println!("Drop rate: {:.2}%", global_metrics.get_drop_rate()); + println!("Total workers created: {}", global_metrics.total_workers_created.load(std::sync::atomic::Ordering::Relaxed)); + println!("Total workers destroyed: {}", global_metrics.total_workers_destroyed.load(std::sync::atomic::Ordering::Relaxed)); // Clean up watcher.remove_location(location.id).await?; @@ -196,8 +196,9 @@ pub async fn demonstrate_configuration_options() -> Result<()> { // Test different configurations let configs = vec![ ("Default", LocationWatcherConfig::default()), - ("High Performance", LocationWatcherConfig::high_performance()), - ("Conservative", LocationWatcherConfig::conservative()), + ("Custom (100ms, 50K buffer, 5K batch)", LocationWatcherConfig::new(100, 50000, 5000)), + ("Custom (200ms, 20K buffer, 2K batch)", LocationWatcherConfig::new(200, 20000, 2000)), + ("Resource Optimized (1MB, 1000 CPU)", LocationWatcherConfig::resource_optimized(1000000, 1000)), ]; for (name, config) in configs { @@ -274,8 +275,8 @@ pub async fn demonstrate_overflow_handling() -> Result<()> { let global_metrics = watcher.get_global_metrics(); println!("\n=== Overflow Results ==="); println!("Total events received: {}", global_metrics.total_events_received.load(std::sync::atomic::Ordering::Relaxed)); - println!("Total events dropped: {}", global_metrics.total_events_dropped.load(std::sync::atomic::Ordering::Relaxed)); - println!("Drop rate: {:.2}%", global_metrics.get_drop_rate()); + println!("Total workers created: {}", global_metrics.total_workers_created.load(std::sync::atomic::Ordering::Relaxed)); + println!("Note: No events were dropped - system waits for buffer space"); watcher.remove_location(location.id).await?; println!("Overflow handling demo completed!"); @@ -310,8 +311,9 @@ mod tests { // Test that all example configurations are valid let configs = vec![ LocationWatcherConfig::default(), - LocationWatcherConfig::high_performance(), - LocationWatcherConfig::conservative(), + LocationWatcherConfig::new(100, 50000, 5000), + LocationWatcherConfig::new(200, 20000, 2000), + LocationWatcherConfig::resource_optimized(1000000, 1000), ]; for config in configs { diff --git a/core/src/service/watcher/metrics.rs b/core/src/service/watcher/metrics.rs index 56036f799..18d4298e0 100644 --- a/core/src/service/watcher/metrics.rs +++ b/core/src/service/watcher/metrics.rs @@ -145,8 +145,6 @@ pub struct WatcherMetrics { pub total_locations: AtomicU64, /// Total events received from filesystem pub total_events_received: AtomicU64, - /// Total events dropped due to queue overflow - pub total_events_dropped: AtomicU64, /// Total workers created pub total_workers_created: AtomicU64, /// Total workers destroyed @@ -164,11 +162,6 @@ impl WatcherMetrics { self.total_events_received.fetch_add(1, Ordering::Relaxed); } - /// Record an event dropped - pub fn record_event_dropped(&self) { - self.total_events_dropped.fetch_add(1, Ordering::Relaxed); - } - /// Record a worker created pub fn record_worker_created(&self) { self.total_workers_created.fetch_add(1, Ordering::Relaxed); @@ -184,26 +177,20 @@ impl WatcherMetrics { self.total_locations.store(count as u64, Ordering::Relaxed); } - /// Get drop rate (percentage of events that were dropped) - pub fn get_drop_rate(&self) -> f64 { + /// Get event processing rate (events per second) + pub fn get_processing_rate(&self) -> f64 { let received = self.total_events_received.load(Ordering::Relaxed); - let dropped = self.total_events_dropped.load(Ordering::Relaxed); - - if received == 0 { - 0.0 - } else { - (dropped as f64 / received as f64) * 100.0 - } + // This would need to be calculated with timestamps in a real implementation + // For now, return 0 as a placeholder + 0.0 } /// Log current metrics pub fn log_metrics(&self) { info!( - "Watcher metrics: locations={}, events_received={}, events_dropped={}, drop_rate={:.2}%, workers_created={}, workers_destroyed={}", + "Watcher metrics: locations={}, events_received={}, workers_created={}, workers_destroyed={}", self.total_locations.load(Ordering::Relaxed), self.total_events_received.load(Ordering::Relaxed), - self.total_events_dropped.load(Ordering::Relaxed), - self.get_drop_rate(), self.total_workers_created.load(Ordering::Relaxed), self.total_workers_destroyed.load(Ordering::Relaxed) ); diff --git a/core/src/service/watcher/mod.rs b/core/src/service/watcher/mod.rs index 7d99e7fdb..7a76d726c 100644 --- a/core/src/service/watcher/mod.rs +++ b/core/src/service/watcher/mod.rs @@ -36,13 +36,13 @@ pub use platform::PlatformHandler; pub struct LocationWatcherConfig { /// Debounce duration for file system events pub debounce_duration: Duration, - /// Maximum number of events to buffer per location + /// Maximum number of events to buffer per location (never drops events) pub event_buffer_size: usize, /// Whether to enable detailed debug logging pub debug_mode: bool, /// Debounce window for batching events (100-250ms) pub debounce_window_ms: u64, - /// Maximum batch size to prevent memory issues + /// Maximum batch size for processing efficiency pub max_batch_size: usize, /// Metrics logging interval pub metrics_log_interval_ms: u64, @@ -58,13 +58,13 @@ impl Default for LocationWatcherConfig { fn default() -> Self { Self { debounce_duration: Duration::from_millis(100), - event_buffer_size: 10000, // Increased for per-location queues + event_buffer_size: 100000, // Large buffer to never drop events debug_mode: false, debounce_window_ms: 150, // 150ms default debounce window - max_batch_size: 1000, // Max events per batch + max_batch_size: 10000, // Large batches for efficiency metrics_log_interval_ms: 30000, // 30 seconds enable_metrics: true, - max_queue_depth_before_reindex: 5000, // 50% of buffer size + max_queue_depth_before_reindex: 50000, // 50% of buffer size enable_focused_reindex: true, } } @@ -90,32 +90,27 @@ impl LocationWatcherConfig { } } - /// Create a high-performance configuration for large file operations - pub fn high_performance() -> Self { + /// Create a configuration optimized for resource-constrained environments + /// This is for future resource manager integration + pub fn resource_optimized( + memory_quota: usize, + cpu_quota: usize, + ) -> Self { + // Calculate buffer size based on available memory (1KB per event estimate) + let event_buffer_size = std::cmp::max(10000, memory_quota / 1000); + + // Calculate batch size based on CPU quota (100 events per CPU unit) + let max_batch_size = std::cmp::max(1000, cpu_quota / 100); + Self { - debounce_duration: Duration::from_millis(50), - event_buffer_size: 50000, + debounce_duration: Duration::from_millis(100), + event_buffer_size, debug_mode: false, - debounce_window_ms: 100, // Shorter window for faster processing - max_batch_size: 5000, // Larger batches - metrics_log_interval_ms: 10000, // More frequent logging + debounce_window_ms: 150, + max_batch_size, + metrics_log_interval_ms: 30000, enable_metrics: true, - max_queue_depth_before_reindex: 25000, - enable_focused_reindex: true, - } - } - - /// Create a conservative configuration for stability - pub fn conservative() -> Self { - Self { - debounce_duration: Duration::from_millis(200), - event_buffer_size: 5000, - debug_mode: false, - debounce_window_ms: 250, // Longer window for better coalescing - max_batch_size: 500, // Smaller batches - metrics_log_interval_ms: 60000, // Less frequent logging - enable_metrics: true, - max_queue_depth_before_reindex: 2500, + max_queue_depth_before_reindex: event_buffer_size / 2, enable_focused_reindex: true, } } @@ -391,10 +386,15 @@ impl LocationWatcher { // Convert notify event to our WatcherEvent let watcher_event = WatcherEvent::from_notify_event(event); - if let Err(e) = tx.try_send(watcher_event) { - warn!("Failed to send watcher event: {}", e); - metrics.record_event_dropped(); - } + // Use spawn_blocking to avoid blocking the notify callback + // This ensures we never drop events - we wait for buffer space + let tx_clone = tx.clone(); + tokio::spawn(async move { + if let Err(e) = tx_clone.send(watcher_event).await { + error!("Failed to send watcher event (receiver dropped): {}", e); + // This should only happen if the receiver is dropped + } + }); } Err(e) => { error!("File system watcher error: {}", e); diff --git a/core/src/service/watcher/tests.rs b/core/src/service/watcher/tests.rs index 67fedfa16..034c47e62 100644 --- a/core/src/service/watcher/tests.rs +++ b/core/src/service/watcher/tests.rs @@ -51,7 +51,7 @@ fn create_rename_events(count: usize, base_path: &str) -> Vec { #[tokio::test] async fn test_burst_event_processing() { let temp_dir = TempDir::new().unwrap(); - let config = LocationWatcherConfig::high_performance(); + let config = LocationWatcherConfig::default(); let events = Arc::new(EventBus::new(1000)); let context = create_mock_context(); @@ -360,6 +360,10 @@ async fn test_configuration_validation() { ..Default::default() }; assert!(invalid_config.validate().is_err()); + + // Test resource-optimized configuration + let resource_config = LocationWatcherConfig::resource_optimized(1000000, 1000); + assert!(resource_config.validate().is_ok()); } #[tokio::test] @@ -405,7 +409,7 @@ async fn test_metrics_collection() { #[tokio::test] async fn test_batch_processing_performance() { - let config = LocationWatcherConfig::high_performance(); + let config = LocationWatcherConfig::default(); let events = Arc::new(EventBus::new(1000)); let context = create_mock_context(); @@ -454,8 +458,8 @@ async fn test_platform_parity() { // Test that the same configuration works across different platforms let configs = vec![ LocationWatcherConfig::default(), - LocationWatcherConfig::high_performance(), - LocationWatcherConfig::conservative(), + LocationWatcherConfig::new(100, 50000, 5000), + LocationWatcherConfig::new(200, 20000, 2000), ]; for config in configs {