Enhance filesystem watching and integrate new sound effects

- Introduced a new filesystem watcher service, replacing the previous location watcher with a more robust and platform-agnostic implementation.
- Updated the core context to include the new filesystem watcher and refactored related services for better integration.
- Added support for ephemeral event handling, allowing real-time updates for non-persistent locations.
- Integrated new sound effects for pairing operations in the UI, enhancing user experience during device pairing.
- Updated the File Operation Modal to support both copy and move operations with improved conflict resolution options.
- Refactored related components to ensure consistency and improved performance across the application.
This commit is contained in:
Jamie Pine
2025-12-09 15:19:27 -08:00
parent 591c7461a4
commit 38136ab5a7
49 changed files with 5074 additions and 1547 deletions

View File

@@ -28,8 +28,13 @@ cargo run --bin sd-cli -- <command> # Run CLI (binary is sd-cli, not spaced
- Using `println!` instead of `tracing` macros (`info!`, `debug!`, etc)
- Implementing `Wire` manually instead of using `register_*` macros
- Blocking the async runtime with synchronous I/O operations
### Quick tips
- On frontend apps, such as the interface in React, you must ALWAYS ensure type-safety based on the auto generated TypeScript types from `ts-client`. Never cast to as any or redefine backend types. our hooks are typesafe with correct input/output types, but sometimes you might need to access types directly from the `ts-client`.
- If you have changed types on the backend that are public to the frontend (have `Type` derive), then you must regenerate the types using `cargo run --bin generate_typescript_types`
- Read the `.mdx` files in /docs for context on any part of the app, they are kept up to date.
-
## Architecture Overview

BIN
Cargo.lock generated
View File

Binary file not shown.

View File

Binary file not shown.

View File

@@ -115,6 +115,7 @@ sd-task-system = { path = "../crates/task-system" }
blurhash = "0.2"
image = "0.25"
sd-ffmpeg = { path = "../crates/ffmpeg", optional = true }
sd-fs-watcher = { path = "../crates/fs-watcher" }
sd-images = { path = "../crates/images" }
sd-media-metadata = { path = "../crates/media-metadata" }
tokio-rustls = "0.26"

View File

@@ -48,8 +48,8 @@ pub struct ServiceConfig {
/// Whether volume monitoring is enabled
pub volume_monitoring_enabled: bool,
/// Whether location watcher is enabled
pub location_watcher_enabled: bool,
/// Whether filesystem watcher is enabled
pub fs_watcher_enabled: bool,
}
impl Default for ServiceConfig {
@@ -57,7 +57,7 @@ impl Default for ServiceConfig {
Self {
networking_enabled: true,
volume_monitoring_enabled: true,
location_watcher_enabled: true,
fs_watcher_enabled: true,
}
}
}

View File

@@ -5,7 +5,8 @@ use crate::{
infra::action::manager::ActionManager, infra::event::EventBus, infra::sync::TransactionManager,
library::LibraryManager, ops::indexing::ephemeral::EphemeralIndexCache,
service::network::NetworkingService, service::session::SessionStateService,
service::sidecar_manager::SidecarManager, volume::VolumeManager,
service::sidecar_manager::SidecarManager, service::watcher::FsWatcherService,
volume::VolumeManager,
};
use std::{path::PathBuf, sync::Arc};
use tokio::sync::{Mutex, RwLock};
@@ -22,7 +23,7 @@ pub struct CoreContext {
pub action_manager: Arc<RwLock<Option<Arc<ActionManager>>>>,
pub networking: Arc<RwLock<Option<Arc<NetworkingService>>>>,
pub plugin_manager: Arc<RwLock<Option<Arc<RwLock<crate::infra::extension::PluginManager>>>>>,
pub location_watcher: Arc<RwLock<Option<Arc<crate::service::watcher::LocationWatcher>>>>,
pub fs_watcher: Arc<RwLock<Option<Arc<FsWatcherService>>>>,
// Ephemeral index cache for unmanaged paths
pub ephemeral_index_cache: Arc<EphemeralIndexCache>,
// Job logging configuration
@@ -50,7 +51,7 @@ impl CoreContext {
action_manager: Arc::new(RwLock::new(None)),
networking: Arc::new(RwLock::new(None)),
plugin_manager: Arc::new(RwLock::new(None)),
location_watcher: Arc::new(RwLock::new(None)),
fs_watcher: Arc::new(RwLock::new(None)),
ephemeral_index_cache: Arc::new(
EphemeralIndexCache::new().expect("Failed to create ephemeral index cache"),
),
@@ -102,14 +103,14 @@ impl CoreContext {
*self.networking.write().await = Some(networking);
}
/// Helper method for services to get the location watcher
pub async fn get_location_watcher(&self) -> Option<Arc<crate::service::watcher::LocationWatcher>> {
self.location_watcher.read().await.clone()
/// Helper method for services to get the filesystem watcher
pub async fn get_fs_watcher(&self) -> Option<Arc<FsWatcherService>> {
self.fs_watcher.read().await.clone()
}
/// Method for Core to set location watcher after it's initialized
pub async fn set_location_watcher(&self, watcher: Arc<crate::service::watcher::LocationWatcher>) {
*self.location_watcher.write().await = Some(watcher);
/// Method for Core to set filesystem watcher after it's initialized
pub async fn set_fs_watcher(&self, watcher: Arc<FsWatcherService>) {
*self.fs_watcher.write().await = Some(watcher);
}
/// Helper method to get the action manager

View File

@@ -185,10 +185,8 @@ impl Core {
.set_sidecar_manager(services.sidecar_manager.clone())
.await;
// Set location watcher in context so it can be accessed by jobs (for ephemeral watch registration)
context
.set_location_watcher(services.location_watcher.clone())
.await;
// Set filesystem watcher in context so it can be accessed by jobs (for ephemeral watch registration)
context.set_fs_watcher(services.fs_watcher.clone()).await;
// Auto-load all libraries with context for job manager initialization
info!("Loading existing libraries...");
@@ -229,6 +227,23 @@ impl Core {
info!("Library filesystem watcher started");
}
// Load locations from all libraries into the filesystem watcher
for library in &loaded_libraries {
info!("Loading locations for library {}", library.id());
match services.fs_watcher.load_library_locations(library).await {
Ok(count) => {
info!("Loaded {} locations from library {}", count, library.id());
}
Err(e) => {
error!(
"Failed to load locations for library {}: {}",
library.id(),
e
);
}
}
}
// Initialize sidecar manager for each loaded library
for library in &loaded_libraries {
info!("Initializing sidecar manager for library {}", library.id());

View File

@@ -193,10 +193,10 @@ pub async fn build_dir_entry(
/// creates, and finally modifies.
pub async fn apply_batch<H: ChangeHandler>(
handler: &mut H,
events: Vec<crate::infra::event::FsRawEventKind>,
events: Vec<sd_fs_watcher::FsEvent>,
config: &ChangeConfig<'_>,
) -> Result<()> {
use crate::infra::event::FsRawEventKind;
use sd_fs_watcher::FsEventKind;
if events.is_empty() {
return Ok(());
@@ -208,11 +208,11 @@ pub async fn apply_batch<H: ChangeHandler>(
let mut renames = Vec::new();
for event in events {
match event {
FsRawEventKind::Create { path } => creates.push(path),
FsRawEventKind::Modify { path } => modifies.push(path),
FsRawEventKind::Remove { path } => removes.push(path),
FsRawEventKind::Rename { from, to } => renames.push((from, to)),
match event.kind {
FsEventKind::Create => creates.push(event.path),
FsEventKind::Modify => modifies.push(event.path),
FsEventKind::Remove => removes.push(event.path),
FsEventKind::Rename { from, to } => renames.push((from, to)),
}
}

View File

@@ -13,7 +13,7 @@ use uuid::Uuid;
///
/// This enum represents changes that can come from either:
/// - The `ChangeDetector` during batch indexing scans
/// - The file watcher via `FsRawEventKind` conversion
/// - The file watcher via `FsEvent` conversion
#[derive(Debug, Clone)]
pub enum Change {
/// New file/directory (not in storage).
@@ -60,24 +60,24 @@ impl Change {
}
}
/// Create a Change from an FsRawEventKind (for watcher integration).
/// Create a Change from an FsEvent (for watcher integration).
/// Note: These variants don't have entry_ids since they come from the watcher.
pub fn from_fs_event(event: crate::infra::event::FsRawEventKind) -> Self {
use crate::infra::event::FsRawEventKind;
pub fn from_fs_event(event: sd_fs_watcher::FsEvent) -> Self {
use sd_fs_watcher::FsEventKind;
match event {
FsRawEventKind::Create { path } => Change::New(path),
FsRawEventKind::Modify { path } => Change::Modified {
path,
match event.kind {
FsEventKind::Create => Change::New(event.path),
FsEventKind::Modify => Change::Modified {
path: event.path,
entry_id: 0, // Placeholder - handler will look up real ID
old_modified: None,
new_modified: None,
},
FsRawEventKind::Remove { path } => Change::Deleted {
path,
FsEventKind::Remove => Change::Deleted {
path: event.path,
entry_id: 0, // Placeholder - handler will look up real ID
},
FsRawEventKind::Rename { from, to } => Change::Moved {
FsEventKind::Rename { from, to } => Change::Moved {
old_path: from,
new_path: to,
entry_id: 0, // Placeholder - handler will look up real ID

View File

@@ -11,15 +11,15 @@
//!
//! // Check if an event should be handled by the ephemeral system
//! if let Some(root) = responder::find_ephemeral_root(&path, &context) {
//! responder::process_event(&context, &root, event_kind).await?;
//! responder::process_event(&context, &root, event).await?;
//! }
//! ```
use crate::context::CoreContext;
use crate::infra::event::FsRawEventKind;
use crate::ops::indexing::change_detection::{self, ChangeConfig};
use crate::ops::indexing::rules::RuleToggles;
use anyhow::Result;
use sd_fs_watcher::{FsEvent, FsEventKind};
use std::path::{Path, PathBuf};
use std::sync::Arc;
@@ -34,16 +34,16 @@ pub fn find_ephemeral_root(path: &Path, context: &CoreContext) -> Option<PathBuf
/// Check if any path in a batch of events falls under an ephemeral watched directory.
pub fn find_ephemeral_root_for_events(
events: &[FsRawEventKind],
events: &[FsEvent],
context: &CoreContext,
) -> Option<PathBuf> {
let paths: Vec<&Path> = events
.iter()
.flat_map(|e| match e {
FsRawEventKind::Create { path } => vec![path.as_path()],
FsRawEventKind::Modify { path } => vec![path.as_path()],
FsRawEventKind::Remove { path } => vec![path.as_path()],
FsRawEventKind::Rename { from, to } => vec![from.as_path(), to.as_path()],
.flat_map(|e| match &e.kind {
FsEventKind::Create => vec![e.path.as_path()],
FsEventKind::Modify => vec![e.path.as_path()],
FsEventKind::Remove => vec![e.path.as_path()],
FsEventKind::Rename { from, to } => vec![from.as_path(), to.as_path()],
})
.collect();
@@ -60,13 +60,20 @@ pub fn find_ephemeral_root_for_events(
pub async fn apply_batch(
context: &Arc<CoreContext>,
root_path: &Path,
events: Vec<FsRawEventKind>,
events: Vec<FsEvent>,
rule_toggles: RuleToggles,
) -> Result<()> {
if events.is_empty() {
tracing::debug!("ephemeral::responder::apply_batch() called with empty events");
return Ok(());
}
tracing::debug!(
"ephemeral::responder::apply_batch() processing {} events for root: {}",
events.len(),
root_path.display()
);
let index = context.ephemeral_cache().get_global_index();
let event_bus = context.events.clone();
@@ -85,9 +92,14 @@ pub async fn apply_batch(
pub async fn apply(
context: &Arc<CoreContext>,
root_path: &Path,
event: FsRawEventKind,
event: FsEvent,
rule_toggles: RuleToggles,
) -> Result<()> {
tracing::debug!(
"ephemeral::responder::apply() called for root: {}, event: {:?}",
root_path.display(),
event
);
apply_batch(context, root_path, vec![event], rule_toggles).await
}

View File

@@ -469,3 +469,4 @@ mod tests {
}
}

View File

@@ -146,13 +146,28 @@ impl ChangeHandler for MemoryAdapter {
let entry_uuid = Uuid::new_v4();
let entry_metadata = EntryMetadata::from(metadata.clone());
tracing::debug!(
"MemoryAdapter::create() called for path: {}",
metadata.path.display()
);
let (entry_id, content_kind) = self
.add_entry_internal(&metadata.path, entry_uuid, entry_metadata.clone())
.await?;
if let Some(content_kind) = content_kind {
tracing::debug!(
"Emitting ResourceChanged for ephemeral create: {} (content_kind: {:?})",
metadata.path.display(),
content_kind
);
self.emit_resource_changed(entry_uuid, &metadata.path, &entry_metadata, content_kind)
.await;
} else {
tracing::warn!(
"No content_kind for ephemeral entry, skipping ResourceChanged: {}",
metadata.path.display()
);
}
Ok(EntryRef {

View File

@@ -0,0 +1,177 @@
//! Ephemeral event handler
//!
//! Subscribes to filesystem events and routes them to the ephemeral responder
//! for in-memory index updates. Used for browsing external drives, network shares,
//! and other non-persistent locations.
//!
//! ## Characteristics
//!
//! - **Shallow watching**: Only processes events for immediate children of watched directories
//! - **No batching**: Memory writes are fast, events processed immediately
//! - **Session-based**: Events only processed for active browsing sessions
use crate::context::CoreContext;
use crate::ops::indexing::ephemeral::responder;
use crate::ops::indexing::rules::RuleToggles;
use crate::service::watcher::FsWatcherService;
use anyhow::Result;
use sd_fs_watcher::FsEvent;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::sync::{broadcast, RwLock};
use tracing::{debug, error, trace, warn};
/// Handler for ephemeral (in-memory) filesystem events
///
/// Subscribes to `FsWatcher` events and routes matching events to the
/// ephemeral responder for immediate in-memory updates.
pub struct EphemeralEventHandler {
/// Core context (contains ephemeral_cache)
context: Arc<CoreContext>,
/// Reference to the filesystem watcher service (set via connect())
fs_watcher: RwLock<Option<Arc<FsWatcherService>>>,
/// Whether the handler is running
is_running: Arc<AtomicBool>,
/// Default rule toggles for filtering
rule_toggles: RuleToggles,
}
impl EphemeralEventHandler {
/// Create a new ephemeral event handler (unconnected)
///
/// Call `connect()` to attach to a FsWatcherService before starting.
pub fn new_unconnected(context: Arc<CoreContext>) -> Self {
Self {
context,
fs_watcher: RwLock::new(None),
is_running: Arc::new(AtomicBool::new(false)),
rule_toggles: RuleToggles::default(),
}
}
/// Create a new ephemeral event handler (connected)
pub fn new(context: Arc<CoreContext>, fs_watcher: Arc<FsWatcherService>) -> Self {
Self {
context,
fs_watcher: RwLock::new(Some(fs_watcher)),
is_running: Arc::new(AtomicBool::new(false)),
rule_toggles: RuleToggles::default(),
}
}
/// Connect to a FsWatcherService
pub fn connect(&self, fs_watcher: Arc<FsWatcherService>) {
// Use blocking_write since this is called during init, not async context
*self.fs_watcher.blocking_write() = Some(fs_watcher);
}
/// Start the event handler
///
/// Spawns a task that subscribes to filesystem events and routes
/// matching events to the ephemeral responder.
pub async fn start(&self) -> Result<()> {
if self.is_running.swap(true, Ordering::SeqCst) {
warn!("EphemeralEventHandler is already running");
return Ok(());
}
let fs_watcher = self.fs_watcher.read().await.clone();
let Some(fs_watcher) = fs_watcher else {
return Err(anyhow::anyhow!(
"EphemeralEventHandler not connected to FsWatcherService"
));
};
debug!("Starting EphemeralEventHandler");
let mut rx = fs_watcher.subscribe();
let context = self.context.clone();
let rule_toggles = self.rule_toggles;
let is_running = self.is_running.clone();
tokio::spawn(async move {
debug!("EphemeralEventHandler task started");
while is_running.load(Ordering::SeqCst) {
match rx.recv().await {
Ok(event) => {
if let Err(e) = Self::handle_event(&context, &event, rule_toggles).await {
error!("Error handling ephemeral event: {}", e);
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
warn!("EphemeralEventHandler lagged by {} events", n);
// Continue processing - we'll catch up
}
Err(broadcast::error::RecvError::Closed) => {
debug!("FsWatcher channel closed, stopping EphemeralEventHandler");
break;
}
}
}
debug!("EphemeralEventHandler task stopped");
});
Ok(())
}
/// Stop the event handler
pub fn stop(&self) {
debug!("Stopping EphemeralEventHandler");
self.is_running.store(false, Ordering::SeqCst);
}
/// Check if the handler is running
pub fn is_running(&self) -> bool {
self.is_running.load(Ordering::SeqCst)
}
/// Handle a single filesystem event
///
/// Checks if the event's path is under an ephemeral watched directory.
/// For shallow watches, only processes events for immediate children.
async fn handle_event(
context: &Arc<CoreContext>,
event: &FsEvent,
rule_toggles: RuleToggles,
) -> Result<()> {
// Get the parent directory of the event path
let Some(parent) = event.path.parent() else {
trace!("Event path has no parent: {}", event.path.display());
return Ok(());
};
// Check if the parent is being watched (shallow watch = immediate children only)
let watched_paths = context.ephemeral_cache().watched_paths();
// Find if any watched path matches the parent
let matching_root = watched_paths.iter().find(|watched| {
// For shallow watches, parent must exactly match the watched path
parent == watched.as_path()
});
let Some(root_path) = matching_root else {
// Not under any ephemeral watch
trace!("Event not under ephemeral watch: {}", event.path.display());
return Ok(());
};
debug!(
"Ephemeral event matched: {} (root: {})",
event.path.display(),
root_path.display()
);
// Pass FsEvent directly to responder
responder::apply(context, root_path, event.clone(), rule_toggles).await
}
}
#[cfg(test)]
mod tests {
use super::*;
// Integration tests would require full context setup
// The handler logic is straightforward - subscribe, filter, route
}

View File

@@ -0,0 +1,10 @@
//! Event handlers for filesystem changes
//!
//! These handlers subscribe to `FsWatcher` events and route them to the
//! appropriate storage layer (database for persistent, memory for ephemeral).
mod ephemeral;
mod persistent;
pub use ephemeral::EphemeralEventHandler;
pub use persistent::{LocationMeta, PersistentEventHandler};

View File

@@ -0,0 +1,383 @@
//! Persistent event handler
//!
//! Subscribes to filesystem events and routes them to location workers
//! for batched database persistence. Used for indexed locations.
//!
//! ## Characteristics
//!
//! - **Recursive watching**: Processes events for entire directory trees
//! - **Batching**: Events are collected and processed in batches for efficiency
//! - **Location-scoped**: Events are routed to the appropriate location's worker
use crate::context::CoreContext;
use crate::ops::indexing::responder;
use crate::ops::indexing::rules::RuleToggles;
use crate::service::watcher::FsWatcherService;
use anyhow::Result;
use sd_fs_watcher::FsEvent;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{broadcast, mpsc, RwLock};
use tracing::{debug, error, info, trace, warn};
use uuid::Uuid;
/// Metadata for a watched location
#[derive(Debug, Clone)]
pub struct LocationMeta {
/// Location UUID
pub id: Uuid,
/// Library UUID this location belongs to
pub library_id: Uuid,
/// Root path of the location
pub root_path: PathBuf,
/// Indexing rule toggles
pub rule_toggles: RuleToggles,
}
/// Configuration for the persistent event handler
#[derive(Debug, Clone)]
pub struct PersistentHandlerConfig {
/// Debounce window for batching events (ms)
pub debounce_window_ms: u64,
/// Maximum batch size
pub max_batch_size: usize,
/// Worker channel buffer size
pub worker_buffer_size: usize,
}
impl Default for PersistentHandlerConfig {
fn default() -> Self {
Self {
debounce_window_ms: 150,
max_batch_size: 10000,
worker_buffer_size: 100000,
}
}
}
/// Handler for persistent (database-backed) filesystem events
///
/// Subscribes to `FsWatcher` events, filters by location scope,
/// and routes to per-location workers for batched processing.
pub struct PersistentEventHandler {
/// Core context for database access
context: Arc<CoreContext>,
/// Reference to the filesystem watcher service (set via connect())
fs_watcher: RwLock<Option<Arc<FsWatcherService>>>,
/// Registered locations (root_path -> meta)
locations: Arc<RwLock<HashMap<PathBuf, LocationMeta>>>,
/// Per-location worker channels
workers: Arc<RwLock<HashMap<Uuid, mpsc::Sender<FsEvent>>>>,
/// Whether the handler is running
is_running: Arc<AtomicBool>,
/// Configuration
config: PersistentHandlerConfig,
}
impl PersistentEventHandler {
/// Create a new persistent event handler (unconnected)
///
/// Call `connect()` to attach to a FsWatcherService before starting.
pub fn new_unconnected(context: Arc<CoreContext>) -> Self {
Self {
context,
fs_watcher: RwLock::new(None),
locations: Arc::new(RwLock::new(HashMap::new())),
workers: Arc::new(RwLock::new(HashMap::new())),
is_running: Arc::new(AtomicBool::new(false)),
config: PersistentHandlerConfig::default(),
}
}
/// Create a new persistent event handler (connected)
pub fn new(context: Arc<CoreContext>, fs_watcher: Arc<FsWatcherService>) -> Self {
Self {
context,
fs_watcher: RwLock::new(Some(fs_watcher)),
locations: Arc::new(RwLock::new(HashMap::new())),
workers: Arc::new(RwLock::new(HashMap::new())),
is_running: Arc::new(AtomicBool::new(false)),
config: PersistentHandlerConfig::default(),
}
}
/// Connect to a FsWatcherService
pub fn connect(&self, fs_watcher: Arc<FsWatcherService>) {
*self.fs_watcher.blocking_write() = Some(fs_watcher);
}
/// Register a location for persistent indexing
pub async fn add_location(&self, meta: LocationMeta) -> Result<()> {
let location_id = meta.id;
let root_path = meta.root_path.clone();
info!(
"Registering location {} at {}",
location_id,
root_path.display()
);
// Add to locations map
{
let mut locations = self.locations.write().await;
locations.insert(root_path.clone(), meta.clone());
}
// Create worker if handler is running
if self.is_running.load(Ordering::SeqCst) {
self.ensure_worker(meta).await?;
}
// Register path with FsWatcher if connected
if let Some(fs_watcher) = self.fs_watcher.read().await.as_ref() {
fs_watcher
.watch_path(&root_path, sd_fs_watcher::WatchConfig::recursive())
.await?;
}
Ok(())
}
/// Unregister a location
pub async fn remove_location(&self, location_id: Uuid) -> Result<()> {
info!("Unregistering location {}", location_id);
// Find and remove the location
let root_path = {
let mut locations = self.locations.write().await;
let path = locations
.iter()
.find(|(_, meta)| meta.id == location_id)
.map(|(path, _)| path.clone());
if let Some(path) = &path {
locations.remove(path);
}
path
};
// Remove worker
{
let mut workers = self.workers.write().await;
workers.remove(&location_id);
}
// Unwatch path if connected
if let Some(path) = root_path {
if let Some(fs_watcher) = self.fs_watcher.read().await.as_ref() {
if let Err(e) = fs_watcher.unwatch_path(&path).await {
warn!("Failed to unwatch path {}: {}", path.display(), e);
}
}
}
Ok(())
}
/// Get all registered locations
pub async fn locations(&self) -> Vec<LocationMeta> {
self.locations.read().await.values().cloned().collect()
}
/// Start the event handler
pub async fn start(&self) -> Result<()> {
if self.is_running.swap(true, Ordering::SeqCst) {
warn!("PersistentEventHandler is already running");
return Ok(());
}
let fs_watcher = self.fs_watcher.read().await.clone();
let Some(fs_watcher) = fs_watcher else {
return Err(anyhow::anyhow!(
"PersistentEventHandler not connected to FsWatcherService"
));
};
debug!("Starting PersistentEventHandler");
// Create workers for all registered locations
let locations: Vec<LocationMeta> = self.locations.read().await.values().cloned().collect();
for meta in locations {
self.ensure_worker(meta).await?;
}
// Start the event routing task
let mut rx = fs_watcher.subscribe();
let locations = self.locations.clone();
let workers = self.workers.clone();
let is_running = self.is_running.clone();
tokio::spawn(async move {
debug!("PersistentEventHandler routing task started");
while is_running.load(Ordering::SeqCst) {
match rx.recv().await {
Ok(event) => {
if let Err(e) = Self::route_event(&event, &locations, &workers).await {
error!("Error routing persistent event: {}", e);
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
warn!("PersistentEventHandler lagged by {} events", n);
}
Err(broadcast::error::RecvError::Closed) => {
debug!("FsWatcher channel closed, stopping PersistentEventHandler");
break;
}
}
}
debug!("PersistentEventHandler routing task stopped");
});
Ok(())
}
/// Stop the event handler
pub async fn stop(&self) {
debug!("Stopping PersistentEventHandler");
self.is_running.store(false, Ordering::SeqCst);
// Clear workers (dropping senders will stop worker tasks)
let mut workers = self.workers.write().await;
workers.clear();
}
/// Check if the handler is running
pub fn is_running(&self) -> bool {
self.is_running.load(Ordering::SeqCst)
}
/// Ensure a worker exists for a location
async fn ensure_worker(&self, meta: LocationMeta) -> Result<()> {
let mut workers = self.workers.write().await;
if workers.contains_key(&meta.id) {
return Ok(());
}
debug!("Creating worker for location {}", meta.id);
let (tx, rx) = mpsc::channel(self.config.worker_buffer_size);
workers.insert(meta.id, tx);
// Spawn worker task
let context = self.context.clone();
let config = self.config.clone();
tokio::spawn(async move {
if let Err(e) = Self::run_worker(rx, meta, context, config).await {
error!("Location worker failed: {}", e);
}
});
Ok(())
}
/// Route an event to the appropriate location worker
async fn route_event(
event: &FsEvent,
locations: &Arc<RwLock<HashMap<PathBuf, LocationMeta>>>,
workers: &Arc<RwLock<HashMap<Uuid, mpsc::Sender<FsEvent>>>>,
) -> Result<()> {
let locs = locations.read().await;
// Find the best matching location (longest prefix match)
let mut best_match: Option<&LocationMeta> = None;
let mut longest_len = 0;
for (root_path, meta) in locs.iter() {
if event.path.starts_with(root_path) {
let len = root_path.as_os_str().len();
if len > longest_len {
longest_len = len;
best_match = Some(meta);
}
}
}
let Some(location) = best_match else {
trace!("Event not under any location: {}", event.path.display());
return Ok(());
};
// Send to worker
let workers_map = workers.read().await;
if let Some(tx) = workers_map.get(&location.id) {
if let Err(e) = tx.send(event.clone()).await {
warn!(
"Failed to send event to worker for location {}: {}",
location.id, e
);
}
}
Ok(())
}
/// Run the location worker (batching + responder calls)
async fn run_worker(
mut rx: mpsc::Receiver<FsEvent>,
meta: LocationMeta,
context: Arc<CoreContext>,
config: PersistentHandlerConfig,
) -> Result<()> {
info!("Location worker started for {}", meta.id);
while let Some(first_event) = rx.recv().await {
// Start batching window
let mut batch = vec![first_event];
let deadline = Instant::now() + Duration::from_millis(config.debounce_window_ms);
// Collect events within the debounce window
while Instant::now() < deadline && batch.len() < config.max_batch_size {
match rx.try_recv() {
Ok(event) => batch.push(event),
Err(mpsc::error::TryRecvError::Empty) => {
// Brief sleep to avoid busy waiting
tokio::time::sleep(Duration::from_millis(10)).await;
}
Err(mpsc::error::TryRecvError::Disconnected) => break,
}
}
debug!(
"Processing batch of {} events for location {}",
batch.len(),
meta.id
);
// Pass FsEvent batch directly to responder
if let Err(e) = responder::apply_batch(
&context,
meta.library_id,
meta.id,
batch,
meta.rule_toggles,
&meta.root_path,
None, // volume_backend - TODO: resolve from context
)
.await
{
error!("Failed to apply batch for location {}: {}", meta.id, e);
}
}
info!("Location worker stopped for {}", meta.id);
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_config_default() {
let config = PersistentHandlerConfig::default();
assert_eq!(config.debounce_window_ms, 150);
assert_eq!(config.max_batch_size, 10000);
}
}

View File

@@ -522,11 +522,9 @@ impl JobHandler for IndexerJob {
// Automatically add filesystem watch for successfully indexed ephemeral paths
// This enables real-time updates when files change in browsed directories
if let Some(watcher) = ctx.library().core_context().get_location_watcher().await {
if let Err(e) = watcher.add_ephemeral_watch(
local_path.to_path_buf(),
self.config.rule_toggles
).await {
if let Some(watcher) = ctx.library().core_context().get_fs_watcher().await {
if let Err(e) = watcher.watch_ephemeral(local_path.to_path_buf()).await
{
ctx.log(format!(
"Warning: Failed to add ephemeral watch for {}: {}",
local_path.display(),

View File

@@ -24,6 +24,7 @@ pub mod action;
pub mod change_detection;
pub mod database_storage;
pub mod ephemeral;
pub mod handlers;
pub mod hierarchy;
pub mod input;
pub mod job;
@@ -41,10 +42,11 @@ pub mod verify;
pub use action::IndexingAction;
pub use change_detection::{
apply_batch as apply_change_batch, Change, ChangeConfig, ChangeDetector, ChangeHandler,
ChangeType, EntryRef, DatabaseAdapter, DatabaseAdapterForJob,
ChangeType, DatabaseAdapter, DatabaseAdapterForJob, EntryRef,
};
pub use database_storage::{DatabaseStorage, EntryMetadata};
pub use ephemeral::{EphemeralIndex, EphemeralIndexCache, EphemeralIndexStats, MemoryAdapter};
pub use handlers::{EphemeralEventHandler, LocationMeta, PersistentEventHandler};
pub use hierarchy::HierarchyQuery;
pub use input::IndexInput;
pub use job::{IndexMode, IndexScope, IndexerJob, IndexerJobConfig, IndexerOutput};

View File

@@ -1,15 +1,14 @@
//! Persistent location responder.
//!
//! Thin adapter over `DatabaseAdapter` that translates raw filesystem
//! Thin adapter over `DatabaseAdapter` that translates filesystem
//! events into database mutations. The watcher calls `apply_batch` with events;
//! this module delegates to the unified change handling infrastructure.
use crate::context::CoreContext;
use crate::infra::event::FsRawEventKind;
use crate::ops::indexing::change_detection::{self, ChangeConfig, DatabaseAdapter};
use crate::ops::indexing::rules::RuleToggles;
use anyhow::Result;
use sd_fs_watcher::FsEvent;
use std::path::Path;
use std::sync::Arc;
use uuid::Uuid;
@@ -22,7 +21,7 @@ pub async fn apply(
context: &Arc<CoreContext>,
library_id: Uuid,
location_id: Uuid,
kind: FsRawEventKind,
event: FsEvent,
rule_toggles: RuleToggles,
location_root: &Path,
volume_backend: Option<&Arc<dyn crate::volume::VolumeBackend>>,
@@ -31,7 +30,7 @@ pub async fn apply(
context,
library_id,
location_id,
vec![kind],
vec![event],
rule_toggles,
location_root,
volume_backend,
@@ -48,7 +47,7 @@ pub async fn apply_batch(
context: &Arc<CoreContext>,
library_id: Uuid,
location_id: Uuid,
events: Vec<FsRawEventKind>,
events: Vec<FsEvent>,
rule_toggles: RuleToggles,
location_root: &Path,
volume_backend: Option<&Arc<dyn crate::volume::VolumeBackend>>,

View File

@@ -18,19 +18,20 @@ pub mod sidecar_manager;
pub mod sync;
pub mod volume_monitor;
pub mod watcher;
// NOTE: watcher_old/ is kept as reference during migration but not compiled
use device::DeviceService;
use file_sharing::FileSharingService;
use network::NetworkingService;
use sidecar_manager::SidecarManager;
use volume_monitor::{VolumeMonitorConfig, VolumeMonitorService};
use watcher::{LocationWatcher, LocationWatcherConfig};
use watcher::{FsWatcherService, FsWatcherServiceConfig};
/// Container for all background services
#[derive(Clone)]
pub struct Services {
/// File system watcher for locations
pub location_watcher: Arc<LocationWatcher>,
/// Filesystem watcher - detects changes and emits events
pub fs_watcher: Arc<FsWatcherService>,
/// File sharing service
pub file_sharing: Arc<FileSharingService>,
/// Device management service
@@ -52,18 +53,18 @@ impl Services {
pub fn new(context: Arc<CoreContext>) -> Self {
info!("Initializing background services");
let location_watcher_config = LocationWatcherConfig::default();
let location_watcher = Arc::new(LocationWatcher::new(
location_watcher_config,
context.events.clone(),
context.clone(),
));
let fs_watcher_config = FsWatcherServiceConfig::default();
let fs_watcher = Arc::new(FsWatcherService::new(context.clone(), fs_watcher_config));
// Connect handlers to the watcher (they need Arc<FsWatcherService>)
fs_watcher.init_handlers();
let file_sharing = Arc::new(FileSharingService::new(context.clone()));
let device = Arc::new(DeviceService::new(context.clone()));
let sidecar_manager = Arc::new(SidecarManager::new(context.clone()));
let key_manager = context.key_manager.clone();
Self {
location_watcher,
fs_watcher,
file_sharing,
device,
networking: None, // Initialized separately when needed
@@ -83,7 +84,7 @@ impl Services {
pub async fn start_all(&self) -> Result<()> {
info!("Starting all background services");
self.location_watcher.start().await?;
self.fs_watcher.start().await?;
// Start volume monitor if initialized
if let Some(monitor) = &self.volume_monitor {
@@ -97,10 +98,10 @@ impl Services {
pub async fn start_all_with_config(&self, config: &crate::config::ServiceConfig) -> Result<()> {
info!("Starting background services based on configuration");
if config.location_watcher_enabled {
self.location_watcher.start().await?;
if config.fs_watcher_enabled {
self.fs_watcher.start().await?;
} else {
info!("Location watcher disabled in configuration");
info!("Filesystem watcher disabled in configuration");
}
// Start volume monitor if initialized and enabled
@@ -131,7 +132,7 @@ impl Services {
pub async fn stop_all(&self) -> Result<()> {
info!("Stopping all background services");
self.location_watcher.stop().await?;
self.fs_watcher.stop().await?;
// Stop volume monitor if initialized
if let Some(monitor) = &self.volume_monitor {

View File

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,399 @@
//! FsWatcher Service - wraps the sd-fs-watcher crate for use in Spacedrive
//!
//! This service manages the lifecycle of the filesystem watcher and provides
//! the event stream that handlers subscribe to. It owns and starts the
//! `EphemeralEventHandler` and `PersistentEventHandler`.
use crate::context::CoreContext;
use crate::library::Library;
use crate::ops::indexing::handlers::{EphemeralEventHandler, LocationMeta, PersistentEventHandler};
use crate::ops::indexing::rules::RuleToggles;
use crate::service::Service;
use anyhow::Result;
use sd_fs_watcher::{FsEvent, FsWatcher, WatchConfig, WatcherConfig};
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast;
use tracing::{debug, info, warn};
/// Configuration for the FsWatcher service
#[derive(Debug, Clone)]
pub struct FsWatcherServiceConfig {
/// Size of the internal event buffer
pub event_buffer_size: usize,
/// Tick interval for platform-specific event eviction
pub tick_interval: Duration,
/// Enable debug logging
pub debug_mode: bool,
}
impl Default for FsWatcherServiceConfig {
fn default() -> Self {
Self {
event_buffer_size: 100_000,
tick_interval: Duration::from_millis(100),
debug_mode: false,
}
}
}
impl From<FsWatcherServiceConfig> for WatcherConfig {
fn from(config: FsWatcherServiceConfig) -> Self {
WatcherConfig::default()
.with_buffer_size(config.event_buffer_size)
.with_tick_interval(config.tick_interval)
.with_debug(config.debug_mode)
}
}
/// Filesystem watcher service that wraps sd-fs-watcher
///
/// This service:
/// - Manages the lifecycle of the underlying FsWatcher
/// - Owns and starts the event handlers (PersistentEventHandler, EphemeralEventHandler)
/// - Handles watch registration for paths
///
/// ## Usage
///
/// ```ignore
/// let config = FsWatcherServiceConfig::default();
/// let service = FsWatcherService::new(context, config);
///
/// // Start the service (also starts handlers)
/// service.start().await?;
///
/// // Watch a location (persistent, recursive)
/// service.watch_location(LocationMeta { ... }).await?;
///
/// // Watch an ephemeral path (shallow, in-memory)
/// service.watch_ephemeral("/path/to/browse").await?;
/// ```
pub struct FsWatcherService {
/// Core context for ephemeral cache access
context: Arc<CoreContext>,
/// The underlying filesystem watcher
watcher: FsWatcher,
/// Handler for persistent (database) events
persistent_handler: PersistentEventHandler,
/// Handler for ephemeral (in-memory) events
ephemeral_handler: EphemeralEventHandler,
/// Whether the service is running
is_running: AtomicBool,
/// Configuration
config: FsWatcherServiceConfig,
}
impl FsWatcherService {
/// Create a new FsWatcher service
///
/// Note: Handlers are created but not yet connected. Call `init_handlers()`
/// after wrapping in Arc to connect them to the watcher.
pub fn new(context: Arc<CoreContext>, config: FsWatcherServiceConfig) -> Self {
let watcher_config: WatcherConfig = config.clone().into();
let watcher = FsWatcher::new(watcher_config);
Self {
context: context.clone(),
watcher,
persistent_handler: PersistentEventHandler::new_unconnected(context.clone()),
ephemeral_handler: EphemeralEventHandler::new_unconnected(context),
is_running: AtomicBool::new(false),
config,
}
}
/// Initialize handlers with a reference to self (wrapped in Arc)
///
/// Must be called after the service is wrapped in Arc.
pub fn init_handlers(self: &Arc<Self>) {
self.persistent_handler.connect(self.clone());
self.ephemeral_handler.connect(self.clone());
}
/// Subscribe to filesystem events
///
/// Returns a broadcast receiver that will receive all filesystem events.
/// Multiple subscribers can exist simultaneously.
pub fn subscribe(&self) -> broadcast::Receiver<FsEvent> {
self.watcher.subscribe()
}
/// Watch a path with the given configuration
///
/// For persistent locations, use `WatchConfig::recursive()`.
/// For ephemeral browsing, use `WatchConfig::shallow()`.
pub async fn watch_path(&self, path: impl Into<PathBuf>, config: WatchConfig) -> Result<()> {
let path = path.into();
debug!("Watching path: {}", path.display());
self.watcher.watch_path(&path, config).await?;
Ok(())
}
/// Stop watching a path
pub async fn unwatch_path(&self, path: impl AsRef<std::path::Path>) -> Result<()> {
let path = path.as_ref();
debug!("Unwatching path: {}", path.display());
self.watcher.unwatch(path).await?;
Ok(())
}
/// Get all currently watched paths
pub async fn watched_paths(&self) -> Vec<PathBuf> {
self.watcher.watched_paths().await
}
/// Get the number of events received from the OS
pub fn events_received(&self) -> u64 {
self.watcher.events_received()
}
/// Get the number of events emitted to subscribers
pub fn events_emitted(&self) -> u64 {
self.watcher.events_emitted()
}
/// Get a reference to the underlying watcher
///
/// Use this for advanced operations or when you need direct access
/// to the watcher's capabilities.
pub fn inner(&self) -> &FsWatcher {
&self.watcher
}
/// Watch a location (persistent, recursive)
///
/// The location will be watched recursively and events will be
/// batched and persisted to the database.
pub async fn watch_location(&self, meta: LocationMeta) -> Result<()> {
info!(
"Watching location {} at {}",
meta.id,
meta.root_path.display()
);
self.persistent_handler.add_location(meta).await
}
/// Stop watching a location
pub async fn unwatch_location(&self, location_id: uuid::Uuid) -> Result<()> {
info!("Unwatching location {}", location_id);
self.persistent_handler.remove_location(location_id).await
}
/// Get all watched locations
pub async fn watched_locations(&self) -> Vec<LocationMeta> {
self.persistent_handler.locations().await
}
/// Load and watch all eligible locations from a library
///
/// Only watches locations that:
/// - Are on this device
/// - Have IndexMode != None
pub async fn load_library_locations(&self, library: &Library) -> Result<usize> {
use crate::infra::db::entities::{device, location};
use crate::ops::indexing::path_resolver::PathResolver;
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
let db = library.db().conn();
let mut count = 0;
// Get current device UUID and find in this library's database
let current_device_uuid = crate::device::get_current_device_id();
let current_device = device::Entity::find()
.filter(device::Column::Uuid.eq(current_device_uuid))
.one(db)
.await?;
let Some(current_device) = current_device else {
warn!(
"Current device {} not found in library {} database",
current_device_uuid,
library.id()
);
return Ok(0);
};
// Query locations owned by this device
let locations = location::Entity::find()
.filter(location::Column::DeviceId.eq(current_device.id))
.all(db)
.await?;
debug!(
"Found {} locations in library {} for this device",
locations.len(),
library.id()
);
for loc in locations {
// Skip locations without entry_id (not yet indexed)
let Some(entry_id) = loc.entry_id else {
debug!("Skipping location {} - no entry_id", loc.uuid);
continue;
};
// Skip IndexMode::None
if loc.index_mode == "none" {
debug!("Skipping location {} - IndexMode::None", loc.uuid);
continue;
}
// Get the full filesystem path
let path = match PathResolver::get_full_path(db, entry_id).await {
Ok(path) => path,
Err(e) => {
warn!("Failed to resolve path for location {}: {}", loc.uuid, e);
continue;
}
};
// Skip cloud locations
let path_str = path.to_string_lossy();
if path_str.contains("://") && !path_str.starts_with("local://") {
debug!("Skipping cloud location {}: {}", loc.uuid, path_str);
continue;
}
// Check if path exists
if !path.exists() {
warn!(
"Location {} path does not exist: {}",
loc.uuid,
path.display()
);
continue;
}
let meta = LocationMeta {
id: loc.uuid,
library_id: library.id(),
root_path: path,
rule_toggles: RuleToggles::default(),
};
if let Err(e) = self.watch_location(meta).await {
warn!("Failed to watch location {}: {}", loc.uuid, e);
} else {
count += 1;
}
}
info!("Loaded {} locations from library {}", count, library.id());
Ok(count)
}
/// Watch an ephemeral path (shallow, in-memory only)
///
/// Used for browsing external drives, network shares, etc.
/// Registers with ephemeral cache and starts OS-level watching.
pub async fn watch_ephemeral(&self, path: impl Into<PathBuf>) -> Result<()> {
let path = path.into();
debug!("Watching ephemeral path: {}", path.display());
// Register with ephemeral cache so handler knows to process events
self.context
.ephemeral_cache()
.register_for_watching(path.clone());
// Start OS-level watching (shallow = immediate children only)
self.watcher
.watch_path(&path, WatchConfig::shallow())
.await?;
Ok(())
}
/// Stop watching an ephemeral path
pub async fn unwatch_ephemeral(&self, path: &Path) -> Result<()> {
debug!("Unwatching ephemeral path: {}", path.display());
// Unregister from ephemeral cache
self.context
.ephemeral_cache()
.unregister_from_watching(path);
// Stop OS-level watching
self.watcher.unwatch(path).await?;
Ok(())
}
// ==================== Handler Access ====================
/// Get reference to persistent handler
pub fn persistent_handler(&self) -> &PersistentEventHandler {
&self.persistent_handler
}
/// Get reference to ephemeral handler
pub fn ephemeral_handler(&self) -> &EphemeralEventHandler {
&self.ephemeral_handler
}
}
#[async_trait::async_trait]
impl Service for FsWatcherService {
async fn start(&self) -> Result<()> {
if self.is_running.swap(true, Ordering::SeqCst) {
warn!("FsWatcher service is already running");
return Ok(());
}
info!("Starting FsWatcher service");
// Start the underlying watcher first
self.watcher.start().await?;
// Start the event handlers
self.persistent_handler.start().await?;
self.ephemeral_handler.start().await?;
info!("FsWatcher service started (with handlers)");
Ok(())
}
async fn stop(&self) -> Result<()> {
if !self.is_running.swap(false, Ordering::SeqCst) {
return Ok(());
}
info!("Stopping FsWatcher service");
// Stop handlers first
self.persistent_handler.stop().await;
self.ephemeral_handler.stop();
// Then stop the watcher
self.watcher.stop().await?;
info!("FsWatcher service stopped");
Ok(())
}
fn is_running(&self) -> bool {
self.is_running.load(Ordering::SeqCst)
}
fn name(&self) -> &'static str {
"fs_watcher"
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_config_default() {
let config = FsWatcherServiceConfig::default();
assert_eq!(config.event_buffer_size, 100_000);
assert!(!config.debug_mode);
}
// Note: Full service tests require CoreContext which needs async runtime
// See integration tests for complete service lifecycle testing
}

View File

File diff suppressed because it is too large Load Diff

View File

@@ -416,6 +416,7 @@ impl MacOSHandler {
// Check locations first
for location in locations.values() {
if path.starts_with(&location.path) {
debug!("Eviction: matched location for {}", path.display());
events.push(Event::FsRawChange {
library_id: location.library_id,
kind: crate::infra::event::FsRawEventKind::Create {
@@ -429,17 +430,24 @@ impl MacOSHandler {
// If not matched by location, check ephemeral watches
if !matched {
debug!("Eviction: checking ephemeral watches for {}", path.display());
let ephemeral = ephemeral_watches.read().await;
if let Some(parent) = path.parent() {
debug!("Eviction: parent is {}, ephemeral watch count: {}", parent.display(), ephemeral.len());
if ephemeral.contains_key(parent) {
debug!("Eviction: MATCHED ephemeral watch, emitting FsRawChange for {}", path.display());
events.push(Event::FsRawChange {
library_id: Uuid::nil(), // Ephemeral events use nil UUID
kind: crate::infra::event::FsRawEventKind::Create {
path: path.clone(),
},
});
} else {
debug!("Eviction: no ephemeral watch for parent {}", parent.display());
}
}
} else {
debug!("Eviction: already matched location, skipping ephemeral check");
}
}
}
@@ -459,6 +467,7 @@ impl MacOSHandler {
// Emit create event (responder will detect if it's an update via inode)
let locations = watched_locations.read().await;
let mut matched = false;
for location in locations.values() {
if path.starts_with(&location.path) {
events.push(Event::FsRawChange {
@@ -467,9 +476,25 @@ impl MacOSHandler {
path: path.clone(),
},
});
matched = true;
break;
}
}
// Check ephemeral watches if not matched
if !matched {
let ephemeral = ephemeral_watches.read().await;
if let Some(parent) = path.parent() {
if ephemeral.contains_key(parent) {
events.push(Event::FsRawChange {
library_id: Uuid::nil(),
kind: crate::infra::event::FsRawEventKind::Create {
path: path.clone(),
},
});
}
}
}
}
}
*reincident_files = reincident_to_keep;
@@ -481,6 +506,7 @@ impl MacOSHandler {
async fn handle_rename_create_eviction(
&self,
watched_locations: &Arc<RwLock<HashMap<Uuid, WatchedLocation>>>,
ephemeral_watches: &Arc<RwLock<HashMap<PathBuf, EphemeralWatch>>>,
) -> Result<Vec<Event>> {
let mut events = Vec::new();
let mut new_paths = self.new_paths_map.write().await;
@@ -494,6 +520,7 @@ impl MacOSHandler {
match tokio::fs::metadata(&path).await {
Ok(metadata) => {
let locations = watched_locations.read().await;
let mut matched = false;
for location in locations.values() {
if path.starts_with(&location.path) {
events.push(Event::FsRawChange {
@@ -507,9 +534,25 @@ impl MacOSHandler {
let mut to_recalc = self.to_recalculate_size.write().await;
to_recalc.insert(parent.to_path_buf(), Instant::now());
}
matched = true;
break;
}
}
// Check ephemeral watches if not matched
if !matched {
let ephemeral = ephemeral_watches.read().await;
if let Some(parent) = path.parent() {
if ephemeral.contains_key(parent) {
events.push(Event::FsRawChange {
library_id: Uuid::nil(),
kind: crate::infra::event::FsRawEventKind::Create {
path: path.clone(),
},
});
}
}
}
}
Err(_) => {
// File no longer exists, ignore
@@ -529,6 +572,7 @@ impl MacOSHandler {
async fn handle_rename_remove_eviction(
&self,
watched_locations: &Arc<RwLock<HashMap<Uuid, WatchedLocation>>>,
ephemeral_watches: &Arc<RwLock<HashMap<PathBuf, EphemeralWatch>>>,
) -> Result<Vec<Event>> {
let mut events = Vec::new();
let mut old_paths = self.old_paths_map.write().await;
@@ -538,6 +582,7 @@ impl MacOSHandler {
if instant.elapsed() > HUNDRED_MILLIS {
// Path has timed out, treat as removal
let locations = watched_locations.read().await;
let mut matched = false;
for location in locations.values() {
if path.starts_with(&location.path) {
events.push(Event::FsRawChange {
@@ -551,9 +596,25 @@ impl MacOSHandler {
let mut to_recalc = self.to_recalculate_size.write().await;
to_recalc.insert(parent.to_path_buf(), Instant::now());
}
matched = true;
break;
}
}
// Check ephemeral watches if not matched
if !matched {
let ephemeral = ephemeral_watches.read().await;
if let Some(parent) = path.parent() {
if ephemeral.contains_key(parent) {
events.push(Event::FsRawChange {
library_id: Uuid::nil(),
kind: crate::infra::event::FsRawEventKind::Remove {
path: path.clone(),
},
});
}
}
}
} else {
paths_to_keep.insert(inode, (instant, path));
}
@@ -763,13 +824,13 @@ impl MacOSHandler {
// Handle rename create evictions
let create_events = self
.handle_rename_create_eviction(watched_locations)
.handle_rename_create_eviction(watched_locations, ephemeral_watches)
.await?;
all_events.extend(create_events);
// Handle rename remove evictions
let remove_events = self
.handle_rename_remove_eviction(watched_locations)
.handle_rename_remove_eviction(watched_locations, ephemeral_watches)
.await?;
all_events.extend(remove_events);

View File

@@ -135,7 +135,7 @@ pub struct TestConfigBuilder {
log_level: String,
networking_enabled: bool,
volume_monitoring_enabled: bool,
location_watcher_enabled: bool,
fs_watcher_enabled: bool,
job_logging_enabled: bool,
telemetry_enabled: bool,
}
@@ -148,7 +148,7 @@ impl TestConfigBuilder {
log_level: "warn".to_string(), // Reduce log noise by default
networking_enabled: false, // Disable for faster tests
volume_monitoring_enabled: false, // Disable for faster tests
location_watcher_enabled: true, // Usually needed for indexing tests
fs_watcher_enabled: true, // Usually needed for indexing tests
job_logging_enabled: true, // Usually needed for job tests
telemetry_enabled: false, // Disable for tests
}
@@ -172,9 +172,9 @@ impl TestConfigBuilder {
self
}
/// Enable/disable location watcher (default: true)
pub fn location_watcher_enabled(mut self, enabled: bool) -> Self {
self.location_watcher_enabled = enabled;
/// Enable/disable filesystem watcher (default: true)
pub fn fs_watcher_enabled(mut self, enabled: bool) -> Self {
self.fs_watcher_enabled = enabled;
self
}
@@ -207,7 +207,7 @@ impl TestConfigBuilder {
services: ServiceConfig {
networking_enabled: self.networking_enabled,
volume_monitoring_enabled: self.volume_monitoring_enabled,
location_watcher_enabled: self.location_watcher_enabled,
fs_watcher_enabled: self.fs_watcher_enabled,
},
logging: crate::config::app_config::LoggingConfig::default(),
}
@@ -236,8 +236,8 @@ impl TestConfigBuilder {
config.services.volume_monitoring_enabled
);
info!(
" - Location watcher enabled: {}",
config.services.location_watcher_enabled
" - Filesystem watcher enabled: {}",
config.services.fs_watcher_enabled
);
info!(" - Job logging enabled: {}", config.job_logging.enabled);
@@ -424,8 +424,8 @@ impl IntegrationTestSetup {
loaded_config.services.volume_monitoring_enabled
);
info!(
" - Location watcher enabled: {}",
loaded_config.services.location_watcher_enabled
" - Filesystem watcher enabled: {}",
loaded_config.services.fs_watcher_enabled
);
info!(
" - Job logging enabled: {}",

View File

@@ -0,0 +1,41 @@
[package]
name = "sd-fs-watcher"
version = "0.1.0"
authors = ["Spacedrive Technology Inc. <engineering@spacedrive.com>"]
description = """
Platform-agnostic filesystem watcher that emits normalized events.
Handles platform-specific quirks like macOS rename detection and event buffering.
"""
edition.workspace = true
license.workspace = true
repository.workspace = true
rust-version.workspace = true
[features]
default = []
# Enable detailed debug logging
debug = []
[dependencies]
# Workspace dependencies
async-trait = { workspace = true }
futures = { workspace = true }
serde = { workspace = true, features = ["derive"] }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["sync", "time", "rt", "macros", "fs"] }
tracing = { workspace = true }
# External dependencies
notify = "8.0.0"
# Platform-specific dependencies
[target.'cfg(target_os = "macos")'.dependencies]
libc = { workspace = true }
[dev-dependencies]
tempfile = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
tracing-subscriber = { workspace = true }
tracing-test = { workspace = true }

203
crates/fs-watcher/README.md Normal file
View File

@@ -0,0 +1,203 @@
# sd-fs-watcher
Platform-agnostic filesystem watcher for Spacedrive.
## Overview
`sd-fs-watcher` provides a clean, storage-agnostic interface for watching filesystem changes. It handles platform-specific quirks (like macOS rename detection) internally and emits normalized events.
This crate is designed to be the foundation of Spacedrive's filesystem event system, but it has no knowledge of:
- Databases or ORM entities
- Libraries or locations
- UUIDs or entry IDs
It just watches paths and emits events.
## Usage
```rust
use sd_fs_watcher::{FsWatcher, WatchConfig, WatcherConfig};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create watcher with default config
let watcher = FsWatcher::new(WatcherConfig::default());
watcher.start().await?;
// Subscribe to events
let mut rx = watcher.subscribe();
// Watch a directory recursively
let _handle = watcher.watch("/path/to/watch", WatchConfig::recursive()).await?;
// Process events
while let Ok(event) = rx.recv().await {
match event.kind {
sd_fs_watcher::FsEventKind::Create => {
println!("Created: {}", event.path.display());
}
sd_fs_watcher::FsEventKind::Modify => {
println!("Modified: {}", event.path.display());
}
sd_fs_watcher::FsEventKind::Remove => {
println!("Removed: {}", event.path.display());
}
sd_fs_watcher::FsEventKind::Rename { from, to } => {
println!("Renamed: {} -> {}", from.display(), to.display());
}
}
}
Ok(())
}
```
## Watch Modes
### Recursive (default)
Watch a directory and all its subdirectories:
```rust
let _handle = watcher.watch("/path", WatchConfig::recursive()).await?;
```
### Shallow
Watch only immediate children of a directory (for ephemeral browsing):
```rust
let _handle = watcher.watch("/path", WatchConfig::shallow()).await?;
```
## Event Filtering
By default, the watcher filters out:
- Temporary files (`.tmp`, `.temp`, `~`, `.swp`)
- System files (`.DS_Store`, `Thumbs.db`)
- Hidden files (starting with `.`)
Important dotfiles like `.gitignore`, `.env`, etc. are preserved.
```rust
// Custom filtering
let config = WatchConfig::recursive()
.with_filters(EventFilters {
skip_hidden: false, // Include hidden files
skip_system_files: true,
skip_temp_files: true,
skip_patterns: vec!["node_modules".to_string()],
important_dotfiles: vec![".env".to_string()],
});
```
## Platform-Specific Behavior
### macOS
macOS FSEvents doesn't provide native rename tracking. When a file is renamed, we receive separate create and delete events. This crate implements rename detection via inode tracking:
1. When a file is created, we record its inode
2. When a file is removed, we buffer it briefly
3. If a create with the same inode arrives within 500ms, we emit a rename event
4. Otherwise, we emit separate create/remove events
### Linux
Linux inotify provides better rename tracking. We handle rename events directly when both paths are provided, with a small stabilization buffer for modify events.
### Windows
Windows ReadDirectoryChangesW provides reasonable tracking. We implement rename detection by buffering remove events and matching with subsequent creates.
## Reference Counting
Multiple calls to `watch()` on the same path share resources:
```rust
let handle1 = watcher.watch("/path", WatchConfig::recursive()).await?;
let handle2 = watcher.watch("/path", WatchConfig::recursive()).await?;
// Only one actual watch is registered with the OS
// Dropping both handles will unwatch
drop(handle1);
// Still watching (handle2 exists)
drop(handle2);
// Now actually unwatched
```
## Metrics
```rust
let received = watcher.events_received(); // Raw events from notify
let emitted = watcher.events_emitted(); // Processed events broadcast
```
## Event Metadata
Each `FsEvent` includes an optional `is_directory` flag:
```rust
pub struct FsEvent {
pub path: PathBuf,
pub kind: FsEventKind,
pub timestamp: SystemTime,
pub is_directory: Option<bool>, // Avoids extra fs::metadata calls downstream
}
```
Check directory status without filesystem calls:
```rust
if let Some(true) = event.is_dir() {
// Handle directory event
} else if let Some(false) = event.is_file() {
// Handle file event
} else {
// Unknown - check filesystem if needed (e.g., for Remove events)
}
```
## Integration with Spacedrive
This crate is designed to be consumed by higher-level services:
- **PersistentIndexService**: Subscribes to events, filters by location scope, writes to database
- **EphemeralIndexService**: Subscribes to events, filters by session scope, writes to memory
These services are not part of this crate - they live in `sd-core` and consume events from `FsWatcher`.
### Backpressure Management
The `FsWatcher` uses a broadcast channel for event distribution. To avoid backpressure issues:
1. **Don't block in the receiver loop**: Avoid synchronous database writes directly in the broadcast receiver
2. **Use internal batching queues**: The `PersistentIndexService` should receive events and immediately push them to its own internal batching queue (like the existing `LocationWorker` logic)
3. **Keep the broadcast clear**: This ensures the `EphemeralIndexService` (UI updates) receives events promptly
```rust
// Good pattern for PersistentIndexService
let mut rx = watcher.subscribe();
let (batch_tx, batch_rx) = mpsc::channel(100_000);
// Receiver task - fast, non-blocking
tokio::spawn(async move {
while let Ok(event) = rx.recv().await {
if is_in_my_scope(&event) {
let _ = batch_tx.send(event).await; // Push to internal queue
}
}
});
// Worker task - handles batching and DB writes
tokio::spawn(async move {
// Batch events, coalesce, write to DB...
});
```
### Database-Backed Inode Lookup
For enhanced rename detection on macOS, the `PersistentIndexService` can maintain an inode cache. When a Remove event is received, check if the inode exists in your database to detect if it's actually a rename where the "new path" hasn't arrived yet.

View File

@@ -0,0 +1,259 @@
//! Watch configuration types
//!
//! Configuration for how paths should be watched and events filtered.
use serde::{Deserialize, Serialize};
use std::time::Duration;
/// Configuration for watching a path
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WatchConfig {
/// Whether to watch recursively (all subdirectories) or shallow (immediate children only)
pub recursive: bool,
/// Rules for filtering events
pub filters: EventFilters,
}
impl Default for WatchConfig {
fn default() -> Self {
Self {
recursive: true,
filters: EventFilters::default(),
}
}
}
impl WatchConfig {
/// Create a recursive watch configuration with default filters
pub fn recursive() -> Self {
Self {
recursive: true,
filters: EventFilters::default(),
}
}
/// Create a shallow (non-recursive) watch configuration with default filters
pub fn shallow() -> Self {
Self {
recursive: false,
filters: EventFilters::default(),
}
}
/// Set whether to watch recursively
pub fn with_recursive(mut self, recursive: bool) -> Self {
self.recursive = recursive;
self
}
/// Set the event filters
pub fn with_filters(mut self, filters: EventFilters) -> Self {
self.filters = filters;
self
}
}
/// Filters for which events to emit
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EventFilters {
/// Skip hidden files (starting with .)
pub skip_hidden: bool,
/// Skip system files (.DS_Store, Thumbs.db, etc.)
pub skip_system_files: bool,
/// Skip temporary files (.tmp, .temp, ~, .swp)
pub skip_temp_files: bool,
/// Custom patterns to skip (glob patterns)
pub skip_patterns: Vec<String>,
/// Keep these dotfiles even if skip_hidden is true
pub important_dotfiles: Vec<String>,
}
impl Default for EventFilters {
fn default() -> Self {
Self {
skip_hidden: true,
skip_system_files: true,
skip_temp_files: true,
skip_patterns: Vec::new(),
important_dotfiles: vec![
".gitignore".to_string(),
".gitkeep".to_string(),
".gitattributes".to_string(),
".editorconfig".to_string(),
".env".to_string(),
".env.local".to_string(),
".nvmrc".to_string(),
".node-version".to_string(),
".python-version".to_string(),
".dockerignore".to_string(),
".eslintrc".to_string(),
".prettierrc".to_string(),
],
}
}
}
impl EventFilters {
/// Create filters that allow all events (no filtering)
pub fn allow_all() -> Self {
Self {
skip_hidden: false,
skip_system_files: false,
skip_temp_files: false,
skip_patterns: Vec::new(),
important_dotfiles: Vec::new(),
}
}
/// Check if a path should be filtered out
pub fn should_skip(&self, path: &std::path::Path) -> bool {
let path_str = path.to_string_lossy();
// Check temp files
if self.skip_temp_files {
if path_str.contains(".tmp")
|| path_str.contains(".temp")
|| path_str.ends_with("~")
|| path_str.ends_with(".swp")
{
return true;
}
}
// Check system files
if self.skip_system_files {
if path_str.contains(".DS_Store") || path_str.contains("Thumbs.db") {
return true;
}
}
// Check hidden files
if self.skip_hidden {
if let Some(file_name) = path.file_name() {
let name = file_name.to_string_lossy();
if name.starts_with('.') {
// Check if it's an important dotfile
let is_important = self
.important_dotfiles
.iter()
.any(|d| d.as_str() == name.as_ref());
if !is_important {
return true;
}
}
}
}
// Check custom skip patterns
for pattern in &self.skip_patterns {
if path_str.contains(pattern) {
return true;
}
}
false
}
}
/// Global watcher configuration
#[derive(Debug, Clone)]
pub struct WatcherConfig {
/// Size of the event channel buffer
pub event_buffer_size: usize,
/// Platform-specific tick interval for buffered event eviction
pub tick_interval: Duration,
/// Debounce duration for rapid events
pub debounce_duration: Duration,
/// Enable detailed debug logging
pub debug_mode: bool,
}
impl Default for WatcherConfig {
fn default() -> Self {
Self {
event_buffer_size: 100_000,
tick_interval: Duration::from_millis(100),
debounce_duration: Duration::from_millis(100),
debug_mode: false,
}
}
}
impl WatcherConfig {
/// Create a new watcher configuration
pub fn new() -> Self {
Self::default()
}
/// Set the event buffer size
pub fn with_buffer_size(mut self, size: usize) -> Self {
self.event_buffer_size = size;
self
}
/// Set the tick interval
pub fn with_tick_interval(mut self, interval: Duration) -> Self {
self.tick_interval = interval;
self
}
/// Set the debounce duration
pub fn with_debounce(mut self, duration: Duration) -> Self {
self.debounce_duration = duration;
self
}
/// Enable debug mode
pub fn with_debug(mut self, debug: bool) -> Self {
self.debug_mode = debug;
self
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::path::PathBuf;
#[test]
fn test_default_filters() {
let filters = EventFilters::default();
// Should skip system files
assert!(filters.should_skip(&PathBuf::from("/test/.DS_Store")));
assert!(filters.should_skip(&PathBuf::from("/test/Thumbs.db")));
// Should skip temp files
assert!(filters.should_skip(&PathBuf::from("/test/file.tmp")));
assert!(filters.should_skip(&PathBuf::from("/test/file~")));
// Should skip hidden files
assert!(filters.should_skip(&PathBuf::from("/test/.hidden")));
// Should NOT skip important dotfiles
assert!(!filters.should_skip(&PathBuf::from("/test/.gitignore")));
assert!(!filters.should_skip(&PathBuf::from("/test/.env")));
// Should NOT skip normal files
assert!(!filters.should_skip(&PathBuf::from("/test/file.txt")));
}
#[test]
fn test_allow_all_filters() {
let filters = EventFilters::allow_all();
// Should not skip anything
assert!(!filters.should_skip(&PathBuf::from("/test/.DS_Store")));
assert!(!filters.should_skip(&PathBuf::from("/test/.hidden")));
assert!(!filters.should_skip(&PathBuf::from("/test/file.tmp")));
}
#[test]
fn test_watch_config() {
let config = WatchConfig::recursive();
assert!(config.recursive);
let config = WatchConfig::shallow();
assert!(!config.recursive);
}
}

View File

@@ -0,0 +1,61 @@
//! Error types for the filesystem watcher
use std::path::PathBuf;
use thiserror::Error;
/// Result type alias for watcher operations
pub type Result<T> = std::result::Result<T, WatcherError>;
/// Errors that can occur during filesystem watching
#[derive(Debug, Error)]
pub enum WatcherError {
/// Failed to start the watcher
#[error("Failed to start watcher: {0}")]
StartFailed(String),
/// Failed to watch a path
#[error("Failed to watch path {path}: {reason}")]
WatchFailed {
path: PathBuf,
reason: String,
},
/// Failed to unwatch a path
#[error("Failed to unwatch path {path}: {reason}")]
UnwatchFailed {
path: PathBuf,
reason: String,
},
/// Path does not exist
#[error("Path does not exist: {0}")]
PathNotFound(PathBuf),
/// Path is not a directory
#[error("Path is not a directory: {0}")]
NotADirectory(PathBuf),
/// Watcher is already running
#[error("Watcher is already running")]
AlreadyRunning,
/// Watcher is not running
#[error("Watcher is not running")]
NotRunning,
/// Event channel closed
#[error("Event channel closed")]
ChannelClosed,
/// Internal notify error
#[error("Notify error: {0}")]
NotifyError(#[from] notify::Error),
/// IO error
#[error("IO error: {0}")]
IoError(#[from] std::io::Error),
/// Configuration error
#[error("Configuration error: {0}")]
ConfigError(String),
}

View File

@@ -0,0 +1,264 @@
//! Filesystem event types
//!
//! Storage-agnostic event types that represent raw filesystem changes.
//! These events contain only paths and change kinds - no library IDs,
//! no database references, no routing decisions.
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::time::SystemTime;
/// A filesystem change event
///
/// This is a normalized, platform-agnostic representation of a filesystem change.
/// Platform-specific handlers translate OS events into these normalized events.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct FsEvent {
/// The path affected by this event
pub path: PathBuf,
/// The kind of change
pub kind: FsEventKind,
/// When the event was detected
pub timestamp: SystemTime,
/// Whether the path is a directory (avoids extra fs::metadata calls downstream)
/// Note: For Remove events, this may be None if the path no longer exists
pub is_directory: Option<bool>,
}
impl FsEvent {
/// Create a new filesystem event
pub fn new(path: PathBuf, kind: FsEventKind) -> Self {
Self {
path,
kind,
timestamp: SystemTime::now(),
is_directory: None,
}
}
/// Create a new filesystem event with directory flag
pub fn new_with_dir_flag(path: PathBuf, kind: FsEventKind, is_directory: bool) -> Self {
Self {
path,
kind,
timestamp: SystemTime::now(),
is_directory: Some(is_directory),
}
}
/// Create a create event
pub fn create(path: PathBuf) -> Self {
Self::new(path, FsEventKind::Create)
}
/// Create a create event for a directory
pub fn create_dir(path: PathBuf) -> Self {
Self::new_with_dir_flag(path, FsEventKind::Create, true)
}
/// Create a create event for a file
pub fn create_file(path: PathBuf) -> Self {
Self::new_with_dir_flag(path, FsEventKind::Create, false)
}
/// Create a modify event
pub fn modify(path: PathBuf) -> Self {
Self::new(path, FsEventKind::Modify)
}
/// Create a modify event for a file (directories typically don't get modify events)
pub fn modify_file(path: PathBuf) -> Self {
Self::new_with_dir_flag(path, FsEventKind::Modify, false)
}
/// Create a remove event
pub fn remove(path: PathBuf) -> Self {
Self::new(path, FsEventKind::Remove)
}
/// Create a rename event
pub fn rename(from: PathBuf, to: PathBuf) -> Self {
Self {
path: to.clone(),
kind: FsEventKind::Rename { from, to },
timestamp: SystemTime::now(),
is_directory: None,
}
}
/// Create a rename event with directory flag
pub fn rename_with_dir_flag(from: PathBuf, to: PathBuf, is_directory: bool) -> Self {
Self {
path: to.clone(),
kind: FsEventKind::Rename { from, to },
timestamp: SystemTime::now(),
is_directory: Some(is_directory),
}
}
/// Check if this event is for a directory
pub fn is_dir(&self) -> Option<bool> {
self.is_directory
}
/// Check if this event is for a file
pub fn is_file(&self) -> Option<bool> {
self.is_directory.map(|d| !d)
}
}
/// The kind of filesystem change
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum FsEventKind {
/// A file or directory was created
Create,
/// A file or directory was modified
Modify,
/// A file or directory was removed
Remove,
/// A file or directory was renamed/moved
Rename {
/// The original path
from: PathBuf,
/// The new path
to: PathBuf,
},
}
impl FsEventKind {
/// Check if this is a create event
pub fn is_create(&self) -> bool {
matches!(self, Self::Create)
}
/// Check if this is a modify event
pub fn is_modify(&self) -> bool {
matches!(self, Self::Modify)
}
/// Check if this is a remove event
pub fn is_remove(&self) -> bool {
matches!(self, Self::Remove)
}
/// Check if this is a rename event
pub fn is_rename(&self) -> bool {
matches!(self, Self::Rename { .. })
}
}
/// Raw event from notify crate before platform processing
#[derive(Debug, Clone)]
pub struct RawNotifyEvent {
/// The kind of event from notify
pub kind: RawEventKind,
/// Paths affected by the event
pub paths: Vec<PathBuf>,
/// Timestamp when received
pub timestamp: SystemTime,
}
/// Raw event kinds from notify
#[derive(Debug, Clone)]
pub enum RawEventKind {
/// Create event
Create,
/// Modify event
Modify,
/// Remove event
Remove,
/// Rename event (platform-specific semantics)
Rename,
/// Other/unknown event type
Other(String),
}
impl RawNotifyEvent {
/// Create from a notify event
pub fn from_notify(event: notify::Event) -> Self {
use notify::event::{ModifyKind, RenameMode};
use notify::EventKind;
let kind = match event.kind {
EventKind::Create(_) => RawEventKind::Create,
EventKind::Modify(ModifyKind::Name(RenameMode::Any)) => RawEventKind::Rename,
EventKind::Modify(ModifyKind::Name(RenameMode::From)) => RawEventKind::Rename,
EventKind::Modify(ModifyKind::Name(RenameMode::To)) => RawEventKind::Rename,
EventKind::Modify(ModifyKind::Name(RenameMode::Both)) => RawEventKind::Rename,
EventKind::Modify(_) => RawEventKind::Modify,
EventKind::Remove(_) => RawEventKind::Remove,
other => RawEventKind::Other(format!("{:?}", other)),
};
Self {
kind,
paths: event.paths,
timestamp: SystemTime::now(),
}
}
/// Get the primary path for this event
pub fn primary_path(&self) -> Option<&PathBuf> {
self.paths.first()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_event_creation() {
let path = PathBuf::from("/test/file.txt");
let event = FsEvent::create(path.clone());
assert!(event.kind.is_create());
assert_eq!(event.path, path);
assert!(event.is_directory.is_none());
let event = FsEvent::modify(path.clone());
assert!(event.kind.is_modify());
let event = FsEvent::remove(path.clone());
assert!(event.kind.is_remove());
}
#[test]
fn test_directory_flag() {
let path = PathBuf::from("/test/dir");
let event = FsEvent::create_dir(path.clone());
assert!(event.kind.is_create());
assert_eq!(event.is_dir(), Some(true));
assert_eq!(event.is_file(), Some(false));
let event = FsEvent::create_file(path.clone());
assert!(event.kind.is_create());
assert_eq!(event.is_dir(), Some(false));
assert_eq!(event.is_file(), Some(true));
// Generic create has no flag
let event = FsEvent::create(path.clone());
assert!(event.is_dir().is_none());
}
#[test]
fn test_rename_event() {
let from = PathBuf::from("/test/old.txt");
let to = PathBuf::from("/test/new.txt");
let event = FsEvent::rename(from.clone(), to.clone());
assert!(event.kind.is_rename());
if let FsEventKind::Rename { from: f, to: t } = &event.kind {
assert_eq!(f, &from);
assert_eq!(t, &to);
} else {
panic!("Expected rename event");
}
// Test rename with directory flag
let event = FsEvent::rename_with_dir_flag(from.clone(), to.clone(), true);
assert_eq!(event.is_dir(), Some(true));
}
}

View File

@@ -0,0 +1,69 @@
//! Platform-agnostic filesystem watcher
//!
//! `sd-fs-watcher` provides a clean, storage-agnostic interface for watching
//! filesystem changes. It handles platform-specific quirks (like macOS rename
//! detection) internally and emits normalized events.
//!
//! # Architecture
//!
//! The crate is organized into layers:
//!
//! - **FsWatcher**: Main interface for watching paths and receiving events
//! - **PlatformHandler**: Platform-specific event processing (rename detection, buffering)
//! - **FsEvent/FsEventKind**: Normalized, storage-agnostic event types
//!
//! # Key Features
//!
//! - **Storage Agnostic**: No knowledge of databases, libraries, or UUIDs
//! - **Rename Detection**: Handles macOS FSEvents rename quirks via inode tracking
//! - **Event Filtering**: Built-in filtering for temp files, hidden files, etc.
//! - **Reference Counting**: Multiple watchers on the same path share resources
//! - **Broadcast Events**: Multiple subscribers can receive events concurrently
//!
//! # Example
//!
//! ```ignore
//! use sd_fs_watcher::{FsWatcher, WatchConfig, WatcherConfig};
//!
//! #[tokio::main]
//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
//! // Create watcher with default config
//! let watcher = FsWatcher::new(WatcherConfig::default());
//! watcher.start().await?;
//!
//! // Subscribe to events
//! let mut rx = watcher.subscribe();
//!
//! // Watch a directory recursively
//! let _handle = watcher.watch("/path/to/watch", WatchConfig::recursive()).await?;
//!
//! // Process events
//! while let Ok(event) = rx.recv().await {
//! match event.kind {
//! sd_fs_watcher::FsEventKind::Create => println!("Created: {}", event.path.display()),
//! sd_fs_watcher::FsEventKind::Modify => println!("Modified: {}", event.path.display()),
//! sd_fs_watcher::FsEventKind::Remove => println!("Removed: {}", event.path.display()),
//! sd_fs_watcher::FsEventKind::Rename { from, to } => {
//! println!("Renamed: {} -> {}", from.display(), to.display())
//! }
//! }
//! }
//!
//! Ok(())
//! }
//! ```
mod config;
mod error;
mod event;
mod platform;
mod watcher;
pub use config::{EventFilters, WatchConfig, WatcherConfig};
pub use error::{Result, WatcherError};
pub use event::{FsEvent, FsEventKind, RawEventKind, RawNotifyEvent};
pub use platform::{EventHandler, PlatformHandler};
pub use watcher::{FsWatcher, WatchHandle};
// Re-export notify types that users might need
pub use notify::RecursiveMode;

View File

@@ -0,0 +1,160 @@
//! Linux-specific event handler
//!
//! Linux inotify provides better rename tracking than macOS FSEvents,
//! but still requires some buffering for reliable handling.
use crate::event::{FsEvent, RawEventKind, RawNotifyEvent};
use crate::platform::EventHandler;
use crate::Result;
use std::collections::HashMap;
use std::path::PathBuf;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tracing::trace;
/// Timeout for event stabilization
const STABILIZATION_TIMEOUT_MS: u64 = 100;
/// Linux event handler
pub struct LinuxHandler {
/// Files pending stabilization
pending_updates: RwLock<HashMap<PathBuf, Instant>>,
}
impl LinuxHandler {
/// Create a new Linux handler
pub fn new() -> Self {
Self {
pending_updates: RwLock::new(HashMap::new()),
}
}
/// Evict pending updates that have stabilized
async fn evict_updates(&self, timeout: Duration) -> Vec<FsEvent> {
let mut events = Vec::new();
let mut updates = self.pending_updates.write().await;
let mut to_remove = Vec::new();
for (path, timestamp) in updates.iter() {
if timestamp.elapsed() > timeout {
to_remove.push(path.clone());
events.push(FsEvent::modify(path.clone()));
trace!("Evicting update (stabilized): {}", path.display());
}
}
for path in to_remove {
updates.remove(&path);
}
events
}
}
impl Default for LinuxHandler {
fn default() -> Self {
Self::new()
}
}
#[async_trait::async_trait]
impl EventHandler for LinuxHandler {
async fn process(&self, event: RawNotifyEvent) -> Result<Vec<FsEvent>> {
let Some(path) = event.primary_path().cloned() else {
return Ok(vec![]);
};
match event.kind {
RawEventKind::Create => Ok(vec![FsEvent::create(path)]),
RawEventKind::Remove => Ok(vec![FsEvent::remove(path)]),
RawEventKind::Modify => {
// Buffer modifications for stabilization
let mut updates = self.pending_updates.write().await;
updates.insert(path, Instant::now());
Ok(vec![])
}
RawEventKind::Rename => {
// inotify provides rename events with both paths
if event.paths.len() >= 2 {
let from = event.paths[0].clone();
let to = event.paths[1].clone();
Ok(vec![FsEvent::rename(from, to)])
} else {
// Incomplete rename, treat as modify
let mut updates = self.pending_updates.write().await;
updates.insert(path, Instant::now());
Ok(vec![])
}
}
RawEventKind::Other(ref kind) => {
trace!("Ignoring unknown event kind: {}", kind);
Ok(vec![])
}
}
}
async fn tick(&self) -> Result<Vec<FsEvent>> {
let timeout = Duration::from_millis(STABILIZATION_TIMEOUT_MS);
Ok(self.evict_updates(timeout).await)
}
async fn reset(&self) {
self.pending_updates.write().await.clear();
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_handler_creation() {
let handler = LinuxHandler::new();
assert!(handler.pending_updates.read().await.is_empty());
}
#[tokio::test]
async fn test_create_event() {
let handler = LinuxHandler::new();
let event = RawNotifyEvent {
kind: RawEventKind::Create,
paths: vec![PathBuf::from("/test/file.txt")],
timestamp: std::time::SystemTime::now(),
};
let events = handler.process(event).await.unwrap();
assert_eq!(events.len(), 1);
assert!(events[0].kind.is_create());
}
#[tokio::test]
async fn test_remove_event() {
let handler = LinuxHandler::new();
let event = RawNotifyEvent {
kind: RawEventKind::Remove,
paths: vec![PathBuf::from("/test/file.txt")],
timestamp: std::time::SystemTime::now(),
};
let events = handler.process(event).await.unwrap();
assert_eq!(events.len(), 1);
assert!(events[0].kind.is_remove());
}
#[tokio::test]
async fn test_rename_event() {
let handler = LinuxHandler::new();
let event = RawNotifyEvent {
kind: RawEventKind::Rename,
paths: vec![
PathBuf::from("/test/old.txt"),
PathBuf::from("/test/new.txt"),
],
timestamp: std::time::SystemTime::now(),
};
let events = handler.process(event).await.unwrap();
assert_eq!(events.len(), 1);
assert!(events[0].kind.is_rename());
}
}

View File

@@ -0,0 +1,420 @@
//! macOS-specific event handler
//!
//! macOS FSEvents doesn't provide native rename tracking. When a file is renamed,
//! we receive separate create and delete events. This handler implements rename
//! detection by tracking inodes and buffering events.
//!
//! Key features:
//! - Inode-based rename detection
//! - Three-phase event buffering (creates, updates, removes)
//! - Timeout-based eviction for unmatched events
//! - Finder duplicate directory event deduplication
//! - Reincident file tracking for files with rapid successive changes
//! - Immediate emission for directories, buffered emission for files
use crate::event::{FsEvent, RawEventKind, RawNotifyEvent};
use crate::platform::EventHandler;
use crate::Result;
use std::collections::HashMap;
use std::os::unix::fs::MetadataExt;
use std::path::PathBuf;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tracing::{debug, trace};
/// Timeout for rename detection buffering (matching old path with new path)
const RENAME_TIMEOUT_MS: u64 = 500;
/// Timeout for file stabilization (avoid processing mid-write files)
const STABILIZATION_TIMEOUT_MS: u64 = 500;
/// Longer timeout for files with rapid successive changes
const REINCIDENT_TIMEOUT_MS: u64 = 10_000;
/// macOS event handler with rename detection
pub struct MacOsHandler {
/// Files pending potential rename (by inode) - the "old path" side
/// Key: inode, Value: (path, timestamp)
pending_removes: RwLock<HashMap<u64, PendingRemove>>,
/// Recently created files (by inode) for rename matching - the "new path" side
/// Key: inode, Value: (path, timestamp)
pending_creates: RwLock<HashMap<u64, PendingCreate>>,
/// Files to update after stabilization
/// Key: path, Value: timestamp
pending_updates: RwLock<HashMap<PathBuf, Instant>>,
/// Files with multiple rapid changes - use longer timeout
/// Key: path, Value: first change timestamp
reincident_updates: RwLock<HashMap<PathBuf, Instant>>,
/// Last created directory path - for Finder duplicate event deduplication
last_created_dir: RwLock<Option<PathBuf>>,
}
#[derive(Debug, Clone)]
struct PendingRemove {
path: PathBuf,
#[allow(dead_code)] // Used for debugging, will be useful for enhanced inode tracking
inode: u64,
timestamp: Instant,
}
#[derive(Debug, Clone)]
struct PendingCreate {
path: PathBuf,
inode: u64,
timestamp: Instant,
}
impl MacOsHandler {
/// Create a new macOS handler
pub fn new() -> Self {
Self {
pending_removes: RwLock::new(HashMap::new()),
pending_creates: RwLock::new(HashMap::new()),
pending_updates: RwLock::new(HashMap::new()),
reincident_updates: RwLock::new(HashMap::new()),
last_created_dir: RwLock::new(None),
}
}
/// Get the inode for a path
async fn get_inode(path: &PathBuf) -> Option<u64> {
match tokio::fs::metadata(path).await {
Ok(metadata) => Some(metadata.ino()),
Err(_) => None,
}
}
/// Check if path is a directory
async fn is_directory(path: &PathBuf) -> bool {
tokio::fs::metadata(path)
.await
.map(|m| m.is_dir())
.unwrap_or(false)
}
/// Try to match a create event with a pending remove (rename detection)
async fn try_match_rename(&self, path: &PathBuf, inode: u64) -> Option<PathBuf> {
let mut removes = self.pending_removes.write().await;
if let Some(pending) = removes.remove(&inode) {
debug!(
"Rename detected: {} -> {} (inode {})",
pending.path.display(),
path.display(),
inode
);
Some(pending.path)
} else {
None
}
}
/// Process create events, attempting rename matching
async fn process_create(&self, path: PathBuf) -> Result<Vec<FsEvent>> {
// Check if this is a directory
if Self::is_directory(&path).await {
// Dedupe Finder's duplicate directory creation events
{
let mut last_dir = self.last_created_dir.write().await;
if let Some(ref last) = *last_dir {
if *last == path {
trace!(
"Ignoring duplicate directory create event: {}",
path.display()
);
return Ok(vec![]);
}
}
*last_dir = Some(path.clone());
}
// Directories emit immediately (no rename detection needed)
debug!(
"Directory created, emitting immediately: {}",
path.display()
);
return Ok(vec![FsEvent::create_dir(path)]);
}
// For files, get inode for rename detection
let Some(inode) = Self::get_inode(&path).await else {
// File might have been deleted already
debug!("Could not get inode for created file: {}", path.display());
return Ok(vec![FsEvent::create(path)]);
};
// Check if this matches a pending remove (rename)
if let Some(from_path) = self.try_match_rename(&path, inode).await {
return Ok(vec![FsEvent::rename(from_path, path)]);
}
// Buffer the create for potential later rename matching
{
let mut creates = self.pending_creates.write().await;
creates.insert(
inode,
PendingCreate {
path: path.clone(),
inode,
timestamp: Instant::now(),
},
);
}
// Don't emit yet - will be emitted on tick if no matching remove comes
Ok(vec![])
}
/// Process remove events, buffering for rename detection
async fn process_remove(&self, path: PathBuf) -> Result<Vec<FsEvent>> {
// Try to get the inode from our pending creates (the file might already be gone)
// If we can't get the inode, emit immediately as a remove
let inode = {
let creates = self.pending_creates.read().await;
creates.values().find(|c| c.path == path).map(|c| c.inode)
};
// If we have a matching pending create, this is a rapid create+delete
if let Some(inode) = inode {
let mut creates = self.pending_creates.write().await;
if creates.remove(&inode).is_some() {
debug!(
"Rapid create+delete detected, neutralizing: {}",
path.display()
);
return Ok(vec![]);
}
}
// Try to get inode from the filesystem (file might still exist briefly)
if let Some(inode) = Self::get_inode(&path).await {
// Buffer for potential rename matching
let mut removes = self.pending_removes.write().await;
removes.insert(
inode,
PendingRemove {
path: path.clone(),
inode,
timestamp: Instant::now(),
},
);
trace!("Buffered remove for rename detection: {}", path.display());
return Ok(vec![]);
}
// File is gone and we couldn't get inode - emit remove
Ok(vec![FsEvent::remove(path)])
}
/// Process modify events with stabilization buffering
async fn process_modify(&self, path: PathBuf) -> Result<Vec<FsEvent>> {
let mut updates = self.pending_updates.write().await;
let mut reincident = self.reincident_updates.write().await;
// Check if this file is already pending - track as reincident
if let Some(old_instant) = updates.insert(path.clone(), Instant::now()) {
// File had a previous pending update - mark as reincident for longer timeout
reincident.entry(path).or_insert(old_instant);
}
Ok(vec![])
}
/// Evict pending creates that have timed out
async fn evict_creates(&self, timeout: Duration) -> Vec<FsEvent> {
let mut events = Vec::new();
let mut creates = self.pending_creates.write().await;
let mut to_remove = Vec::new();
for (inode, pending) in creates.iter() {
if pending.timestamp.elapsed() > timeout {
to_remove.push(*inode);
// Files only - directories are emitted immediately in process_create
events.push(FsEvent::create_file(pending.path.clone()));
trace!(
"Evicting create (no matching remove): {}",
pending.path.display()
);
}
}
for inode in to_remove {
creates.remove(&inode);
}
events
}
/// Evict pending removes that have timed out
async fn evict_removes(&self, timeout: Duration) -> Vec<FsEvent> {
let mut events = Vec::new();
let mut removes = self.pending_removes.write().await;
let mut to_remove = Vec::new();
for (inode, pending) in removes.iter() {
if pending.timestamp.elapsed() > timeout {
to_remove.push(*inode);
events.push(FsEvent::remove(pending.path.clone()));
trace!(
"Evicting remove (no matching create): {}",
pending.path.display()
);
}
}
for inode in to_remove {
removes.remove(&inode);
}
events
}
/// Evict pending updates that have stabilized
async fn evict_updates(&self, timeout: Duration) -> Vec<FsEvent> {
let mut events = Vec::new();
let mut updates = self.pending_updates.write().await;
let mut reincident = self.reincident_updates.write().await;
let reincident_timeout = Duration::from_millis(REINCIDENT_TIMEOUT_MS);
let mut to_remove = Vec::new();
for (path, timestamp) in updates.iter() {
// Check if this is a reincident file (use longer timeout)
let effective_timeout = if reincident.contains_key(path) {
reincident_timeout
} else {
timeout
};
if timestamp.elapsed() > effective_timeout {
to_remove.push(path.clone());
// Emit as Create for files - the responder will detect if it's an update via inode
events.push(FsEvent::create_file(path.clone()));
trace!(
"Evicting update (stabilized after {}ms): {}",
timestamp.elapsed().as_millis(),
path.display()
);
}
}
for path in &to_remove {
updates.remove(path);
reincident.remove(path);
}
events
}
}
impl Default for MacOsHandler {
fn default() -> Self {
Self::new()
}
}
#[async_trait::async_trait]
impl EventHandler for MacOsHandler {
async fn process(&self, event: RawNotifyEvent) -> Result<Vec<FsEvent>> {
let Some(path) = event.primary_path().cloned() else {
return Ok(vec![]);
};
match event.kind {
RawEventKind::Create => self.process_create(path).await,
RawEventKind::Remove => self.process_remove(path).await,
RawEventKind::Modify => self.process_modify(path).await,
RawEventKind::Rename => {
// On macOS, rename events from notify are unreliable
// We handle renames via inode tracking instead
// Treat as modify and let inode tracking handle it
self.process_modify(path).await
}
RawEventKind::Other(ref kind) => {
trace!("Ignoring unknown event kind: {}", kind);
Ok(vec![])
}
}
}
async fn tick(&self) -> Result<Vec<FsEvent>> {
let rename_timeout = Duration::from_millis(RENAME_TIMEOUT_MS);
let stabilization_timeout = Duration::from_millis(STABILIZATION_TIMEOUT_MS);
let mut events = Vec::new();
// Evict in order: updates first, then creates, then removes
// This ensures proper ordering for related events
events.extend(self.evict_updates(stabilization_timeout).await);
events.extend(self.evict_creates(rename_timeout).await);
events.extend(self.evict_removes(rename_timeout).await);
Ok(events)
}
async fn reset(&self) {
self.pending_removes.write().await.clear();
self.pending_creates.write().await.clear();
self.pending_updates.write().await.clear();
self.reincident_updates.write().await.clear();
*self.last_created_dir.write().await = None;
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::path::PathBuf;
#[tokio::test]
async fn test_handler_creation() {
let handler = MacOsHandler::new();
// Should start with empty buffers
assert!(handler.pending_removes.read().await.is_empty());
assert!(handler.pending_creates.read().await.is_empty());
assert!(handler.pending_updates.read().await.is_empty());
assert!(handler.reincident_updates.read().await.is_empty());
assert!(handler.last_created_dir.read().await.is_none());
}
#[tokio::test]
async fn test_reset() {
let handler = MacOsHandler::new();
// Add some pending data
{
let mut updates = handler.pending_updates.write().await;
updates.insert(PathBuf::from("/test"), Instant::now());
}
{
let mut last_dir = handler.last_created_dir.write().await;
*last_dir = Some(PathBuf::from("/test/dir"));
}
// Reset should clear everything
handler.reset().await;
assert!(handler.pending_updates.read().await.is_empty());
assert!(handler.last_created_dir.read().await.is_none());
}
#[tokio::test]
async fn test_reincident_tracking() {
let handler = MacOsHandler::new();
let path = PathBuf::from("/test/file.txt");
// First modify - should not be reincident
{
let mut updates = handler.pending_updates.write().await;
updates.insert(path.clone(), Instant::now());
}
assert!(handler.reincident_updates.read().await.is_empty());
// Second modify - should mark as reincident
handler.process_modify(path.clone()).await.unwrap();
assert!(handler.reincident_updates.read().await.contains_key(&path));
}
}

View File

@@ -0,0 +1,144 @@
//! Platform-specific event handlers
//!
//! Each platform has different filesystem event semantics. Platform handlers
//! translate raw OS events into normalized `FsEvent` types.
//!
//! Key responsibilities:
//! - Rename detection (especially on macOS where renames come as separate create/delete events)
//! - Event buffering and debouncing
//! - Platform-specific quirk handling
//!
//! Platform handlers are storage-agnostic - they return raw events without
//! any knowledge of locations, libraries, or databases.
use crate::event::{FsEvent, RawNotifyEvent};
use crate::Result;
#[cfg(target_os = "linux")]
mod linux;
#[cfg(target_os = "macos")]
mod macos;
#[cfg(target_os = "windows")]
mod windows;
#[cfg(target_os = "linux")]
pub use linux::LinuxHandler;
#[cfg(target_os = "macos")]
pub use macos::MacOsHandler;
#[cfg(target_os = "windows")]
pub use windows::WindowsHandler;
/// Trait for platform-specific event processing
///
/// Platform handlers receive raw notify events and return normalized `FsEvent` values.
/// They may buffer events internally for rename detection or debouncing.
#[async_trait::async_trait]
pub trait EventHandler: Send + Sync {
/// Process a raw event and return normalized events
///
/// Platform handlers may:
/// - Buffer events internally (e.g., for rename detection)
/// - Return empty vec if event is still being processed
/// - Return multiple events if buffered events are ready
async fn process(&self, event: RawNotifyEvent) -> Result<Vec<FsEvent>>;
/// Periodic tick for evicting buffered events
///
/// Returns events that have been buffered and are now ready to emit.
/// For example, files that didn't match rename patterns after a timeout.
async fn tick(&self) -> Result<Vec<FsEvent>>;
/// Reset internal state (e.g., clear buffers)
async fn reset(&self);
}
/// Platform handler wrapper that selects the appropriate implementation
pub struct PlatformHandler {
#[cfg(target_os = "macos")]
inner: MacOsHandler,
#[cfg(target_os = "linux")]
inner: LinuxHandler,
#[cfg(target_os = "windows")]
inner: WindowsHandler,
#[cfg(not(any(target_os = "macos", target_os = "linux", target_os = "windows")))]
inner: DefaultHandler,
}
impl PlatformHandler {
/// Create a new platform handler for the current platform
pub fn new() -> Self {
Self {
#[cfg(target_os = "macos")]
inner: MacOsHandler::new(),
#[cfg(target_os = "linux")]
inner: LinuxHandler::new(),
#[cfg(target_os = "windows")]
inner: WindowsHandler::new(),
#[cfg(not(any(target_os = "macos", target_os = "linux", target_os = "windows")))]
inner: DefaultHandler::new(),
}
}
/// Process a raw event
pub async fn process(&self, event: RawNotifyEvent) -> Result<Vec<FsEvent>> {
self.inner.process(event).await
}
/// Periodic tick for buffered event eviction
pub async fn tick(&self) -> Result<Vec<FsEvent>> {
self.inner.tick().await
}
/// Reset internal state
pub async fn reset(&self) {
self.inner.reset().await
}
}
impl Default for PlatformHandler {
fn default() -> Self {
Self::new()
}
}
/// Default handler for unsupported platforms
#[cfg(not(any(target_os = "macos", target_os = "linux", target_os = "windows")))]
pub struct DefaultHandler;
#[cfg(not(any(target_os = "macos", target_os = "linux", target_os = "windows")))]
impl DefaultHandler {
pub fn new() -> Self {
Self
}
}
#[cfg(not(any(target_os = "macos", target_os = "linux", target_os = "windows")))]
#[async_trait::async_trait]
impl EventHandler for DefaultHandler {
async fn process(&self, event: RawNotifyEvent) -> Result<Vec<FsEvent>> {
use crate::event::RawEventKind;
let Some(path) = event.primary_path().cloned() else {
return Ok(vec![]);
};
let fs_event = match event.kind {
RawEventKind::Create => FsEvent::create(path),
RawEventKind::Modify => FsEvent::modify(path),
RawEventKind::Remove => FsEvent::remove(path),
RawEventKind::Rename => {
// Without platform-specific handling, treat rename as modify
FsEvent::modify(path)
}
RawEventKind::Other(_) => return Ok(vec![]),
};
Ok(vec![fs_event])
}
async fn tick(&self) -> Result<Vec<FsEvent>> {
Ok(vec![])
}
async fn reset(&self) {}
}

View File

@@ -0,0 +1,194 @@
//! Windows-specific event handler
//!
//! Windows ReadDirectoryChangesW provides reasonable rename tracking,
//! but still benefits from event buffering for stability.
use crate::event::{FsEvent, RawEventKind, RawNotifyEvent};
use crate::platform::EventHandler;
use crate::Result;
use std::collections::HashMap;
use std::path::PathBuf;
use std::time::{Duration, Instant};
use tokio::sync::RwLock;
use tracing::trace;
/// Timeout for event stabilization
const STABILIZATION_TIMEOUT_MS: u64 = 100;
/// Windows event handler
pub struct WindowsHandler {
/// Files pending stabilization
pending_updates: RwLock<HashMap<PathBuf, Instant>>,
/// Pending rename sources (waiting for target)
pending_rename_from: RwLock<Option<(PathBuf, Instant)>>,
}
impl WindowsHandler {
/// Create a new Windows handler
pub fn new() -> Self {
Self {
pending_updates: RwLock::new(HashMap::new()),
pending_rename_from: RwLock::new(None),
}
}
/// Evict pending updates that have stabilized
async fn evict_updates(&self, timeout: Duration) -> Vec<FsEvent> {
let mut events = Vec::new();
let mut updates = self.pending_updates.write().await;
let mut to_remove = Vec::new();
for (path, timestamp) in updates.iter() {
if timestamp.elapsed() > timeout {
to_remove.push(path.clone());
events.push(FsEvent::modify(path.clone()));
trace!("Evicting update (stabilized): {}", path.display());
}
}
for path in to_remove {
updates.remove(&path);
}
events
}
/// Evict pending rename source if timed out
async fn evict_pending_rename(&self, timeout: Duration) -> Vec<FsEvent> {
let mut events = Vec::new();
let mut pending = self.pending_rename_from.write().await;
if let Some((path, timestamp)) = pending.take() {
if timestamp.elapsed() > timeout {
// Rename source without target - treat as remove
events.push(FsEvent::remove(path));
} else {
// Put it back
*pending = Some((path, timestamp));
}
}
events
}
}
impl Default for WindowsHandler {
fn default() -> Self {
Self::new()
}
}
#[async_trait::async_trait]
impl EventHandler for WindowsHandler {
async fn process(&self, event: RawNotifyEvent) -> Result<Vec<FsEvent>> {
let Some(path) = event.primary_path().cloned() else {
return Ok(vec![]);
};
match event.kind {
RawEventKind::Create => {
// Check if this matches a pending rename
let pending = self.pending_rename_from.write().await.take();
if let Some((from_path, _)) = pending {
return Ok(vec![FsEvent::rename(from_path, path)]);
}
Ok(vec![FsEvent::create(path)])
}
RawEventKind::Remove => {
// Buffer as potential rename source
let mut pending = self.pending_rename_from.write().await;
*pending = Some((path, Instant::now()));
Ok(vec![])
}
RawEventKind::Modify => {
// Buffer modifications for stabilization
let mut updates = self.pending_updates.write().await;
updates.insert(path, Instant::now());
Ok(vec![])
}
RawEventKind::Rename => {
// Windows sometimes provides proper rename events
if event.paths.len() >= 2 {
let from = event.paths[0].clone();
let to = event.paths[1].clone();
Ok(vec![FsEvent::rename(from, to)])
} else {
// Incomplete rename, buffer it
let mut pending = self.pending_rename_from.write().await;
*pending = Some((path, Instant::now()));
Ok(vec![])
}
}
RawEventKind::Other(ref kind) => {
trace!("Ignoring unknown event kind: {}", kind);
Ok(vec![])
}
}
}
async fn tick(&self) -> Result<Vec<FsEvent>> {
let timeout = Duration::from_millis(STABILIZATION_TIMEOUT_MS);
let mut events = Vec::new();
events.extend(self.evict_updates(timeout).await);
events.extend(self.evict_pending_rename(timeout).await);
Ok(events)
}
async fn reset(&self) {
self.pending_updates.write().await.clear();
*self.pending_rename_from.write().await = None;
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_handler_creation() {
let handler = WindowsHandler::new();
assert!(handler.pending_updates.read().await.is_empty());
assert!(handler.pending_rename_from.read().await.is_none());
}
#[tokio::test]
async fn test_create_event() {
let handler = WindowsHandler::new();
let event = RawNotifyEvent {
kind: RawEventKind::Create,
paths: vec![PathBuf::from("C:\\test\\file.txt")],
timestamp: std::time::SystemTime::now(),
};
let events = handler.process(event).await.unwrap();
assert_eq!(events.len(), 1);
assert!(events[0].kind.is_create());
}
#[tokio::test]
async fn test_rename_detection() {
let handler = WindowsHandler::new();
// First, a remove event (potential rename source)
let remove_event = RawNotifyEvent {
kind: RawEventKind::Remove,
paths: vec![PathBuf::from("C:\\test\\old.txt")],
timestamp: std::time::SystemTime::now(),
};
let events = handler.process(remove_event).await.unwrap();
assert!(events.is_empty()); // Buffered
// Then, a create event (rename target)
let create_event = RawNotifyEvent {
kind: RawEventKind::Create,
paths: vec![PathBuf::from("C:\\test\\new.txt")],
timestamp: std::time::SystemTime::now(),
};
let events = handler.process(create_event).await.unwrap();
assert_eq!(events.len(), 1);
assert!(events[0].kind.is_rename());
}
}

View File

@@ -0,0 +1,535 @@
//! Main filesystem watcher implementation
//!
//! `FsWatcher` is the primary interface for watching filesystem changes.
//! It's storage-agnostic - it only knows about paths and events, not
//! about locations, libraries, or databases.
use crate::config::{WatchConfig, WatcherConfig};
use crate::error::{Result, WatcherError};
use crate::event::{FsEvent, RawNotifyEvent};
use crate::platform::PlatformHandler;
use notify::{RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcher};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::{broadcast, mpsc, RwLock};
use tracing::{debug, error, info, trace, warn};
/// Handle returned when watching a path
///
/// When dropped, the path is automatically unwatched (if no other handles exist).
pub struct WatchHandle {
path: PathBuf,
watcher: Arc<FsWatcherInner>,
}
impl std::fmt::Debug for WatchHandle {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("WatchHandle")
.field("path", &self.path)
.finish()
}
}
impl Drop for WatchHandle {
fn drop(&mut self) {
// Decrement reference count and unwatch if zero
let path = self.path.clone();
let inner = self.watcher.clone();
// Spawn a task to handle the async unwatch
tokio::spawn(async move {
if let Err(e) = inner.release_watch(&path).await {
warn!("Failed to release watch for {}: {}", path.display(), e);
}
});
}
}
/// Watch state for a path
struct WatchState {
config: WatchConfig,
ref_count: usize,
}
/// Internal watcher state
struct FsWatcherInner {
/// Configuration
config: WatcherConfig,
/// Watched paths with reference counting
watched_paths: RwLock<HashMap<PathBuf, WatchState>>,
/// The notify watcher instance
notify_watcher: RwLock<Option<RecommendedWatcher>>,
/// Platform-specific event handler
platform_handler: PlatformHandler,
/// Whether the watcher is running
is_running: AtomicBool,
/// Event sender for broadcasts
event_tx: broadcast::Sender<FsEvent>,
/// Metrics
events_received: AtomicU64,
events_emitted: AtomicU64,
}
impl FsWatcherInner {
/// Add a watch with reference counting
async fn add_watch(&self, path: PathBuf, config: WatchConfig) -> Result<()> {
let mut watched = self.watched_paths.write().await;
if let Some(state) = watched.get_mut(&path) {
// Path already watched - increment ref count
state.ref_count += 1;
debug!(
"Incremented ref count for {}: {}",
path.display(),
state.ref_count
);
return Ok(());
}
// Validate path exists
if !path.exists() {
return Err(WatcherError::PathNotFound(path));
}
// Register with notify if we're running
if self.is_running.load(Ordering::SeqCst) {
if let Some(watcher) = self.notify_watcher.write().await.as_mut() {
let mode = if config.recursive {
RecursiveMode::Recursive
} else {
RecursiveMode::NonRecursive
};
watcher.watch(&path, mode).map_err(|e| WatcherError::WatchFailed {
path: path.clone(),
reason: e.to_string(),
})?;
}
}
watched.insert(
path.clone(),
WatchState {
config,
ref_count: 1,
},
);
debug!("Started watching: {}", path.display());
Ok(())
}
/// Release a watch (decrement ref count, unwatch if zero)
async fn release_watch(&self, path: &Path) -> Result<()> {
let mut watched = self.watched_paths.write().await;
let should_unwatch = if let Some(state) = watched.get_mut(path) {
state.ref_count -= 1;
debug!(
"Decremented ref count for {}: {}",
path.display(),
state.ref_count
);
state.ref_count == 0
} else {
return Ok(()); // Not watched
};
if should_unwatch {
watched.remove(path);
// Unregister from notify if we're running
if self.is_running.load(Ordering::SeqCst) {
if let Some(watcher) = self.notify_watcher.write().await.as_mut() {
if let Err(e) = watcher.unwatch(path) {
warn!("Failed to unwatch {}: {}", path.display(), e);
}
}
}
debug!("Stopped watching: {}", path.display());
}
Ok(())
}
}
/// Platform-agnostic filesystem watcher
///
/// Watches filesystem paths and emits normalized events. Handles platform-specific
/// quirks like macOS rename detection internally.
///
/// # Example
///
/// ```ignore
/// use sd_fs_watcher::{FsWatcher, WatchConfig};
///
/// let watcher = FsWatcher::new(Default::default());
/// watcher.start().await?;
///
/// // Subscribe to events
/// let mut rx = watcher.subscribe();
///
/// // Watch a path
/// let handle = watcher.watch("/path/to/watch", WatchConfig::recursive()).await?;
///
/// // Receive events
/// while let Ok(event) = rx.recv().await {
/// println!("Event: {:?}", event);
/// }
/// ```
pub struct FsWatcher {
inner: Arc<FsWatcherInner>,
}
impl FsWatcher {
/// Create a new filesystem watcher
pub fn new(config: WatcherConfig) -> Self {
let (event_tx, _) = broadcast::channel(config.event_buffer_size);
Self {
inner: Arc::new(FsWatcherInner {
config,
watched_paths: RwLock::new(HashMap::new()),
notify_watcher: RwLock::new(None),
platform_handler: PlatformHandler::new(),
is_running: AtomicBool::new(false),
event_tx,
events_received: AtomicU64::new(0),
events_emitted: AtomicU64::new(0),
}),
}
}
/// Start the watcher
pub async fn start(&self) -> Result<()> {
if self.inner.is_running.swap(true, Ordering::SeqCst) {
return Err(WatcherError::AlreadyRunning);
}
info!("Starting filesystem watcher");
// Create channel for raw events from notify
let (raw_tx, raw_rx) = mpsc::channel(self.inner.config.event_buffer_size);
// Create the notify watcher
let raw_tx_clone = raw_tx.clone();
let inner_clone = self.inner.clone();
let watcher = notify::recommended_watcher(move |res: std::result::Result<notify::Event, notify::Error>| {
match res {
Ok(event) => {
inner_clone.events_received.fetch_add(1, Ordering::Relaxed);
let raw_event = RawNotifyEvent::from_notify(event);
if let Err(e) = raw_tx_clone.try_send(raw_event) {
error!("Failed to send raw event: {}", e);
}
}
Err(e) => {
error!("Notify watcher error: {}", e);
}
}
})
.map_err(|e| WatcherError::StartFailed(e.to_string()))?;
*self.inner.notify_watcher.write().await = Some(watcher);
// Register all existing watched paths
self.register_existing_watches().await?;
// Start the event processing loop
self.start_event_loop(raw_rx).await;
info!("Filesystem watcher started");
Ok(())
}
/// Stop the watcher
pub async fn stop(&self) -> Result<()> {
if !self.inner.is_running.swap(false, Ordering::SeqCst) {
return Ok(()); // Already stopped
}
info!("Stopping filesystem watcher");
// Clear the notify watcher
*self.inner.notify_watcher.write().await = None;
// Reset platform handler state
self.inner.platform_handler.reset().await;
info!("Filesystem watcher stopped");
Ok(())
}
/// Check if the watcher is running
pub fn is_running(&self) -> bool {
self.inner.is_running.load(Ordering::SeqCst)
}
/// Watch a path
///
/// Returns a handle that automatically unwatches when dropped.
pub async fn watch(&self, path: impl AsRef<Path>, config: WatchConfig) -> Result<WatchHandle> {
let path = path.as_ref().to_path_buf();
self.inner.add_watch(path.clone(), config).await?;
Ok(WatchHandle {
path,
watcher: self.inner.clone(),
})
}
/// Watch a path without returning a handle
///
/// Use this when you want to manually manage watch lifecycle via `unwatch()`.
pub async fn watch_path(&self, path: impl AsRef<Path>, config: WatchConfig) -> Result<()> {
let path = path.as_ref().to_path_buf();
self.inner.add_watch(path, config).await
}
/// Unwatch a path
pub async fn unwatch(&self, path: impl AsRef<Path>) -> Result<()> {
self.inner.release_watch(path.as_ref()).await
}
/// Get all watched paths
pub async fn watched_paths(&self) -> Vec<PathBuf> {
self.inner
.watched_paths
.read()
.await
.keys()
.cloned()
.collect()
}
/// Subscribe to filesystem events
pub fn subscribe(&self) -> broadcast::Receiver<FsEvent> {
self.inner.event_tx.subscribe()
}
/// Get the number of events received
pub fn events_received(&self) -> u64 {
self.inner.events_received.load(Ordering::Relaxed)
}
/// Get the number of events emitted
pub fn events_emitted(&self) -> u64 {
self.inner.events_emitted.load(Ordering::Relaxed)
}
/// Register existing watches with notify
async fn register_existing_watches(&self) -> Result<()> {
let watched = self.inner.watched_paths.read().await;
let mut watcher_guard = self.inner.notify_watcher.write().await;
if let Some(watcher) = watcher_guard.as_mut() {
for (path, state) in watched.iter() {
let mode = if state.config.recursive {
RecursiveMode::Recursive
} else {
RecursiveMode::NonRecursive
};
if let Err(e) = watcher.watch(path, mode) {
warn!("Failed to register watch for {}: {}", path.display(), e);
} else {
debug!("Registered watch for: {}", path.display());
}
}
}
Ok(())
}
/// Start the event processing loop
async fn start_event_loop(&self, mut raw_rx: mpsc::Receiver<RawNotifyEvent>) {
let inner = self.inner.clone();
let tick_interval = self.inner.config.tick_interval;
tokio::spawn(async move {
info!("Event processing loop started");
loop {
if !inner.is_running.load(Ordering::SeqCst) {
break;
}
tokio::select! {
// Process incoming raw events
Some(raw_event) = raw_rx.recv() => {
// Check if path should be filtered
let should_process = if let Some(path) = raw_event.primary_path() {
let watched = inner.watched_paths.read().await;
// Find the watch config for this path
let config = watched.iter().find(|(watched_path, _)| {
path.starts_with(watched_path)
}).map(|(_, state)| &state.config);
if let Some(config) = config {
!config.filters.should_skip(path)
} else {
true // No filter config, process anyway
}
} else {
false
};
if should_process {
// Process through platform handler
match inner.platform_handler.process(raw_event).await {
Ok(events) => {
for event in events {
inner.events_emitted.fetch_add(1, Ordering::Relaxed);
if let Err(e) = inner.event_tx.send(event) {
trace!("No event subscribers: {}", e);
}
}
}
Err(e) => {
error!("Error processing event: {}", e);
}
}
}
}
// Periodic tick for buffered event eviction
_ = tokio::time::sleep(tick_interval) => {
match inner.platform_handler.tick().await {
Ok(events) => {
for event in events {
inner.events_emitted.fetch_add(1, Ordering::Relaxed);
if let Err(e) = inner.event_tx.send(event) {
trace!("No event subscribers: {}", e);
}
}
}
Err(e) => {
error!("Error during tick: {}", e);
}
}
}
}
}
info!("Event processing loop stopped");
});
}
}
impl Default for FsWatcher {
fn default() -> Self {
Self::new(WatcherConfig::default())
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
use tempfile::TempDir;
#[tokio::test]
async fn test_watcher_creation() {
let watcher = FsWatcher::new(WatcherConfig::default());
assert!(!watcher.is_running());
}
#[tokio::test]
async fn test_watcher_start_stop() {
let watcher = FsWatcher::new(WatcherConfig::default());
watcher.start().await.unwrap();
assert!(watcher.is_running());
watcher.stop().await.unwrap();
assert!(!watcher.is_running());
}
#[tokio::test]
async fn test_watch_path() {
let watcher = FsWatcher::new(WatcherConfig::default());
watcher.start().await.unwrap();
let temp_dir = TempDir::new().unwrap();
watcher
.watch_path(temp_dir.path(), WatchConfig::recursive())
.await
.unwrap();
let paths = watcher.watched_paths().await;
assert_eq!(paths.len(), 1);
assert_eq!(paths[0], temp_dir.path());
watcher.stop().await.unwrap();
}
#[tokio::test]
async fn test_watch_handle_drops() {
let watcher = FsWatcher::new(WatcherConfig::default());
watcher.start().await.unwrap();
let temp_dir = TempDir::new().unwrap();
{
let _handle = watcher
.watch(temp_dir.path(), WatchConfig::recursive())
.await
.unwrap();
let paths = watcher.watched_paths().await;
assert_eq!(paths.len(), 1);
}
// Give time for the async drop to complete
tokio::time::sleep(Duration::from_millis(100)).await;
let paths = watcher.watched_paths().await;
assert_eq!(paths.len(), 0);
watcher.stop().await.unwrap();
}
#[tokio::test]
async fn test_reference_counting() {
let watcher = FsWatcher::new(WatcherConfig::default());
watcher.start().await.unwrap();
let temp_dir = TempDir::new().unwrap();
// Watch the same path twice
let _handle1 = watcher
.watch(temp_dir.path(), WatchConfig::recursive())
.await
.unwrap();
let _handle2 = watcher
.watch(temp_dir.path(), WatchConfig::recursive())
.await
.unwrap();
let paths = watcher.watched_paths().await;
assert_eq!(paths.len(), 1); // Only one path in the map
drop(_handle1);
tokio::time::sleep(Duration::from_millis(100)).await;
// Should still be watched (handle2 exists)
let paths = watcher.watched_paths().await;
assert_eq!(paths.len(), 1);
drop(_handle2);
tokio::time::sleep(Duration::from_millis(100)).await;
// Now should be unwatched
let paths = watcher.watched_paths().await;
assert_eq!(paths.len(), 0);
watcher.stop().await.unwrap();
}
}

View File

@@ -2,6 +2,8 @@ import copyOgg from "./copy.ogg";
import copyMp3 from "./copy.mp3";
import startupOgg from "./startup.ogg";
import startupMp3 from "./startup.mp3";
import pairingOgg from "./pairing.ogg";
import pairingMp3 from "./pairing.mp3";
/**
* Play a sound effect
@@ -26,4 +28,5 @@ function playSound(oggSrc: string, mp3Src: string, volume = 0.5) {
export const sounds = {
copy: () => playSound(copyOgg, copyMp3, 0.3),
startup: () => playSound(startupOgg, startupMp3, 0.5),
pairing: () => playSound(pairingOgg, pairingMp3, 0.5),
};

View File

Binary file not shown.

View File

Binary file not shown.

View File

@@ -43,6 +43,7 @@ function FileOperationDialog(props: FileOperationDialogProps) {
const dialog = useDialog(props);
const form = useForm();
const [phase, setPhase] = useState<DialogPhase>({ type: "form" });
const [operation, setOperation] = useState<"copy" | "move">(props.operation);
const [conflictResolution, setConflictResolution] = useState<ConflictResolution>("Skip");
const copyFiles = useLibraryMutation("files.copy");
@@ -51,14 +52,14 @@ function FileOperationDialog(props: FileOperationDialogProps) {
try {
setPhase({ type: "executing" });
// Execute with the user's chosen conflict resolution
// Execute with the user's chosen operation and conflict resolution
await copyFiles.mutateAsync({
sources: { paths: props.sources },
destination: props.destination,
overwrite: conflictResolution === "Overwrite",
verify_checksum: false,
preserve_timestamps: true,
move_files: props.operation === "move",
move_files: operation === "move",
copy_method: "Auto",
on_conflict: conflictResolution,
});
@@ -87,7 +88,7 @@ function FileOperationDialog(props: FileOperationDialogProps) {
<Dialog
dialog={dialog}
form={form}
title={props.operation === "copy" ? "Copying Files" : "Moving Files"}
title={operation === "copy" ? "Copying Files" : "Moving Files"}
icon={<Files size={20} weight="bold" />}
hideButtons
>
@@ -95,7 +96,7 @@ function FileOperationDialog(props: FileOperationDialogProps) {
<div className="flex items-center justify-center gap-3">
<CircleNotch className="size-6 text-accent animate-spin" weight="bold" />
<span className="text-sm text-ink">
{props.operation === "copy" ? "Copying files..." : "Moving files..."}
{operation === "copy" ? "Copying files..." : "Moving files..."}
</span>
</div>
</div>
@@ -127,14 +128,14 @@ function FileOperationDialog(props: FileOperationDialogProps) {
);
}
// Form state - let user choose conflict resolution
// Form state - let user choose operation and conflict resolution
return (
<Dialog
dialog={dialog}
form={form}
title={props.operation === "copy" ? "Copy Files" : "Move Files"}
title="File Operation"
icon={<Files size={20} weight="bold" />}
ctaLabel={props.operation === "copy" ? "Copy" : "Move"}
ctaLabel={operation === "copy" ? "Copy" : "Move"}
onSubmit={handleSubmit}
onCancelled={handleCancel}
>
@@ -143,9 +144,7 @@ function FileOperationDialog(props: FileOperationDialogProps) {
<div className="flex items-start gap-3 p-3 bg-app rounded-md">
<FolderOpen className="size-5 text-accent mt-0.5" weight="fill" />
<div className="flex-1 min-w-0">
<div className="text-xs text-ink-dull mb-1">
{props.operation === "copy" ? "Copying to:" : "Moving to:"}
</div>
<div className="text-xs text-ink-dull mb-1">Destination:</div>
<div className="text-sm text-ink font-medium truncate">
{formatDestination(props.destination)}
</div>
@@ -155,6 +154,35 @@ function FileOperationDialog(props: FileOperationDialogProps) {
</div>
</div>
{/* Operation type selection */}
<div className="space-y-2">
<div className="text-xs font-medium text-ink-dull mb-2">Operation:</div>
<div className="flex gap-2">
<button
type="button"
onClick={() => setOperation("copy")}
className={`flex-1 px-3 py-2 rounded-md text-sm font-medium transition-colors ${
operation === "copy"
? "bg-accent text-white"
: "bg-app-box text-ink hover:bg-app-hover"
}`}
>
Copy
</button>
<button
type="button"
onClick={() => setOperation("move")}
className={`flex-1 px-3 py-2 rounded-md text-sm font-medium transition-colors ${
operation === "move"
? "bg-accent text-white"
: "bg-app-box text-ink hover:bg-app-hover"
}`}
>
Move
</button>
</div>
</div>
{/* Conflict resolution options */}
<div className="space-y-2">
<div className="text-xs font-medium text-ink-dull mb-2">

View File

@@ -13,6 +13,7 @@ import { motion, AnimatePresence } from "framer-motion";
import clsx from "clsx";
import QRCode from "qrcode";
import { useCoreMutation, useCoreQuery } from "../context";
import { sounds } from "@sd/assets/sounds";
interface PairingModalProps {
isOpen: boolean;
@@ -86,6 +87,7 @@ export function PairingModal({ isOpen, onClose, mode: initialMode = "generate" }
useEffect(() => {
if (isCompleted) {
sounds.pairing();
const timer = setTimeout(() => {
handleClose();
}, 2000);