fix(ephemeral): restore streaming design, fix event delivery race conditions

Reverts the query/response approach from #3037 and fixes the actual bugs
that caused empty ephemeral directories:

- directory_listing.rs: Restore async indexer dispatch (return empty,
  populate via events). Subdirectories from a parent's shallow index now
  correctly fall through to trigger their own indexer job.

- subscriptionManager.ts: Pre-register initial listener before calling
  transport.subscribe() so buffer replay events aren't broadcast to an
  empty listener Set.

- useNormalizedQuery.ts: Seed TanStack Query cache when oldData is
  undefined, so events arriving before the query response aren't silently
  dropped by the setQueryData updater.

Adds bridge test (Rust harness + TS integration) that reproduces the
ephemeral event streaming flow end-to-end.
This commit is contained in:
James Pine
2026-03-24 18:38:46 -07:00
parent 73ec35656b
commit b7bbf29dbd
6 changed files with 555 additions and 148 deletions

View File

@@ -12,7 +12,6 @@ use crate::{
video_media_data,
},
infra::query::LibraryQuery,
ops::indexing::ephemeral::{cache::EphemeralIndexCache, EphemeralIndex},
};
use sea_orm::{
ColumnTrait, ConnectionTrait, DatabaseConnection, EntityTrait, JoinType, QueryFilter,
@@ -21,7 +20,6 @@ use sea_orm::{
use serde::{Deserialize, Serialize};
use specta::Type;
use std::{collections::HashMap, sync::Arc};
use tokio::sync::RwLock as TokioRwLock;
use tracing;
use uuid::Uuid;
@@ -119,15 +117,11 @@ impl LibraryQuery for DirectoryListingQuery {
context: Arc<CoreContext>,
session: crate::infra::api::SessionContext,
) -> QueryResult<Self::Output> {
tracing::info!(
"DirectoryListingQuery::execute called with path: {:?}",
self.input.path
);
tracing::debug!("DirectoryListingQuery path={:?}", self.input.path);
let library_id = session
.current_library_id
.ok_or_else(|| QueryError::Internal("No library in session".to_string()))?;
tracing::info!("Library ID: {}", library_id);
let library = context
.libraries()
@@ -141,7 +135,7 @@ impl LibraryQuery for DirectoryListingQuery {
// Check if this path's location has IndexMode::None
if let Some(should_use_ephemeral) = self.check_location_index_mode(db.conn()).await {
if should_use_ephemeral {
tracing::info!("Location has IndexMode::None, using ephemeral indexing");
tracing::debug!("IndexMode::None, using ephemeral indexing");
return self
.query_ephemeral_directory_impl(context, library_id)
.await;
@@ -153,11 +147,13 @@ impl LibraryQuery for DirectoryListingQuery {
Ok(parent_entry) => {
// Path is indexed - query from database
let parent_id = parent_entry.id;
tracing::debug!("Indexed path, parent_id={}", parent_id);
self.query_indexed_directory_impl(parent_id, db.conn())
.await
}
Err(_) => {
// Path not indexed - trigger ephemeral indexing and return empty
Err(e) => {
// Path not indexed - trigger ephemeral indexing
tracing::debug!("Path not indexed, using ephemeral (err={:?})", e);
self.query_ephemeral_directory_impl(context, library_id)
.await
}
@@ -626,6 +622,7 @@ impl DirectoryListingQuery {
context: Arc<CoreContext>,
library_id: Uuid,
) -> QueryResult<DirectoryListingOutput> {
use crate::domain::file::File;
use crate::ops::indexing::{IndexScope, IndexerJob, IndexerJobConfig};
// Get the local path for cache lookup
@@ -654,54 +651,103 @@ impl DirectoryListingQuery {
);
// Try to get directory listing from cached index
// First, get the children list with a read lock
let children = {
let index_guard = index.read().await;
index_guard.list_directory(&local_path)
};
// Check if the index actually has entries for this directory
// BUT: if children is empty and this path wasn't explicitly indexed,
// it means a parent was indexed and this subdirectory needs its own indexing job
if let Some(children) = children {
// A parent directory's shallow index may contain this path as an entry
// but have no children indexed under it. In that case, fall through
// to trigger a new indexer job for this specific directory.
if children.is_empty() && !cache.is_indexed(&local_path) {
tracing::info!(
"Subdirectory has no indexed children, triggering indexing: {}",
tracing::debug!(
"Subdirectory has no indexed children, will trigger indexing: {}",
local_path.display()
);
// Fall through to indexer dispatch below
} else {
tracing::debug!(
"Cached index has {} children for {}",
children.len(),
local_path.display()
);
// Fall through to spawn indexer job
} else if let Some(output) = self.read_ephemeral_listing(&index, &local_path).await {
return Ok(output);
}
}
// Index exists but doesn't have this directory yet
// Fall through to spawn indexer job
tracing::debug!(
"Cached index doesn't contain directory: {}",
local_path.display()
);
// Convert cached entries to File objects with lazy UUID assignment
let mut index_write = index.write().await;
let mut files = Vec::new();
for child_path in children {
if let Some(metadata) = index_write.get_entry_ref(&child_path) {
if !self.input.include_hidden.unwrap_or(false) && metadata.is_hidden {
continue;
}
let entry_uuid = index_write.get_or_assign_uuid(&child_path);
let entry_sd_path = SdPath::Physical {
device_slug: match &self.input.path {
SdPath::Physical { device_slug, .. } => device_slug.clone(),
_ => String::new(),
},
path: child_path.clone(),
};
let content_kind = index_write.get_content_kind(&child_path);
let mut file =
File::from_ephemeral(entry_uuid, &metadata, entry_sd_path);
file.content_kind = content_kind;
files.push(file);
}
}
drop(index_write);
self.sort_files(&mut files);
let total_count = files.len() as u32;
let has_more = if let Some(limit) = self.input.limit {
if files.len() > limit as usize {
files.truncate(limit as usize);
true
} else {
false
}
} else {
false
};
return Ok(DirectoryListingOutput {
files,
total_count,
has_more,
});
}
} else {
// Index exists but doesn't have this directory yet
tracing::debug!(
"Cached index doesn't contain directory: {}",
local_path.display()
);
}
}
// No cached index or index doesn't cover this path
// If another request is already indexing this path, wait for it
// instead of returning empty (avoids flicker on concurrent requests)
// Check if indexing is already in progress
if cache.is_indexing(&local_path) {
tracing::debug!("Indexing already in progress, waiting: {}", local_path.display());
self.wait_for_indexing(&cache, &local_path).await;
if let Some(index) = cache.get_for_search(&local_path) {
if let Some(output) = self.read_ephemeral_listing(&index, &local_path).await {
return Ok(output);
}
}
// Fall through if cache still empty after wait
tracing::debug!(
"Ephemeral indexing already in progress for {}",
local_path.display()
);
return Ok(DirectoryListingOutput {
files: Vec::new(),
total_count: 0,
has_more: false,
});
}
tracing::info!(
"Triggering ephemeral indexing for: {:?}",
self.input.path
);
tracing::debug!("DirectoryListingQuery path={:?}", self.input.path);
// Get library to dispatch indexer job
if let Some(library) = context.get_library(library_id).await {
@@ -730,20 +776,11 @@ impl DirectoryListingQuery {
// Share the cached index with the job
indexer_job.set_ephemeral_index(ephemeral_index);
// Dispatch job and wait for completion so we can return results directly.
// Ephemeral indexing is very fast (typically <100ms for a single directory),
// so waiting is better than relying on event-based UI updates which have
// race conditions with subscription setup timing.
// Dispatch job asynchronously
// The job will emit ResourceChanged events as files are discovered
match library.jobs().dispatch(indexer_job).await {
Ok(_handle) => {
tracing::info!("Dispatched ephemeral indexer for {:?}, waiting", self.input.path);
self.wait_for_indexing(&cache, &local_path).await;
if let Some(index) = cache.get_for_search(&local_path) {
if let Some(output) = self.read_ephemeral_listing(&index, &local_path).await {
return Ok(output);
}
}
Ok(_) => {
tracing::info!("Dispatched ephemeral indexer for {:?}", self.input.path);
}
Err(e) => {
tracing::warn!(
@@ -756,7 +793,8 @@ impl DirectoryListingQuery {
}
}
// Fallback: return empty if job dispatch failed or cache is empty
// Return empty result immediately
// UI will receive ResourceChanged events and populate incrementally
Ok(DirectoryListingOutput {
files: Vec::new(),
total_count: 0,
@@ -764,87 +802,6 @@ impl DirectoryListingQuery {
})
}
/// Read children from the ephemeral index and convert to a DirectoryListingOutput.
///
/// Shared by all ephemeral code paths (cache hit, wait-for-indexing, post-dispatch).
async fn read_ephemeral_listing(
&self,
index: &Arc<TokioRwLock<EphemeralIndex>>,
local_path: &std::path::Path,
) -> Option<DirectoryListingOutput> {
let children = {
let guard = index.read().await;
guard.list_directory(local_path)
};
let children = children?;
let mut index_write = index.write().await;
let mut files = Vec::new();
for child_path in children {
if let Some(metadata) = index_write.get_entry_ref(&child_path) {
if !self.input.include_hidden.unwrap_or(false) && metadata.is_hidden {
continue;
}
let entry_uuid = index_write.get_or_assign_uuid(&child_path);
let entry_sd_path = SdPath::Physical {
device_slug: match &self.input.path {
SdPath::Physical { device_slug, .. } => device_slug.clone(),
_ => String::new(),
},
path: child_path.clone(),
};
let content_kind = index_write.get_content_kind(&child_path);
let mut file = File::from_ephemeral(entry_uuid, &metadata, entry_sd_path);
file.content_kind = content_kind;
files.push(file);
}
}
drop(index_write);
self.sort_files(&mut files);
let total_count = files.len() as u32;
let has_more = if let Some(limit) = self.input.limit {
if files.len() > limit as usize {
files.truncate(limit as usize);
true
} else {
false
}
} else {
false
};
Some(DirectoryListingOutput {
files,
total_count,
has_more,
})
}
/// Poll until ephemeral indexing finishes (or 10s timeout).
///
/// Ephemeral indexing typically completes in <500ms, so this avoids
/// returning empty results and relying on event-based UI updates.
async fn wait_for_indexing(
&self,
cache: &EphemeralIndexCache,
local_path: &std::path::Path,
) {
let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(10);
loop {
if !cache.is_indexing(local_path) {
break;
}
if tokio::time::Instant::now() >= deadline {
tracing::warn!("Ephemeral indexing timed out for {}", local_path.display());
break;
}
tokio::time::sleep(std::time::Duration::from_millis(25)).await;
}
}
/// Sort files according to the input options
fn sort_files(&self, files: &mut Vec<File>) {
use crate::domain::file::EntryKind;

View File

@@ -0,0 +1,133 @@
//! Ephemeral Directory Event Streaming Bridge Test
//!
//! Tests the core ephemeral browsing flow end-to-end:
//! 1. TS client subscribes to events for a directory path scope
//! 2. TS client queries the directory listing (backend returns empty, dispatches indexer)
//! 3. Indexer emits ResourceChangedBatch events
//! 4. Events stream through EventBuffer -> RPC -> TCP -> TS subscription
//! 5. TS client receives events and verifies files arrive
//!
//! This test exists to catch regressions in the event delivery pipeline
//! for ephemeral (non-indexed) directory browsing.
mod helpers;
use helpers::*;
use sd_core::device::get_current_device_slug;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
/// Connection info passed from Rust test harness to TypeScript tests
#[derive(Debug, Serialize, Deserialize)]
struct EphemeralBridgeConfig {
/// TCP socket address for daemon connection
socket_addr: String,
/// Library UUID
library_id: String,
/// Device slug used by this daemon (must match path_scope in subscriptions)
device_slug: String,
/// Physical path to the ephemeral directory (not a managed location)
ephemeral_dir_path: PathBuf,
/// Test data directory
test_data_path: PathBuf,
}
#[tokio::test]
async fn test_ephemeral_directory_event_streaming() -> anyhow::Result<()> {
let harness = IndexingHarnessBuilder::new("ephemeral_event_streaming")
.enable_daemon()
.build()
.await?;
// Create an ephemeral directory with files (NOT a managed location)
let test_root = harness.temp_path();
let ephemeral_dir = test_root.join("ephemeral_browse");
tokio::fs::create_dir_all(&ephemeral_dir).await?;
// Create files the indexer will discover
tokio::fs::write(ephemeral_dir.join("document.txt"), "Hello world").await?;
tokio::fs::write(ephemeral_dir.join("photo.jpg"), "fake jpeg data").await?;
tokio::fs::write(ephemeral_dir.join("notes.md"), "# Notes").await?;
tokio::fs::write(ephemeral_dir.join("script.rs"), "fn main() {}").await?;
tokio::fs::write(ephemeral_dir.join("data.json"), r#"{"key": "value"}"#).await?;
// Create a subdirectory too
tokio::fs::create_dir_all(ephemeral_dir.join("subfolder")).await?;
tokio::fs::write(ephemeral_dir.join("subfolder/nested.txt"), "nested").await?;
let socket_addr = harness
.daemon_socket_addr()
.expect("Daemon should be enabled")
.to_string();
let device_slug = get_current_device_slug();
eprintln!("[Rust] Device slug: {}", device_slug);
let bridge_config = EphemeralBridgeConfig {
socket_addr: socket_addr.clone(),
library_id: harness.library.id().to_string(),
device_slug: device_slug.clone(),
ephemeral_dir_path: ephemeral_dir.clone(),
test_data_path: harness.temp_path().to_path_buf(),
};
let config_path = harness.temp_path().join("ephemeral_bridge_config.json");
let config_json = serde_json::to_string_pretty(&bridge_config)?;
tokio::fs::write(&config_path, config_json).await?;
tracing::info!("Bridge config written to: {}", config_path.display());
tracing::info!("Socket address: {}", socket_addr);
tracing::info!("Library ID: {}", bridge_config.library_id);
tracing::info!("Ephemeral dir: {}", ephemeral_dir.display());
let ts_test_file = "packages/ts-client/tests/integration/ephemeral-streaming.test.ts";
let workspace_root = std::env::current_dir()?.parent().unwrap().to_path_buf();
let ts_test_path = workspace_root.join(ts_test_file);
let bun_config = workspace_root.join("packages/ts-client/tests/integration/bunfig.toml");
eprintln!("\n=== Ephemeral Event Streaming Bridge Test ===");
eprintln!("Workspace root: {}", workspace_root.display());
eprintln!("Test file: {}", ts_test_path.display());
eprintln!("Config path: {}", config_path.display());
eprintln!("Socket address: {}", socket_addr);
eprintln!("Library ID: {}", bridge_config.library_id);
eprintln!("Ephemeral dir: {}", ephemeral_dir.display());
eprintln!("=============================================\n");
if !ts_test_path.exists() {
anyhow::bail!("TypeScript test file not found: {}", ts_test_path.display());
}
let output = tokio::process::Command::new("bun")
.arg("test")
.arg("--config")
.arg(&bun_config)
.arg(&ts_test_path)
.env("BRIDGE_CONFIG_PATH", config_path.to_str().unwrap())
.env("RUST_LOG", "debug")
.current_dir(&workspace_root)
.output()
.await?;
let stdout = String::from_utf8_lossy(&output.stdout);
let stderr = String::from_utf8_lossy(&output.stderr);
if !stdout.is_empty() {
eprintln!("\n=== TypeScript stdout ===\n{}\n", stdout);
}
if !stderr.is_empty() {
eprintln!("\n=== TypeScript stderr ===\n{}\n", stderr);
}
if !output.status.success() {
anyhow::bail!(
"TypeScript test failed with exit code: {:?}",
output.status.code()
);
}
tracing::info!("Ephemeral event streaming test passed!");
harness.shutdown().await?;
Ok(())
}

View File

@@ -72,6 +72,7 @@ impl TestConfigBuilder {
statistics_listener_enabled: false,
},
proxy_pairing: sd_core::config::app_config::ProxyPairingConfig::default(),
spacebot: sd_core::config::app_config::SpacebotConfig::default(),
};
config.save()?;

View File

@@ -506,7 +506,9 @@ export function updateSingleResource<O>(
}
queryClient.setQueryData<O>(queryKey, (oldData: any) => {
if (!oldData) return oldData;
if (!oldData) {
return { files: resourcesToUpdate, total_count: resourcesToUpdate.length, has_more: false } as O;
}
// Handle array responses
if (Array.isArray(oldData)) {
@@ -570,7 +572,13 @@ export function updateBatchResources<O>(
if (options.debug) {
console.log(`[useNormalizedQuery] ${wireMethod} setQueryData: oldData has`, Array.isArray(oldData) ? oldData.length : Object.keys(oldData || {}).join(','), `adding ${filteredResources.length} resources`);
}
if (!oldData) return oldData;
// If the query hasn't returned yet, seed the cache with the event data.
// This handles the race where the subscription's buffer replay delivers
// events before the initial query response arrives. Without this, the
// events would be silently dropped and the UI stays empty.
if (!oldData) {
return { files: filteredResources, total_count: filteredResources.length, has_more: false } as O;
}
// Handle array responses
if (Array.isArray(oldData)) {

View File

@@ -86,14 +86,15 @@ export class SubscriptionManager {
return this.createCleanup(key, callback);
}
// Create new subscription
// Create new subscription.
// The initial callback is added to listeners BEFORE transport.subscribe()
// so that buffered events replayed during subscription setup are not lost.
console.log(`[SubscriptionManager] Creating new subscription for key: ${key}`);
const subscriptionPromise = this.createSubscription(key, filter);
const subscriptionPromise = this.createSubscription(key, filter, callback);
this.pendingSubscriptions.set(key, subscriptionPromise);
try {
entry = await subscriptionPromise;
entry.listeners.add(callback);
entry.refCount++;
console.log(`[SubscriptionManager] New subscription created, refCount: ${entry.refCount}`);
return this.createCleanup(key, callback);
@@ -105,6 +106,7 @@ export class SubscriptionManager {
private async createSubscription(
key: string,
filter: EventFilter,
initialCallback: (event: Event) => void,
): Promise<SubscriptionEntry> {
const eventTypes = filter.event_types ?? [
"ResourceChanged",
@@ -113,6 +115,16 @@ export class SubscriptionManager {
"Refresh",
];
// Pre-create the entry with the initial listener so that events
// replayed by the daemon during transport.subscribe() are captured.
const entry: SubscriptionEntry = {
unsubscribe: () => {},
listeners: new Set([initialCallback]),
refCount: 0,
};
this.subscriptions.set(key, entry);
const unsubscribe = await this.transport.subscribe(
(event) => {
// Broadcast event to all listeners
@@ -132,13 +144,7 @@ export class SubscriptionManager {
},
);
const entry: SubscriptionEntry = {
unsubscribe,
listeners: new Set(),
refCount: 0,
};
this.subscriptions.set(key, entry);
entry.unsubscribe = unsubscribe;
return entry;
}

View File

@@ -0,0 +1,302 @@
/**
* Ephemeral Directory Event Streaming Test
*
* Tests the core ephemeral browsing flow without React:
* 1. Subscribe to events for a directory path scope
* 2. Query the directory listing (backend returns empty, dispatches indexer)
* 3. Verify ResourceChangedBatch events arrive through the subscription
*
* This test reproduces the exact race condition that causes empty directories
* in the UI: the query triggers indexing, but events may not reach the
* subscription due to timing, buffer overflow, or filter mismatches.
*/
import "./setup";
import { describe, test, expect, beforeAll, afterAll } from "bun:test";
import { readFile } from "fs/promises";
import { hostname } from "os";
import { SpacedriveClient } from "../../src/client";
import { TcpSocketTransport } from "../../src/transport";
interface BridgeConfig {
socket_addr: string;
library_id: string;
device_slug: string;
ephemeral_dir_path: string;
test_data_path: string;
}
let bridgeConfig: BridgeConfig;
let client: SpacedriveClient;
let deviceSlug: string;
beforeAll(async () => {
const configPath = process.env.BRIDGE_CONFIG_PATH;
if (!configPath) {
throw new Error("BRIDGE_CONFIG_PATH environment variable not set");
}
const configJson = await readFile(configPath, "utf-8");
bridgeConfig = JSON.parse(configJson);
console.log(`[TS] Bridge config:`, bridgeConfig);
client = SpacedriveClient.fromTcpSocket(bridgeConfig.socket_addr);
client.setCurrentLibrary(bridgeConfig.library_id);
// Use the device slug from the daemon, not hostname() which may differ
deviceSlug = bridgeConfig.device_slug;
console.log(`[TS] Device slug (from daemon): ${deviceSlug}`);
console.log(`[TS] Hostname for comparison: ${hostname().toLowerCase().replace(/\s+/g, "-")}`);
});
describe("Ephemeral Directory Event Streaming", () => {
/**
* Test 1: Raw transport-level event delivery
*
* Subscribe via raw TCP, then query directory listing.
* This bypasses React, SubscriptionManager, and useNormalizedQuery entirely
* to test the daemon -> TCP -> event pipeline in isolation.
*/
test("events arrive via TCP subscription after directory listing query", async () => {
const ephemeralPath = bridgeConfig.ephemeral_dir_path;
const pathScope = {
Physical: {
device_slug: deviceSlug,
path: ephemeralPath,
},
};
const receivedEvents: any[] = [];
// Step 1: Subscribe FIRST
console.log(`[TS] Subscribing to events for: ${ephemeralPath}`);
const transport = new TcpSocketTransport(bridgeConfig.socket_addr);
const unsubscribe = await transport.subscribe(
(event: any) => {
console.log(`[TS] EVENT received:`, JSON.stringify(event).slice(0, 200));
receivedEvents.push(event);
},
{
event_types: [
"ResourceChanged",
"ResourceChangedBatch",
"ResourceDeleted",
"Refresh",
],
filter: {
resource_type: "file",
path_scope: pathScope,
library_id: bridgeConfig.library_id,
include_descendants: false,
},
},
);
console.log(`[TS] Subscription active`);
// Small delay to ensure subscription is fully registered on daemon
await new Promise((r) => setTimeout(r, 100));
// Step 2: Query the directory listing (triggers ephemeral indexing)
console.log(`[TS] Querying directory listing for: ${ephemeralPath}`);
const queryResult = await client.execute(
"query:files.directory_listing",
{
path: pathScope,
sort_by: "name",
limit: null,
include_hidden: false,
folders_first: true,
},
);
console.log(
`[TS] Query returned:`,
JSON.stringify(queryResult).slice(0, 200),
);
// The query may return empty (indexer dispatched async) or may return
// cached results if a previous run populated the cache.
// Either way, events should arrive.
// Step 3: Wait for events to arrive
// Ephemeral indexing is fast (<500ms), give generous timeout
const deadline = Date.now() + 10_000;
while (Date.now() < deadline) {
if (receivedEvents.length > 0) {
console.log(
`[TS] Got ${receivedEvents.length} event(s) after ${Date.now() - (deadline - 10_000)}ms`,
);
break;
}
await new Promise((r) => setTimeout(r, 50));
}
// Step 4: Verify events arrived
console.log(`[TS] Total events received: ${receivedEvents.length}`);
for (const [i, event] of receivedEvents.entries()) {
console.log(`[TS] Event ${i + 1}:`, JSON.stringify(event).slice(0, 300));
}
// Clean up subscription
unsubscribe();
// If the query returned files directly (cache hit), events may not fire.
// Check both paths.
const queryFiles = (queryResult as any)?.files ?? [];
const eventFiles: string[] = [];
for (const event of receivedEvents) {
if (event.ResourceChangedBatch) {
for (const resource of event.ResourceChangedBatch.resources) {
eventFiles.push(resource.name);
}
} else if (event.ResourceChanged) {
eventFiles.push(event.ResourceChanged.resource.name);
}
}
const totalFiles = queryFiles.length + eventFiles.length;
console.log(
`[TS] Files from query: ${queryFiles.length}, from events: ${eventFiles.length}`,
);
if (queryFiles.length > 0) {
console.log(
`[TS] Query returned files (cache hit):`,
queryFiles.map((f: any) => f.name),
);
}
if (eventFiles.length > 0) {
console.log(`[TS] Event files:`, eventFiles);
}
// We expect files to arrive via EITHER the query response (cache hit)
// OR events (first-time indexing). At least one must work.
// The directory has 6 items: document.txt, photo.jpg, notes.md, script.rs, data.json, subfolder
expect(totalFiles).toBeGreaterThanOrEqual(5);
// If the query returned empty, ALL files must come from events
if (queryFiles.length === 0) {
console.log(`[TS] Query returned empty — verifying events delivered all files`);
expect(eventFiles.length).toBeGreaterThanOrEqual(5);
}
console.log(`[TS] Event streaming test passed`);
}, 30_000);
/**
* Test 2: EventBuffer replay
*
* Query first (triggers indexing + events), THEN subscribe.
* The EventBuffer should replay recent events to the new subscription.
* This is the exact race condition the buffer was designed to solve.
*/
test("EventBuffer replays events to late subscriber", async () => {
// Use a subdirectory so it hasn't been indexed yet
const subPath = bridgeConfig.ephemeral_dir_path + "/subfolder";
const pathScope = {
Physical: {
device_slug: deviceSlug,
path: subPath,
},
};
// Step 1: Query FIRST (triggers indexing, events go to buffer)
console.log(`[TS] Querying BEFORE subscribing: ${subPath}`);
const queryResult = await client.execute(
"query:files.directory_listing",
{
path: pathScope,
sort_by: "name",
limit: null,
include_hidden: false,
folders_first: true,
},
);
console.log(
`[TS] Query returned:`,
JSON.stringify(queryResult).slice(0, 200),
);
// Small delay to let events buffer (but not expire — 5s retention)
await new Promise((r) => setTimeout(r, 500));
// Step 2: Subscribe AFTER query (late subscriber)
const replayedEvents: any[] = [];
const transport = new TcpSocketTransport(bridgeConfig.socket_addr);
const unsubscribe = await transport.subscribe(
(event: any) => {
console.log(
`[TS] REPLAYED event:`,
JSON.stringify(event).slice(0, 200),
);
replayedEvents.push(event);
},
{
event_types: [
"ResourceChanged",
"ResourceChangedBatch",
"ResourceDeleted",
"Refresh",
],
filter: {
resource_type: "file",
path_scope: pathScope,
library_id: bridgeConfig.library_id,
include_descendants: false,
},
},
);
// Wait a bit for replayed events to arrive
await new Promise((r) => setTimeout(r, 1000));
console.log(`[TS] Replayed events: ${replayedEvents.length}`);
for (const [i, event] of replayedEvents.entries()) {
console.log(
`[TS] Replayed ${i + 1}:`,
JSON.stringify(event).slice(0, 300),
);
}
unsubscribe();
// The subfolder has 1 file (nested.txt).
// If the buffer replay works, we should see it.
// If it doesn't, this test will fail and tell us the buffer is broken.
const queryFiles = (queryResult as any)?.files ?? [];
const replayFiles: string[] = [];
for (const event of replayedEvents) {
if (event.ResourceChangedBatch) {
for (const resource of event.ResourceChangedBatch.resources) {
replayFiles.push(resource.name);
}
} else if (event.ResourceChanged) {
replayFiles.push(event.ResourceChanged.resource.name);
}
}
const total = queryFiles.length + replayFiles.length;
console.log(
`[TS] Buffer replay: ${queryFiles.length} from query, ${replayFiles.length} from replay`,
);
// We expect the file to arrive via either query (cache hit from test 1)
// or buffer replay. Log clearly which path succeeded.
if (total === 0) {
console.error(
`[TS] FAILURE: No files from query or buffer replay for ${subPath}`,
);
console.error(
`[TS] This means the EventBuffer is not replaying events to late subscribers`,
);
}
expect(total).toBeGreaterThanOrEqual(1);
console.log(`[TS] Buffer replay test passed`);
}, 30_000);
});