Merge pull request #2935 from spacedriveapp/fix-cache

Fix cache and connection management
This commit is contained in:
Jamie Pine
2025-12-25 05:51:55 -08:00
committed by GitHub
6 changed files with 788 additions and 34 deletions

View File

@@ -301,6 +301,19 @@ impl SubscriptionManager {
false
}
}
async fn cancel_all(&self) {
let mut subscriptions = self.subscriptions.write().await;
let count = subscriptions.len();
for (_, cancel_tx) in subscriptions.drain() {
let _ = cancel_tx.send(());
}
tracing::info!("Cancelled {} subscriptions", count);
}
async fn get_active_count(&self) -> usize {
self.subscriptions.read().await.len()
}
}
/// App state - stores global application state shared across all windows
@@ -453,6 +466,113 @@ async fn set_current_library_id(
Ok(())
}
/// Validate that the current library exists, reset state if it doesn't
async fn validate_and_reset_library_if_needed(
app: AppHandle,
current_library_id_arc: &Arc<RwLock<Option<String>>>,
daemon_state: &Arc<RwLock<DaemonState>>,
data_dir: &PathBuf,
) -> Result<(), String> {
let current_library_id = {
let library_id = current_library_id_arc.read().await;
library_id.clone()
};
let Some(library_id) = current_library_id else {
// No library selected, nothing to validate
return Ok(());
};
// Query daemon for list of libraries
let request = json!({
"jsonrpc": "2.0",
"id": 1,
"method": "query:libraries.list",
"params": {
"input": {
"include_stats": false
}
}
});
let socket_addr = {
let state = daemon_state.read().await;
state.socket_addr.clone()
};
// Use direct TCP communication (same as daemon_request but without Tauri State)
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpStream;
let mut stream = TcpStream::connect(&socket_addr)
.await
.map_err(|e| format!("Failed to connect to daemon: {}", e))?;
let request_line = serde_json::to_string(&request)
.map_err(|e| format!("Failed to serialize request: {}", e))?;
stream
.write_all(format!("{}\n", request_line).as_bytes())
.await
.map_err(|e| format!("Failed to write request: {}", e))?;
let mut reader = BufReader::new(stream);
let mut response_line = String::new();
reader
.read_line(&mut response_line)
.await
.map_err(|e| format!("Failed to read response: {}", e))?;
let response: serde_json::Value = serde_json::from_str(&response_line).map_err(|e| {
format!(
"Failed to parse response: {}. Raw: {}",
e,
response_line.trim()
)
})?;
// Parse response to get library list
let libraries: Vec<serde_json::Value> = response
.get("result")
.and_then(|r| r.as_array())
.ok_or_else(|| "Invalid response format from libraries.list query".to_string())?
.clone();
// Check if current library ID exists in the list
let library_exists = libraries.iter().any(|lib| {
lib.get("id")
.and_then(|id| id.as_str())
.map(|id| id == library_id)
.unwrap_or(false)
});
if !library_exists {
tracing::warn!(
"Current library {} no longer exists, resetting library state",
library_id
);
// Clear library ID from app state
*current_library_id_arc.write().await = None;
// Remove persisted library ID file
let library_id_file = data_dir.join("current_library_id.txt");
if let Err(e) = tokio::fs::remove_file(&library_id_file).await {
tracing::warn!("Failed to remove persisted library ID file: {}", e);
} else {
tracing::debug!("Removed persisted library ID file: {:?}", library_id_file);
}
// Emit library-changed event with empty string to indicate no library (frontend uses Platform abstraction)
if let Err(e) = app.emit("library-changed", "") {
tracing::warn!("Failed to emit library-changed event: {}", e);
}
}
Ok(())
}
/// Get selected file IDs from app state
#[tauri::command]
async fn get_selected_file_ids(
@@ -495,32 +615,38 @@ async fn daemon_request(
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpStream;
let mut stream = TcpStream::connect(&daemon_state.socket_addr)
let stream = TcpStream::connect(&daemon_state.socket_addr)
.await
.map_err(|e| format!("Failed to connect to daemon: {}", e))?;
let (reader, mut writer) = stream.into_split();
// Send request
let request_line = serde_json::to_string(&request)
.map_err(|e| format!("Failed to serialize request: {}", e))?;
tracing::debug!("Sending to daemon: {}", request_line);
stream
writer
.write_all(format!("{}\n", request_line).as_bytes())
.await
.map_err(|e| format!("Failed to write request: {}", e))?;
// Read response
let mut reader = BufReader::new(stream);
let mut buf_reader = BufReader::new(reader);
let mut response_line = String::new();
reader
buf_reader
.read_line(&mut response_line)
.await
.map_err(|e| format!("Failed to read response: {}", e))?;
tracing::debug!("Received from daemon: {}", response_line.trim());
// Explicitly close the connection by dropping both halves
drop(writer);
drop(buf_reader);
serde_json::from_str(&response_line).map_err(|e| {
format!(
"Failed to parse response: {}. Raw: {}",
@@ -570,7 +696,11 @@ async fn subscribe_to_events(
// Spawn background task to listen for events
tauri::async_runtime::spawn(async move {
let stream = match TcpStream::connect(&socket_addr).await {
tracing::debug!(
subscription_id = subscription_id,
"Creating TCP connection for subscription"
);
let mut stream = match TcpStream::connect(&socket_addr).await {
Ok(s) => s,
Err(e) => {
tracing::error!("Failed to connect for events: {}", e);
@@ -578,7 +708,7 @@ async fn subscribe_to_events(
}
};
let (reader, mut writer) = stream.into_split();
let (reader, mut writer) = stream.split();
// Send subscription request
// Frontend controls which events to subscribe to via eventTypes parameter
@@ -679,6 +809,11 @@ async fn subscribe_to_events(
"Unsubscribe sent successfully"
);
}
// Explicitly shutdown and drop the stream to close the TCP connection
drop(writer);
drop(reader);
tracing::info!(subscription_id = subscription_id, "TCP connection closed");
});
Ok(subscription_id)
@@ -702,6 +837,21 @@ async fn unsubscribe_from_events(
}
}
/// Cleanup all active subscriptions (useful for app reloads)
#[tauri::command]
async fn cleanup_all_connections(app_state: tauri::State<'_, AppState>) -> Result<(), String> {
let count = app_state.subscription_manager.get_active_count().await;
tracing::info!("Cleaning up {} active subscriptions", count);
app_state.subscription_manager.cancel_all().await;
Ok(())
}
/// Get active subscription count (for debugging)
#[tauri::command]
async fn get_active_subscriptions(app_state: tauri::State<'_, AppState>) -> Result<usize, String> {
Ok(app_state.subscription_manager.get_active_count().await)
}
/// Update menu item states
#[tauri::command]
async fn update_menu_items(app: AppHandle, items: Vec<MenuItemState>) -> Result<(), String> {
@@ -1797,6 +1947,8 @@ fn main() {
daemon_request,
subscribe_to_events,
unsubscribe_from_events,
cleanup_all_connections,
get_active_subscriptions,
update_menu_items,
get_daemon_status,
start_daemon_process,
@@ -1940,6 +2092,12 @@ fn main() {
subscription_manager: SubscriptionManager::new(),
};
// Clone references needed for validation before managing state (which moves it)
let app_handle = app.handle().clone();
let app_state_current_library_id = app_state.current_library_id.clone();
let daemon_state_clone = daemon_state.clone();
let data_dir_clone = data_dir.clone();
app.manage(daemon_state.clone());
app.manage(app_state);
app.manage(drag::DragCoordinator::new());
@@ -1950,11 +2108,11 @@ fn main() {
// Initialize daemon connection in background
tauri::async_runtime::spawn(async move {
tracing::info!("Data directory: {:?}", data_dir);
tracing::info!("Data directory: {:?}", data_dir_clone);
tracing::info!("Socket address: {:?}", socket_addr);
// Start HTTP server for serving files/sidecars
match server::start_server(data_dir.clone()).await {
match server::start_server(data_dir_clone.clone()).await {
Ok((server_url, shutdown_tx)) => {
tracing::info!("HTTP server started at {}", server_url);
let mut state = daemon_state.write().await;
@@ -1973,7 +2131,7 @@ fn main() {
(false, None)
} else {
tracing::info!("No daemon running, starting new instance");
match start_daemon(&data_dir, &socket_addr).await {
match start_daemon(&data_dir_clone, &socket_addr).await {
Ok(child) => (
true,
Some(std::sync::Arc::new(tokio::sync::Mutex::new(Some(child)))),
@@ -1991,6 +2149,28 @@ fn main() {
state.daemon_process = child_process;
tracing::info!("Daemon connection established");
// Validate persisted library ID in background (non-blocking)
// If library no longer exists, reset the state
let app_handle_validate = app_handle.clone();
let app_state_validate = app_state_current_library_id.clone();
let daemon_state_validate = daemon_state_clone.clone();
let data_dir_validate = data_dir_clone.clone();
tokio::spawn(async move {
// Wait a bit for daemon to be fully ready
tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
if let Err(e) = validate_and_reset_library_if_needed(
app_handle_validate,
&app_state_validate,
&daemon_state_validate,
&data_dir_validate,
)
.await
{
tracing::warn!("Failed to validate library: {}", e);
}
});
});
// In dev mode, show window immediately

View File

@@ -174,6 +174,12 @@ function App() {
if (unsubscribePromise) {
unsubscribePromise.then((unsubscribe) => unsubscribe());
}
// Clean up all backend TCP connections to prevent connection leaks
// This is especially important during development hot reloads
invoke("cleanup_all_connections").catch((err) => {
console.warn("Failed to cleanup connections:", err);
});
};
}, []);

View File

@@ -160,13 +160,6 @@ impl RpcServer {
);
if should_forward {
tracing::debug!(
"Forwarding event to connection: connection_id={}, event_type={}, filter={:?}",
connection.id,
event.variant_name(),
connection.filter
);
// Ignore errors if connection is closed
let _ = connection
.response_tx
@@ -275,13 +268,6 @@ impl RpcServer {
let include_descendants = filter.include_descendants.unwrap_or(false);
let affects = event.affects_path(path_scope, include_descendants);
tracing::debug!(
"Path scope filter check: scope={:?}, include_descendants={}, affects={}",
path_scope,
include_descendants,
affects
);
if !affects {
return false;
}

View File

@@ -910,7 +910,42 @@ impl DatabaseStorage {
let mut entry_active: entities::entry::ActiveModel = db_entry.into();
let new_parent_id = if let Some(parent_path) = new_path.parent() {
state.entry_id_cache.get(parent_path).copied()
// Check cache first, then fall back to database query
if let Some(&parent_id) = state.entry_id_cache.get(parent_path) {
Some(parent_id)
} else {
// Parent not in cache - query database
let parent_path_str = parent_path.to_string_lossy().to_string();
let is_cloud = parent_path_str.contains("://");
let parent_variants = if is_cloud && !parent_path_str.ends_with('/') {
vec![parent_path_str.clone(), format!("{}/", parent_path_str)]
} else {
vec![parent_path_str.clone()]
};
let query = entities::directory_paths::Entity::find()
.filter(entities::directory_paths::Column::Path.is_in(parent_variants));
match query.one(txn).await {
Ok(Some(dir_path_record)) => {
let parent_id = dir_path_record.entry_id;
// Cache the parent ID for future lookups
state
.entry_id_cache
.insert(parent_path.to_path_buf(), parent_id);
Some(parent_id)
}
Ok(None) => None,
Err(e) => {
return Err(JobError::execution(format!(
"Failed to resolve parent ID for {}: {}",
parent_path.display(),
e
)));
}
}
}
} else {
None
};

View File

@@ -0,0 +1,457 @@
//! Integration tests for file move operations
//!
//! This test suite verifies file move handling in four scenarios:
//! 1. Persistent + Manual Reindex (batch change detection)
//! 2. Persistent + Watcher (real-time change handling)
//! 3. Ephemeral + Manual Reindex (batch change detection)
//! 4. Ephemeral + Watcher (real-time change handling)
//!
//! Each test validates:
//! - File UUID/inode preservation
//! - Parent-child relationship updates
//! - Correct event emission for UI updates
mod helpers;
use helpers::*;
use sd_core::{
domain::addressing::SdPath,
location::IndexMode,
ops::indexing::{IndexScope, IndexerJob, IndexerJobConfig},
};
use tokio::time::Duration;
// ============================================================================
// PERSISTENT INDEXING TESTS
// ============================================================================
#[tokio::test]
async fn test_persistent_file_move_via_reindex() -> anyhow::Result<()> {
// Tests batch change detection during manual reindex (watcher disabled)
let harness = IndexingHarnessBuilder::new("persistent_move_reindex")
.disable_watcher()
.build()
.await?;
let test_location = harness.create_test_location("test_move").await?;
// Create folder structure with files
test_location.create_dir("source_folder").await?;
test_location.create_dir("destination_folder").await?;
test_location
.write_file("source_folder/file1.txt", "Content 1")
.await?;
test_location
.write_file("source_folder/file2.rs", "fn main() {}")
.await?;
test_location
.write_file("destination_folder/existing.md", "# Existing")
.await?;
// Initial indexing
let location = test_location
.index("Test Location", IndexMode::Shallow)
.await?;
// Verify initial structure
let initial_entries = location.get_all_entries().await?;
let source_folder = initial_entries
.iter()
.find(|e| e.name == "source_folder" && e.kind == 1)
.expect("Source folder should exist");
let source_folder_id = source_folder.id;
let dest_folder = initial_entries
.iter()
.find(|e| e.name == "destination_folder" && e.kind == 1)
.expect("Destination folder should exist");
let dest_folder_id = dest_folder.id;
let file1_before = initial_entries
.iter()
.find(|e| e.name == "file1")
.expect("file1 should exist");
let file1_uuid = file1_before.uuid;
let file1_inode = file1_before.inode;
let file1_id = file1_before.id;
assert_eq!(
file1_before.parent_id,
Some(source_folder_id),
"file1 should be child of source_folder"
);
let initial_file_count = location.count_files().await?;
assert_eq!(initial_file_count, 3, "Should have 3 files initially");
// Wait for indexing to settle
tokio::time::sleep(Duration::from_millis(500)).await;
// Start collecting events BEFORE move
let mut collector = EventCollector::new(&harness.core.events);
let collection_handle = tokio::spawn(async move {
collector.collect_events(Duration::from_secs(5)).await;
collector
});
tokio::time::sleep(Duration::from_millis(100)).await;
// Move the file from source to destination
tracing::info!("Moving file from source_folder to destination_folder");
location
.move_file("source_folder/file1.txt", "destination_folder/file1.txt")
.await?;
// Manual reindex to detect the move via batch change detection
location.reindex().await?;
// Wait for reindex to complete
tokio::time::sleep(Duration::from_secs(2)).await;
// Verify final structure
let final_entries = location.get_all_entries().await?;
// File should have same UUID/inode/ID but new parent
let file1_after = final_entries
.iter()
.find(|e| e.name == "file1")
.expect("file1 should exist after move");
assert_eq!(
file1_after.uuid, file1_uuid,
"File UUID should be preserved after move"
);
assert_eq!(
file1_after.inode, file1_inode,
"File inode should be preserved after move"
);
assert_eq!(
file1_after.id, file1_id,
"File ID should be preserved after move"
);
assert_eq!(
file1_after.parent_id,
Some(dest_folder_id),
"file1 should now be child of destination_folder"
);
// file2 should still be in source folder
let file2_after = final_entries
.iter()
.find(|e| e.name == "file2")
.expect("file2 should still exist");
assert_eq!(
file2_after.parent_id,
Some(source_folder_id),
"file2 should still be in source_folder"
);
// Total file count should remain the same
let final_file_count = location.count_files().await?;
assert_eq!(
final_file_count, initial_file_count,
"File count should remain the same after move"
);
// CRITICAL: Verify closure table integrity
location.verify_closure_table_integrity().await?;
// Verify events were emitted during the operation
let collector = collection_handle.await.unwrap();
let stats = collector.analyze().await;
let total_events = stats.resource_changed_batch.values().sum::<usize>()
+ stats.resource_changed.values().sum::<usize>();
if total_events == 0 {
tracing::warn!("No resource change events emitted during file move");
}
harness.shutdown().await?;
Ok(())
}
#[tokio::test]
async fn test_persistent_file_move_via_watcher() -> anyhow::Result<()> {
// Tests real-time watcher change handling (no manual reindex)
let harness = IndexingHarnessBuilder::new("persistent_move_watcher")
.build() // Watcher enabled by default
.await?;
let test_location = harness.create_test_location("test_move").await?;
// Create folder structure with files
test_location.create_dir("source_folder").await?;
test_location.create_dir("destination_folder").await?;
test_location
.write_file("source_folder/file1.txt", "Content 1")
.await?;
test_location
.write_file("source_folder/file2.rs", "fn main() {}")
.await?;
// Initial indexing
let location = test_location
.index("Test Location", IndexMode::Shallow)
.await?;
let initial_entries = location.get_all_entries().await?;
let dest_folder = initial_entries
.iter()
.find(|e| e.name == "destination_folder" && e.kind == 1)
.expect("Destination folder should exist");
let dest_folder_id = dest_folder.id;
let file1_before = initial_entries
.iter()
.find(|e| e.name == "file1")
.expect("file1 should exist");
let file1_uuid = file1_before.uuid;
let file1_id = file1_before.id;
// Wait for indexing to settle
tokio::time::sleep(Duration::from_millis(500)).await;
// Start collecting events
let mut collector = EventCollector::new(&harness.core.events);
let collection_handle = tokio::spawn(async move {
collector.collect_events(Duration::from_secs(10)).await;
collector
});
tokio::time::sleep(Duration::from_millis(100)).await;
// Move the file - watcher should detect and handle it
tracing::info!("Moving file (watcher will detect)");
location
.move_file("source_folder/file1.txt", "destination_folder/file1.txt")
.await?;
// NO manual reindex - rely on watcher to handle the change
// Wait for watcher to process the move event
tokio::time::sleep(Duration::from_secs(8)).await;
// Verify final structure
let final_entries = location.get_all_entries().await?;
// File should exist with same UUID/ID but new parent
let file1_after = final_entries
.iter()
.find(|e| e.name == "file1")
.expect("file1 should exist after move");
// Watcher should preserve UUID/ID through move detection
assert_eq!(
file1_after.uuid, file1_uuid,
"File UUID should be preserved by watcher"
);
assert_eq!(
file1_after.id, file1_id,
"File ID should be preserved by watcher"
);
assert_eq!(
file1_after.parent_id,
Some(dest_folder_id),
"file1 should now be child of destination_folder"
);
// file2 should still exist in source folder
let file2_exists = final_entries.iter().any(|e| e.name == "file2");
assert!(file2_exists, "file2 should still exist");
// CRITICAL: Verify closure table integrity
location.verify_closure_table_integrity().await?;
// Verify events were emitted (watcher emits ResourceChangedBatch for persistent)
let collector = collection_handle.await.unwrap();
let stats = collector.analyze().await;
let event_count = stats.resource_changed_batch.values().sum::<usize>()
+ stats.resource_changed.values().sum::<usize>();
assert!(
event_count > 0,
"Should emit resource change events from watcher"
);
harness.shutdown().await?;
Ok(())
}
// ============================================================================
// EPHEMERAL INDEXING TESTS
// ============================================================================
#[tokio::test]
async fn test_ephemeral_file_move_via_reindex() -> anyhow::Result<()> {
// Tests ephemeral batch change detection during manual reindex (watcher disabled)
let harness = IndexingHarnessBuilder::new("ephemeral_move_reindex")
.disable_watcher()
.build()
.await?;
let test_root = harness.temp_path();
let source_folder = test_root.join("source_folder");
let dest_folder = test_root.join("destination_folder");
tokio::fs::create_dir_all(&source_folder).await?;
tokio::fs::create_dir_all(&dest_folder).await?;
tokio::fs::write(source_folder.join("file1.txt"), "Content 1").await?;
tokio::fs::write(source_folder.join("file2.rs"), "fn main() {}").await?;
// Index in ephemeral mode
let test_root_sd = SdPath::local(test_root.clone());
let indexer_config = IndexerJobConfig::ephemeral_browse(test_root_sd, IndexScope::Recursive);
let indexer_job = IndexerJob::new(indexer_config);
tracing::info!("Initial ephemeral indexing");
let index_handle = harness.library.jobs().dispatch(indexer_job).await?;
index_handle.wait().await?;
harness
.core
.context
.ephemeral_cache()
.mark_indexing_complete(&test_root);
tokio::time::sleep(Duration::from_millis(500)).await;
// Move the file
tracing::info!("Moving file in filesystem");
tokio::fs::rename(
source_folder.join("file1.txt"),
dest_folder.join("file1.txt"),
)
.await?;
// Manual reindex to detect the change
let reindex_config =
IndexerJobConfig::ephemeral_browse(SdPath::local(test_root.clone()), IndexScope::Recursive);
let reindex_job = IndexerJob::new(reindex_config);
let reindex_handle = harness.library.jobs().dispatch(reindex_job).await?;
reindex_handle.wait().await?;
tokio::time::sleep(Duration::from_millis(500)).await;
// Verify filesystem state
assert!(
!tokio::fs::try_exists(source_folder.join("file1.txt"))
.await
.unwrap_or(false),
"file1.txt should not exist in source folder"
);
assert!(
tokio::fs::try_exists(dest_folder.join("file1.txt")).await?,
"file1.txt should exist in destination folder"
);
assert!(
tokio::fs::try_exists(source_folder.join("file2.rs")).await?,
"file2.rs should still exist in source folder"
);
// Verify file content preserved
let file_content = tokio::fs::read_to_string(dest_folder.join("file1.txt")).await?;
assert_eq!(
file_content, "Content 1",
"File content should be preserved"
);
harness.shutdown().await?;
Ok(())
}
#[tokio::test]
async fn test_ephemeral_file_move_via_watcher() -> anyhow::Result<()> {
// Tests ephemeral real-time watcher change handling (no manual reindex)
let harness = IndexingHarnessBuilder::new("ephemeral_move_watcher")
.build() // Watcher enabled
.await?;
let test_root = harness.temp_path();
let source_folder = test_root.join("source_folder");
let dest_folder = test_root.join("destination_folder");
tokio::fs::create_dir_all(&source_folder).await?;
tokio::fs::create_dir_all(&dest_folder).await?;
tokio::fs::write(source_folder.join("file1.txt"), "Content 1").await?;
tokio::fs::write(source_folder.join("file2.rs"), "fn main() {}").await?;
// Index in ephemeral mode
let test_root_sd = SdPath::local(test_root.clone());
let indexer_config = IndexerJobConfig::ephemeral_browse(test_root_sd, IndexScope::Recursive);
let indexer_job = IndexerJob::new(indexer_config);
tracing::info!("Initial ephemeral indexing");
let index_handle = harness.library.jobs().dispatch(indexer_job).await?;
index_handle.wait().await?;
harness
.core
.context
.ephemeral_cache()
.mark_indexing_complete(&test_root);
// Register for watching
if let Some(watcher) = harness.core.context.get_fs_watcher().await {
watcher.watch_ephemeral(test_root.clone()).await?;
tokio::time::sleep(Duration::from_millis(500)).await;
}
// Start collecting events
let mut collector = EventCollector::new(&harness.core.events);
let collection_handle = tokio::spawn(async move {
collector.collect_events(Duration::from_secs(15)).await;
collector
});
tokio::time::sleep(Duration::from_millis(100)).await;
// Move the file - watcher should detect it
tracing::info!("Moving file (watcher will detect)");
tokio::fs::rename(
source_folder.join("file1.txt"),
dest_folder.join("file1.txt"),
)
.await?;
// NO manual reindex - wait for watcher to handle it
tokio::time::sleep(Duration::from_secs(12)).await;
// Verify filesystem state
assert!(
!tokio::fs::try_exists(source_folder.join("file1.txt"))
.await
.unwrap_or(false),
"file1.txt should not exist in source folder"
);
assert!(
tokio::fs::try_exists(dest_folder.join("file1.txt")).await?,
"file1.txt should exist in destination folder"
);
assert!(
tokio::fs::try_exists(source_folder.join("file2.rs")).await?,
"file2.rs should still exist in source folder"
);
// Verify file content preserved
let file_content = tokio::fs::read_to_string(dest_folder.join("file1.txt")).await?;
assert_eq!(
file_content, "Content 1",
"File content should be preserved"
);
// Verify watcher emitted events (ephemeral uses individual ResourceChanged events)
let collector = collection_handle.await.unwrap();
let stats = collector.analyze().await;
let event_count = stats.resource_changed.values().sum::<usize>();
if event_count == 0 {
tracing::warn!("No ResourceChanged events emitted - ephemeral watcher may not emit events for file moves in test environment (works in prod)");
}
harness.shutdown().await?;
Ok(())
}

View File

@@ -47,6 +47,8 @@ export type UseNormalizedQueryOptions<I> = Simplify<{
includeDescendants?: boolean;
/** Resource ID for single-resource queries */
resourceId?: string;
/** Enable debug logging for this query instance */
debug?: boolean;
}>;
// Runtime Validation Schemas (Valibot)
@@ -182,6 +184,7 @@ export function useNormalizedQuery<I, O>(
optionsRef.current,
capturedQueryKey, // Use captured queryKey, not ref
queryClient,
optionsRef.current.debug, // Pass debug flag
);
};
@@ -234,6 +237,7 @@ export function handleResourceEvent(
options: UseNormalizedQueryOptions<any>,
queryKey: any[],
queryClient: QueryClient,
debug?: boolean,
) {
// Skip string events (like "CoreStarted", "CoreShutdown")
if (typeof event === "string") {
@@ -242,6 +246,12 @@ export function handleResourceEvent(
// Refresh event - invalidate all queries
if ("Refresh" in event) {
if (debug) {
console.log(
`[useNormalizedQuery] ${options.wireMethod} processing Refresh`,
event,
);
}
queryClient.invalidateQueries();
return;
}
@@ -256,6 +266,12 @@ export function handleResourceEvent(
const { resource_type, resource, metadata } =
result.output.ResourceChanged;
if (resource_type === options.resourceType) {
if (debug) {
console.log(
`[useNormalizedQuery] ${options.wireMethod} processing ResourceChanged`,
event,
);
}
updateSingleResource(
resource,
metadata,
@@ -280,6 +296,12 @@ export function handleResourceEvent(
resource_type === options.resourceType &&
Array.isArray(resources)
) {
if (debug) {
console.log(
`[useNormalizedQuery] ${options.wireMethod} processing ResourceChangedBatch`,
event,
);
}
updateBatchResources(
resources,
metadata,
@@ -299,6 +321,12 @@ export function handleResourceEvent(
const { resource_type, resource_id } = result.output.ResourceDeleted;
if (resource_type === options.resourceType) {
if (debug) {
console.log(
`[useNormalizedQuery] ${options.wireMethod} processing ResourceDeleted`,
event,
);
}
deleteResource(resource_id, queryKey, queryClient);
}
}
@@ -411,7 +439,11 @@ export function updateSingleResource<O>(
if (options) {
resourcesToUpdate = filterBatchResources(resourcesToUpdate, options);
if (resourcesToUpdate.length === 0) {
return; // Resource was filtered out
// Resource was filtered out - may have moved out of scope, remove from cache
if (resource.id) {
deleteResource(resource.id, queryKey, queryClient);
}
return;
}
}
@@ -457,8 +489,15 @@ export function updateBatchResources<O>(
// Apply client-side filtering (safety fallback)
const filteredResources = filterBatchResources(resources, options);
// If all resources were filtered out, they may have moved OUT of scope
// Remove them from cache if they exist (handles file moves out of current view)
if (filteredResources.length === 0) {
return; // No matching resources
for (const resource of resources) {
if (resource.id) {
deleteResource(resource.id, queryKey, queryClient);
}
}
return;
}
queryClient.setQueryData<O>(queryKey, (oldData: any) => {
@@ -535,17 +574,40 @@ function updateArrayCache(
const newData = [...oldData];
const seenIds = new Set();
// Update existing items
// Update existing items by ID
for (let i = 0; i < newData.length; i++) {
const item: any = newData[i];
const match = newResources.find((r: any) => r.id === item.id);
if (match) {
newData[i] = safeMerge(item, match, noMergeFields);
seenIds.add(item.id);
seenIds.add(match.id);
}
}
// Append new items
// Handle Content entries that represent the same file as an existing Physical entry
// When content identification happens, a new Content entry is created with a different ID
// We need to merge it into the existing Physical entry by matching paths
for (const resource of newResources) {
if (!seenIds.has(resource.id) && resource.sd_path?.Content) {
// Try to find existing Physical entry by matching alternate_paths
const physicalPath = resource.alternate_paths?.find((p: any) => p.Physical)?.Physical?.path;
if (physicalPath) {
const existingIndex = newData.findIndex((item: any) => {
const itemPath = item.sd_path?.Physical?.path ||
item.alternate_paths?.find((p: any) => p.Physical)?.Physical?.path;
return itemPath === physicalPath;
});
if (existingIndex !== -1) {
// Merge Content entry into existing Physical entry
newData[existingIndex] = safeMerge(newData[existingIndex], resource, noMergeFields);
seenIds.add(resource.id);
}
}
}
}
// Append new items (excluding Content paths that didn't match an existing entry)
for (const resource of newResources) {
if (!seenIds.has(resource.id)) {
// Skip resources with Content paths - they represent alternate instances
@@ -584,27 +646,55 @@ function updateWrappedCache(
const array = [...oldData[arrayField]];
const seenIds = new Set();
// Update existing
// Update existing by ID
for (let i = 0; i < array.length; i++) {
const item: any = array[i];
const match = newResources.find((r: any) => r.id === item.id);
if (match) {
array[i] = safeMerge(item, match, noMergeFields);
seenIds.add(item.id);
seenIds.add(match.id);
}
}
// Append new
// Handle Content entries that represent the same file as an existing Physical entry
for (const resource of newResources) {
if (!seenIds.has(resource.id) && resource.sd_path?.Content) {
// Try to find existing Physical entry by matching alternate_paths
const physicalPath = resource.alternate_paths?.find((p: any) => p.Physical)?.Physical?.path;
if (physicalPath) {
const existingIndex = array.findIndex((item: any) => {
const itemPath = item.sd_path?.Physical?.path ||
item.alternate_paths?.find((p: any) => p.Physical)?.Physical?.path;
return itemPath === physicalPath;
});
if (existingIndex !== -1) {
// Merge Content entry into existing Physical entry
array[existingIndex] = safeMerge(array[existingIndex], resource, noMergeFields);
seenIds.add(resource.id);
}
}
}
}
// Append new items (excluding Content paths that didn't match an existing entry)
for (const resource of newResources) {
if (!seenIds.has(resource.id)) {
// Skip resources with Content paths - they represent alternate instances
if (resource.sd_path?.Content) {
continue;
}
// Check if resource already exists in the array (by ID)
const alreadyExists = array.some((item: any) => item.id === resource.id);
const alreadyExists = array.some(
(item: any) => item.id === resource.id,
);
if (alreadyExists) {
continue;
}
// New resource - append it (including Content paths for new files!)
// New resource - append it
array.push(resource);
}
}