diff --git a/core/src/ops/files/query/directory_listing.rs b/core/src/ops/files/query/directory_listing.rs index ce8caa09b..5cc2f431c 100644 --- a/core/src/ops/files/query/directory_listing.rs +++ b/core/src/ops/files/query/directory_listing.rs @@ -12,7 +12,6 @@ use crate::{ video_media_data, }, infra::query::LibraryQuery, - ops::indexing::ephemeral::{cache::EphemeralIndexCache, EphemeralIndex}, }; use sea_orm::{ ColumnTrait, ConnectionTrait, DatabaseConnection, EntityTrait, JoinType, QueryFilter, @@ -21,7 +20,6 @@ use sea_orm::{ use serde::{Deserialize, Serialize}; use specta::Type; use std::{collections::HashMap, sync::Arc}; -use tokio::sync::RwLock as TokioRwLock; use tracing; use uuid::Uuid; @@ -119,15 +117,11 @@ impl LibraryQuery for DirectoryListingQuery { context: Arc, session: crate::infra::api::SessionContext, ) -> QueryResult { - tracing::info!( - "DirectoryListingQuery::execute called with path: {:?}", - self.input.path - ); + tracing::debug!("DirectoryListingQuery path={:?}", self.input.path); let library_id = session .current_library_id .ok_or_else(|| QueryError::Internal("No library in session".to_string()))?; - tracing::info!("Library ID: {}", library_id); let library = context .libraries() @@ -141,7 +135,7 @@ impl LibraryQuery for DirectoryListingQuery { // Check if this path's location has IndexMode::None if let Some(should_use_ephemeral) = self.check_location_index_mode(db.conn()).await { if should_use_ephemeral { - tracing::info!("Location has IndexMode::None, using ephemeral indexing"); + tracing::debug!("IndexMode::None, using ephemeral indexing"); return self .query_ephemeral_directory_impl(context, library_id) .await; @@ -153,11 +147,13 @@ impl LibraryQuery for DirectoryListingQuery { Ok(parent_entry) => { // Path is indexed - query from database let parent_id = parent_entry.id; + tracing::debug!("Indexed path, parent_id={}", parent_id); self.query_indexed_directory_impl(parent_id, db.conn()) .await } - Err(_) => { - // Path not indexed - trigger ephemeral indexing and return empty + Err(e) => { + // Path not indexed - trigger ephemeral indexing + tracing::debug!("Path not indexed, using ephemeral (err={:?})", e); self.query_ephemeral_directory_impl(context, library_id) .await } @@ -626,6 +622,7 @@ impl DirectoryListingQuery { context: Arc, library_id: Uuid, ) -> QueryResult { + use crate::domain::file::File; use crate::ops::indexing::{IndexScope, IndexerJob, IndexerJobConfig}; // Get the local path for cache lookup @@ -654,54 +651,103 @@ impl DirectoryListingQuery { ); // Try to get directory listing from cached index - // First, get the children list with a read lock let children = { let index_guard = index.read().await; index_guard.list_directory(&local_path) }; // Check if the index actually has entries for this directory - // BUT: if children is empty and this path wasn't explicitly indexed, - // it means a parent was indexed and this subdirectory needs its own indexing job if let Some(children) = children { + // A parent directory's shallow index may contain this path as an entry + // but have no children indexed under it. In that case, fall through + // to trigger a new indexer job for this specific directory. if children.is_empty() && !cache.is_indexed(&local_path) { - tracing::info!( - "Subdirectory has no indexed children, triggering indexing: {}", + tracing::debug!( + "Subdirectory has no indexed children, will trigger indexing: {}", + local_path.display() + ); + // Fall through to indexer dispatch below + } else { + tracing::debug!( + "Cached index has {} children for {}", + children.len(), local_path.display() ); - // Fall through to spawn indexer job - } else if let Some(output) = self.read_ephemeral_listing(&index, &local_path).await { - return Ok(output); - } - } - // Index exists but doesn't have this directory yet - // Fall through to spawn indexer job - tracing::debug!( - "Cached index doesn't contain directory: {}", - local_path.display() - ); + // Convert cached entries to File objects with lazy UUID assignment + let mut index_write = index.write().await; + let mut files = Vec::new(); + + for child_path in children { + if let Some(metadata) = index_write.get_entry_ref(&child_path) { + if !self.input.include_hidden.unwrap_or(false) && metadata.is_hidden { + continue; + } + + let entry_uuid = index_write.get_or_assign_uuid(&child_path); + + let entry_sd_path = SdPath::Physical { + device_slug: match &self.input.path { + SdPath::Physical { device_slug, .. } => device_slug.clone(), + _ => String::new(), + }, + path: child_path.clone(), + }; + + let content_kind = index_write.get_content_kind(&child_path); + + let mut file = + File::from_ephemeral(entry_uuid, &metadata, entry_sd_path); + file.content_kind = content_kind; + files.push(file); + } + } + drop(index_write); + + self.sort_files(&mut files); + + let total_count = files.len() as u32; + let has_more = if let Some(limit) = self.input.limit { + if files.len() > limit as usize { + files.truncate(limit as usize); + true + } else { + false + } + } else { + false + }; + + return Ok(DirectoryListingOutput { + files, + total_count, + has_more, + }); + } + } else { + // Index exists but doesn't have this directory yet + tracing::debug!( + "Cached index doesn't contain directory: {}", + local_path.display() + ); + } } // No cached index or index doesn't cover this path - // If another request is already indexing this path, wait for it - // instead of returning empty (avoids flicker on concurrent requests) + // Check if indexing is already in progress if cache.is_indexing(&local_path) { - tracing::debug!("Indexing already in progress, waiting: {}", local_path.display()); - self.wait_for_indexing(&cache, &local_path).await; - - if let Some(index) = cache.get_for_search(&local_path) { - if let Some(output) = self.read_ephemeral_listing(&index, &local_path).await { - return Ok(output); - } - } - // Fall through if cache still empty after wait + tracing::debug!( + "Ephemeral indexing already in progress for {}", + local_path.display() + ); + return Ok(DirectoryListingOutput { + files: Vec::new(), + total_count: 0, + has_more: false, + }); } - tracing::info!( - "Triggering ephemeral indexing for: {:?}", - self.input.path - ); + tracing::debug!("DirectoryListingQuery path={:?}", self.input.path); // Get library to dispatch indexer job if let Some(library) = context.get_library(library_id).await { @@ -730,20 +776,11 @@ impl DirectoryListingQuery { // Share the cached index with the job indexer_job.set_ephemeral_index(ephemeral_index); - // Dispatch job and wait for completion so we can return results directly. - // Ephemeral indexing is very fast (typically <100ms for a single directory), - // so waiting is better than relying on event-based UI updates which have - // race conditions with subscription setup timing. + // Dispatch job asynchronously + // The job will emit ResourceChanged events as files are discovered match library.jobs().dispatch(indexer_job).await { - Ok(_handle) => { - tracing::info!("Dispatched ephemeral indexer for {:?}, waiting", self.input.path); - self.wait_for_indexing(&cache, &local_path).await; - - if let Some(index) = cache.get_for_search(&local_path) { - if let Some(output) = self.read_ephemeral_listing(&index, &local_path).await { - return Ok(output); - } - } + Ok(_) => { + tracing::info!("Dispatched ephemeral indexer for {:?}", self.input.path); } Err(e) => { tracing::warn!( @@ -756,7 +793,8 @@ impl DirectoryListingQuery { } } - // Fallback: return empty if job dispatch failed or cache is empty + // Return empty result immediately + // UI will receive ResourceChanged events and populate incrementally Ok(DirectoryListingOutput { files: Vec::new(), total_count: 0, @@ -764,87 +802,6 @@ impl DirectoryListingQuery { }) } - /// Read children from the ephemeral index and convert to a DirectoryListingOutput. - /// - /// Shared by all ephemeral code paths (cache hit, wait-for-indexing, post-dispatch). - async fn read_ephemeral_listing( - &self, - index: &Arc>, - local_path: &std::path::Path, - ) -> Option { - - let children = { - let guard = index.read().await; - guard.list_directory(local_path) - }; - let children = children?; - - let mut index_write = index.write().await; - let mut files = Vec::new(); - - for child_path in children { - if let Some(metadata) = index_write.get_entry_ref(&child_path) { - if !self.input.include_hidden.unwrap_or(false) && metadata.is_hidden { - continue; - } - let entry_uuid = index_write.get_or_assign_uuid(&child_path); - let entry_sd_path = SdPath::Physical { - device_slug: match &self.input.path { - SdPath::Physical { device_slug, .. } => device_slug.clone(), - _ => String::new(), - }, - path: child_path.clone(), - }; - let content_kind = index_write.get_content_kind(&child_path); - let mut file = File::from_ephemeral(entry_uuid, &metadata, entry_sd_path); - file.content_kind = content_kind; - files.push(file); - } - } - drop(index_write); - - self.sort_files(&mut files); - let total_count = files.len() as u32; - let has_more = if let Some(limit) = self.input.limit { - if files.len() > limit as usize { - files.truncate(limit as usize); - true - } else { - false - } - } else { - false - }; - - Some(DirectoryListingOutput { - files, - total_count, - has_more, - }) - } - - /// Poll until ephemeral indexing finishes (or 10s timeout). - /// - /// Ephemeral indexing typically completes in <500ms, so this avoids - /// returning empty results and relying on event-based UI updates. - async fn wait_for_indexing( - &self, - cache: &EphemeralIndexCache, - local_path: &std::path::Path, - ) { - let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(10); - loop { - if !cache.is_indexing(local_path) { - break; - } - if tokio::time::Instant::now() >= deadline { - tracing::warn!("Ephemeral indexing timed out for {}", local_path.display()); - break; - } - tokio::time::sleep(std::time::Duration::from_millis(25)).await; - } - } - /// Sort files according to the input options fn sort_files(&self, files: &mut Vec) { use crate::domain::file::EntryKind; diff --git a/core/tests/ephemeral_bridge_test.rs b/core/tests/ephemeral_bridge_test.rs new file mode 100644 index 000000000..d6a67fbc7 --- /dev/null +++ b/core/tests/ephemeral_bridge_test.rs @@ -0,0 +1,133 @@ +//! Ephemeral Directory Event Streaming Bridge Test +//! +//! Tests the core ephemeral browsing flow end-to-end: +//! 1. TS client subscribes to events for a directory path scope +//! 2. TS client queries the directory listing (backend returns empty, dispatches indexer) +//! 3. Indexer emits ResourceChangedBatch events +//! 4. Events stream through EventBuffer -> RPC -> TCP -> TS subscription +//! 5. TS client receives events and verifies files arrive +//! +//! This test exists to catch regressions in the event delivery pipeline +//! for ephemeral (non-indexed) directory browsing. + +mod helpers; + +use helpers::*; +use sd_core::device::get_current_device_slug; +use serde::{Deserialize, Serialize}; +use std::path::PathBuf; + +/// Connection info passed from Rust test harness to TypeScript tests +#[derive(Debug, Serialize, Deserialize)] +struct EphemeralBridgeConfig { + /// TCP socket address for daemon connection + socket_addr: String, + /// Library UUID + library_id: String, + /// Device slug used by this daemon (must match path_scope in subscriptions) + device_slug: String, + /// Physical path to the ephemeral directory (not a managed location) + ephemeral_dir_path: PathBuf, + /// Test data directory + test_data_path: PathBuf, +} + +#[tokio::test] +async fn test_ephemeral_directory_event_streaming() -> anyhow::Result<()> { + let harness = IndexingHarnessBuilder::new("ephemeral_event_streaming") + .enable_daemon() + .build() + .await?; + + // Create an ephemeral directory with files (NOT a managed location) + let test_root = harness.temp_path(); + let ephemeral_dir = test_root.join("ephemeral_browse"); + tokio::fs::create_dir_all(&ephemeral_dir).await?; + + // Create files the indexer will discover + tokio::fs::write(ephemeral_dir.join("document.txt"), "Hello world").await?; + tokio::fs::write(ephemeral_dir.join("photo.jpg"), "fake jpeg data").await?; + tokio::fs::write(ephemeral_dir.join("notes.md"), "# Notes").await?; + tokio::fs::write(ephemeral_dir.join("script.rs"), "fn main() {}").await?; + tokio::fs::write(ephemeral_dir.join("data.json"), r#"{"key": "value"}"#).await?; + + // Create a subdirectory too + tokio::fs::create_dir_all(ephemeral_dir.join("subfolder")).await?; + tokio::fs::write(ephemeral_dir.join("subfolder/nested.txt"), "nested").await?; + + let socket_addr = harness + .daemon_socket_addr() + .expect("Daemon should be enabled") + .to_string(); + + let device_slug = get_current_device_slug(); + eprintln!("[Rust] Device slug: {}", device_slug); + + let bridge_config = EphemeralBridgeConfig { + socket_addr: socket_addr.clone(), + library_id: harness.library.id().to_string(), + device_slug: device_slug.clone(), + ephemeral_dir_path: ephemeral_dir.clone(), + test_data_path: harness.temp_path().to_path_buf(), + }; + + let config_path = harness.temp_path().join("ephemeral_bridge_config.json"); + let config_json = serde_json::to_string_pretty(&bridge_config)?; + tokio::fs::write(&config_path, config_json).await?; + + tracing::info!("Bridge config written to: {}", config_path.display()); + tracing::info!("Socket address: {}", socket_addr); + tracing::info!("Library ID: {}", bridge_config.library_id); + tracing::info!("Ephemeral dir: {}", ephemeral_dir.display()); + + let ts_test_file = "packages/ts-client/tests/integration/ephemeral-streaming.test.ts"; + let workspace_root = std::env::current_dir()?.parent().unwrap().to_path_buf(); + let ts_test_path = workspace_root.join(ts_test_file); + let bun_config = workspace_root.join("packages/ts-client/tests/integration/bunfig.toml"); + + eprintln!("\n=== Ephemeral Event Streaming Bridge Test ==="); + eprintln!("Workspace root: {}", workspace_root.display()); + eprintln!("Test file: {}", ts_test_path.display()); + eprintln!("Config path: {}", config_path.display()); + eprintln!("Socket address: {}", socket_addr); + eprintln!("Library ID: {}", bridge_config.library_id); + eprintln!("Ephemeral dir: {}", ephemeral_dir.display()); + eprintln!("=============================================\n"); + + if !ts_test_path.exists() { + anyhow::bail!("TypeScript test file not found: {}", ts_test_path.display()); + } + + let output = tokio::process::Command::new("bun") + .arg("test") + .arg("--config") + .arg(&bun_config) + .arg(&ts_test_path) + .env("BRIDGE_CONFIG_PATH", config_path.to_str().unwrap()) + .env("RUST_LOG", "debug") + .current_dir(&workspace_root) + .output() + .await?; + + let stdout = String::from_utf8_lossy(&output.stdout); + let stderr = String::from_utf8_lossy(&output.stderr); + + if !stdout.is_empty() { + eprintln!("\n=== TypeScript stdout ===\n{}\n", stdout); + } + if !stderr.is_empty() { + eprintln!("\n=== TypeScript stderr ===\n{}\n", stderr); + } + + if !output.status.success() { + anyhow::bail!( + "TypeScript test failed with exit code: {:?}", + output.status.code() + ); + } + + tracing::info!("Ephemeral event streaming test passed!"); + + harness.shutdown().await?; + Ok(()) +} diff --git a/core/tests/helpers/sync_harness.rs b/core/tests/helpers/sync_harness.rs index dd04e8ec1..f27358c52 100644 --- a/core/tests/helpers/sync_harness.rs +++ b/core/tests/helpers/sync_harness.rs @@ -72,6 +72,7 @@ impl TestConfigBuilder { statistics_listener_enabled: false, }, proxy_pairing: sd_core::config::app_config::ProxyPairingConfig::default(), + spacebot: sd_core::config::app_config::SpacebotConfig::default(), }; config.save()?; diff --git a/packages/ts-client/src/hooks/useNormalizedQuery.ts b/packages/ts-client/src/hooks/useNormalizedQuery.ts index a29a2df38..d9129c6cc 100644 --- a/packages/ts-client/src/hooks/useNormalizedQuery.ts +++ b/packages/ts-client/src/hooks/useNormalizedQuery.ts @@ -506,7 +506,9 @@ export function updateSingleResource( } queryClient.setQueryData(queryKey, (oldData: any) => { - if (!oldData) return oldData; + if (!oldData) { + return { files: resourcesToUpdate, total_count: resourcesToUpdate.length, has_more: false } as O; + } // Handle array responses if (Array.isArray(oldData)) { @@ -570,7 +572,13 @@ export function updateBatchResources( if (options.debug) { console.log(`[useNormalizedQuery] ${wireMethod} setQueryData: oldData has`, Array.isArray(oldData) ? oldData.length : Object.keys(oldData || {}).join(','), `adding ${filteredResources.length} resources`); } - if (!oldData) return oldData; + // If the query hasn't returned yet, seed the cache with the event data. + // This handles the race where the subscription's buffer replay delivers + // events before the initial query response arrives. Without this, the + // events would be silently dropped and the UI stays empty. + if (!oldData) { + return { files: filteredResources, total_count: filteredResources.length, has_more: false } as O; + } // Handle array responses if (Array.isArray(oldData)) { diff --git a/packages/ts-client/src/subscriptionManager.ts b/packages/ts-client/src/subscriptionManager.ts index 5a0ec44b7..14a7f0c1e 100644 --- a/packages/ts-client/src/subscriptionManager.ts +++ b/packages/ts-client/src/subscriptionManager.ts @@ -86,14 +86,15 @@ export class SubscriptionManager { return this.createCleanup(key, callback); } - // Create new subscription + // Create new subscription. + // The initial callback is added to listeners BEFORE transport.subscribe() + // so that buffered events replayed during subscription setup are not lost. console.log(`[SubscriptionManager] Creating new subscription for key: ${key}`); - const subscriptionPromise = this.createSubscription(key, filter); + const subscriptionPromise = this.createSubscription(key, filter, callback); this.pendingSubscriptions.set(key, subscriptionPromise); try { entry = await subscriptionPromise; - entry.listeners.add(callback); entry.refCount++; console.log(`[SubscriptionManager] New subscription created, refCount: ${entry.refCount}`); return this.createCleanup(key, callback); @@ -105,6 +106,7 @@ export class SubscriptionManager { private async createSubscription( key: string, filter: EventFilter, + initialCallback: (event: Event) => void, ): Promise { const eventTypes = filter.event_types ?? [ "ResourceChanged", @@ -113,6 +115,16 @@ export class SubscriptionManager { "Refresh", ]; + // Pre-create the entry with the initial listener so that events + // replayed by the daemon during transport.subscribe() are captured. + const entry: SubscriptionEntry = { + unsubscribe: () => {}, + listeners: new Set([initialCallback]), + refCount: 0, + }; + + this.subscriptions.set(key, entry); + const unsubscribe = await this.transport.subscribe( (event) => { // Broadcast event to all listeners @@ -132,13 +144,7 @@ export class SubscriptionManager { }, ); - const entry: SubscriptionEntry = { - unsubscribe, - listeners: new Set(), - refCount: 0, - }; - - this.subscriptions.set(key, entry); + entry.unsubscribe = unsubscribe; return entry; } diff --git a/packages/ts-client/tests/integration/ephemeral-streaming.test.ts b/packages/ts-client/tests/integration/ephemeral-streaming.test.ts new file mode 100644 index 000000000..a031a0658 --- /dev/null +++ b/packages/ts-client/tests/integration/ephemeral-streaming.test.ts @@ -0,0 +1,302 @@ +/** + * Ephemeral Directory Event Streaming Test + * + * Tests the core ephemeral browsing flow without React: + * 1. Subscribe to events for a directory path scope + * 2. Query the directory listing (backend returns empty, dispatches indexer) + * 3. Verify ResourceChangedBatch events arrive through the subscription + * + * This test reproduces the exact race condition that causes empty directories + * in the UI: the query triggers indexing, but events may not reach the + * subscription due to timing, buffer overflow, or filter mismatches. + */ + +import "./setup"; + +import { describe, test, expect, beforeAll, afterAll } from "bun:test"; +import { readFile } from "fs/promises"; +import { hostname } from "os"; +import { SpacedriveClient } from "../../src/client"; +import { TcpSocketTransport } from "../../src/transport"; + +interface BridgeConfig { + socket_addr: string; + library_id: string; + device_slug: string; + ephemeral_dir_path: string; + test_data_path: string; +} + +let bridgeConfig: BridgeConfig; +let client: SpacedriveClient; +let deviceSlug: string; + +beforeAll(async () => { + const configPath = process.env.BRIDGE_CONFIG_PATH; + if (!configPath) { + throw new Error("BRIDGE_CONFIG_PATH environment variable not set"); + } + + const configJson = await readFile(configPath, "utf-8"); + bridgeConfig = JSON.parse(configJson); + + console.log(`[TS] Bridge config:`, bridgeConfig); + + client = SpacedriveClient.fromTcpSocket(bridgeConfig.socket_addr); + client.setCurrentLibrary(bridgeConfig.library_id); + + // Use the device slug from the daemon, not hostname() which may differ + deviceSlug = bridgeConfig.device_slug; + console.log(`[TS] Device slug (from daemon): ${deviceSlug}`); + console.log(`[TS] Hostname for comparison: ${hostname().toLowerCase().replace(/\s+/g, "-")}`); +}); + +describe("Ephemeral Directory Event Streaming", () => { + /** + * Test 1: Raw transport-level event delivery + * + * Subscribe via raw TCP, then query directory listing. + * This bypasses React, SubscriptionManager, and useNormalizedQuery entirely + * to test the daemon -> TCP -> event pipeline in isolation. + */ + test("events arrive via TCP subscription after directory listing query", async () => { + const ephemeralPath = bridgeConfig.ephemeral_dir_path; + const pathScope = { + Physical: { + device_slug: deviceSlug, + path: ephemeralPath, + }, + }; + + const receivedEvents: any[] = []; + + // Step 1: Subscribe FIRST + console.log(`[TS] Subscribing to events for: ${ephemeralPath}`); + const transport = new TcpSocketTransport(bridgeConfig.socket_addr); + const unsubscribe = await transport.subscribe( + (event: any) => { + console.log(`[TS] EVENT received:`, JSON.stringify(event).slice(0, 200)); + receivedEvents.push(event); + }, + { + event_types: [ + "ResourceChanged", + "ResourceChangedBatch", + "ResourceDeleted", + "Refresh", + ], + filter: { + resource_type: "file", + path_scope: pathScope, + library_id: bridgeConfig.library_id, + include_descendants: false, + }, + }, + ); + + console.log(`[TS] Subscription active`); + + // Small delay to ensure subscription is fully registered on daemon + await new Promise((r) => setTimeout(r, 100)); + + // Step 2: Query the directory listing (triggers ephemeral indexing) + console.log(`[TS] Querying directory listing for: ${ephemeralPath}`); + const queryResult = await client.execute( + "query:files.directory_listing", + { + path: pathScope, + sort_by: "name", + limit: null, + include_hidden: false, + folders_first: true, + }, + ); + + console.log( + `[TS] Query returned:`, + JSON.stringify(queryResult).slice(0, 200), + ); + + // The query may return empty (indexer dispatched async) or may return + // cached results if a previous run populated the cache. + // Either way, events should arrive. + + // Step 3: Wait for events to arrive + // Ephemeral indexing is fast (<500ms), give generous timeout + const deadline = Date.now() + 10_000; + while (Date.now() < deadline) { + if (receivedEvents.length > 0) { + console.log( + `[TS] Got ${receivedEvents.length} event(s) after ${Date.now() - (deadline - 10_000)}ms`, + ); + break; + } + await new Promise((r) => setTimeout(r, 50)); + } + + // Step 4: Verify events arrived + console.log(`[TS] Total events received: ${receivedEvents.length}`); + for (const [i, event] of receivedEvents.entries()) { + console.log(`[TS] Event ${i + 1}:`, JSON.stringify(event).slice(0, 300)); + } + + // Clean up subscription + unsubscribe(); + + // If the query returned files directly (cache hit), events may not fire. + // Check both paths. + const queryFiles = (queryResult as any)?.files ?? []; + const eventFiles: string[] = []; + + for (const event of receivedEvents) { + if (event.ResourceChangedBatch) { + for (const resource of event.ResourceChangedBatch.resources) { + eventFiles.push(resource.name); + } + } else if (event.ResourceChanged) { + eventFiles.push(event.ResourceChanged.resource.name); + } + } + + const totalFiles = queryFiles.length + eventFiles.length; + console.log( + `[TS] Files from query: ${queryFiles.length}, from events: ${eventFiles.length}`, + ); + + if (queryFiles.length > 0) { + console.log( + `[TS] Query returned files (cache hit):`, + queryFiles.map((f: any) => f.name), + ); + } + if (eventFiles.length > 0) { + console.log(`[TS] Event files:`, eventFiles); + } + + // We expect files to arrive via EITHER the query response (cache hit) + // OR events (first-time indexing). At least one must work. + // The directory has 6 items: document.txt, photo.jpg, notes.md, script.rs, data.json, subfolder + expect(totalFiles).toBeGreaterThanOrEqual(5); + + // If the query returned empty, ALL files must come from events + if (queryFiles.length === 0) { + console.log(`[TS] Query returned empty — verifying events delivered all files`); + expect(eventFiles.length).toBeGreaterThanOrEqual(5); + } + + console.log(`[TS] Event streaming test passed`); + }, 30_000); + + /** + * Test 2: EventBuffer replay + * + * Query first (triggers indexing + events), THEN subscribe. + * The EventBuffer should replay recent events to the new subscription. + * This is the exact race condition the buffer was designed to solve. + */ + test("EventBuffer replays events to late subscriber", async () => { + // Use a subdirectory so it hasn't been indexed yet + const subPath = bridgeConfig.ephemeral_dir_path + "/subfolder"; + const pathScope = { + Physical: { + device_slug: deviceSlug, + path: subPath, + }, + }; + + // Step 1: Query FIRST (triggers indexing, events go to buffer) + console.log(`[TS] Querying BEFORE subscribing: ${subPath}`); + const queryResult = await client.execute( + "query:files.directory_listing", + { + path: pathScope, + sort_by: "name", + limit: null, + include_hidden: false, + folders_first: true, + }, + ); + + console.log( + `[TS] Query returned:`, + JSON.stringify(queryResult).slice(0, 200), + ); + + // Small delay to let events buffer (but not expire — 5s retention) + await new Promise((r) => setTimeout(r, 500)); + + // Step 2: Subscribe AFTER query (late subscriber) + const replayedEvents: any[] = []; + const transport = new TcpSocketTransport(bridgeConfig.socket_addr); + const unsubscribe = await transport.subscribe( + (event: any) => { + console.log( + `[TS] REPLAYED event:`, + JSON.stringify(event).slice(0, 200), + ); + replayedEvents.push(event); + }, + { + event_types: [ + "ResourceChanged", + "ResourceChangedBatch", + "ResourceDeleted", + "Refresh", + ], + filter: { + resource_type: "file", + path_scope: pathScope, + library_id: bridgeConfig.library_id, + include_descendants: false, + }, + }, + ); + + // Wait a bit for replayed events to arrive + await new Promise((r) => setTimeout(r, 1000)); + + console.log(`[TS] Replayed events: ${replayedEvents.length}`); + for (const [i, event] of replayedEvents.entries()) { + console.log( + `[TS] Replayed ${i + 1}:`, + JSON.stringify(event).slice(0, 300), + ); + } + + unsubscribe(); + + // The subfolder has 1 file (nested.txt). + // If the buffer replay works, we should see it. + // If it doesn't, this test will fail and tell us the buffer is broken. + const queryFiles = (queryResult as any)?.files ?? []; + const replayFiles: string[] = []; + for (const event of replayedEvents) { + if (event.ResourceChangedBatch) { + for (const resource of event.ResourceChangedBatch.resources) { + replayFiles.push(resource.name); + } + } else if (event.ResourceChanged) { + replayFiles.push(event.ResourceChanged.resource.name); + } + } + + const total = queryFiles.length + replayFiles.length; + console.log( + `[TS] Buffer replay: ${queryFiles.length} from query, ${replayFiles.length} from replay`, + ); + + // We expect the file to arrive via either query (cache hit from test 1) + // or buffer replay. Log clearly which path succeeded. + if (total === 0) { + console.error( + `[TS] FAILURE: No files from query or buffer replay for ${subPath}`, + ); + console.error( + `[TS] This means the EventBuffer is not replaying events to late subscribers`, + ); + } + + expect(total).toBeGreaterThanOrEqual(1); + console.log(`[TS] Buffer replay test passed`); + }, 30_000); +});