mirror of
https://github.com/spacedriveapp/spacedrive.git
synced 2026-05-09 15:53:23 -04:00
Refactor to a single unified ephemeral index cache
This commit is contained in:
@@ -246,63 +246,91 @@ pub async fn run(ctx: &Context, cmd: IndexCmd) -> Result<()> {
|
||||
|status: &sd_core::ops::core::ephemeral_status::EphemeralCacheStatus| {
|
||||
println!();
|
||||
println!("╔══════════════════════════════════════════════════════════════╗");
|
||||
println!("║ EPHEMERAL INDEX CACHE STATUS ║");
|
||||
println!("║ UNIFIED EPHEMERAL INDEX CACHE ║");
|
||||
println!("╠══════════════════════════════════════════════════════════════╣");
|
||||
println!(
|
||||
"║ Total Indexes: {:3} In Progress: {:3} ║",
|
||||
status.total_indexes, status.indexing_in_progress
|
||||
"║ Indexed Paths: {:3} In Progress: {:3} ║",
|
||||
status.indexed_paths_count, status.indexing_in_progress_count
|
||||
);
|
||||
println!("╚══════════════════════════════════════════════════════════════╝");
|
||||
|
||||
if status.indexes.is_empty() {
|
||||
println!("\n No ephemeral indexes cached.");
|
||||
// Show unified index stats
|
||||
let stats = &status.index_stats;
|
||||
println!();
|
||||
let mut stats_table = Table::new();
|
||||
stats_table.load_preset(UTF8_BORDERS_ONLY);
|
||||
stats_table.set_header(vec![
|
||||
Cell::new("SHARED INDEX STATS").add_attribute(Attribute::Bold),
|
||||
Cell::new(""),
|
||||
]);
|
||||
|
||||
stats_table.add_row(vec![
|
||||
"Total entries (shared arena)",
|
||||
&stats.total_entries.to_string(),
|
||||
]);
|
||||
stats_table.add_row(vec![
|
||||
"Path index count",
|
||||
&stats.path_index_count.to_string(),
|
||||
]);
|
||||
stats_table.add_row(vec![
|
||||
"Unique names (shared)",
|
||||
&stats.unique_names.to_string(),
|
||||
]);
|
||||
stats_table.add_row(vec![
|
||||
"Interned strings (shared)",
|
||||
&stats.interned_strings.to_string(),
|
||||
]);
|
||||
stats_table.add_row(vec![
|
||||
"Content kinds",
|
||||
&stats.content_kinds.to_string(),
|
||||
]);
|
||||
stats_table.add_row(vec![
|
||||
"Memory usage",
|
||||
&format_bytes(stats.memory_bytes as u64),
|
||||
]);
|
||||
stats_table.add_row(vec!["Cache age", &format!("{:.1}s", stats.age_seconds)]);
|
||||
stats_table
|
||||
.add_row(vec!["Idle time", &format!("{:.1}s", stats.idle_seconds)]);
|
||||
|
||||
println!("{}", stats_table);
|
||||
|
||||
// Show indexed paths
|
||||
if status.indexed_paths.is_empty() && status.paths_in_progress.is_empty() {
|
||||
println!("\n No paths indexed yet.");
|
||||
} else {
|
||||
for idx in &status.indexes {
|
||||
// Paths in progress
|
||||
if !status.paths_in_progress.is_empty() {
|
||||
println!();
|
||||
let mut table = Table::new();
|
||||
table.load_preset(UTF8_BORDERS_ONLY);
|
||||
let mut progress_table = Table::new();
|
||||
progress_table.load_preset(UTF8_BORDERS_ONLY);
|
||||
progress_table.set_header(vec![
|
||||
Cell::new("INDEXING IN PROGRESS").add_attribute(Attribute::Bold),
|
||||
]);
|
||||
for path in &status.paths_in_progress {
|
||||
progress_table.add_row(vec![format!(
|
||||
"● {}",
|
||||
path.display()
|
||||
)]);
|
||||
}
|
||||
println!("{}", progress_table);
|
||||
}
|
||||
|
||||
let status_indicator = if idx.indexing_in_progress {
|
||||
"● INDEXING"
|
||||
} else {
|
||||
"○ Ready"
|
||||
};
|
||||
|
||||
table.set_header(vec![
|
||||
Cell::new(format!("{}", idx.root_path.display()))
|
||||
.add_attribute(Attribute::Bold),
|
||||
Cell::new(status_indicator),
|
||||
// Indexed paths
|
||||
if !status.indexed_paths.is_empty() {
|
||||
println!();
|
||||
let mut paths_table = Table::new();
|
||||
paths_table.load_preset(UTF8_BORDERS_ONLY);
|
||||
paths_table.set_header(vec![
|
||||
Cell::new("INDEXED PATHS").add_attribute(Attribute::Bold),
|
||||
Cell::new("Children"),
|
||||
]);
|
||||
|
||||
table.add_row(vec!["Entries (arena)", &idx.total_entries.to_string()]);
|
||||
table.add_row(vec![
|
||||
"Path index count",
|
||||
&idx.path_index_count.to_string(),
|
||||
]);
|
||||
table.add_row(vec!["Unique names", &idx.unique_names.to_string()]);
|
||||
table.add_row(vec![
|
||||
"Interned strings",
|
||||
&idx.interned_strings.to_string(),
|
||||
]);
|
||||
table.add_row(vec!["Content kinds", &idx.content_kinds.to_string()]);
|
||||
table.add_row(vec![
|
||||
"Memory usage",
|
||||
&format_bytes(idx.memory_bytes as u64),
|
||||
]);
|
||||
table.add_row(vec!["Age", &format!("{:.1}s", idx.age_seconds)]);
|
||||
table.add_row(vec!["Idle time", &format!("{:.1}s", idx.idle_seconds)]);
|
||||
table.add_row(vec![
|
||||
"Job stats",
|
||||
&format!(
|
||||
"{} files, {} dirs, {} symlinks, {}",
|
||||
idx.job_stats.files,
|
||||
idx.job_stats.dirs,
|
||||
idx.job_stats.symlinks,
|
||||
format_bytes(idx.job_stats.bytes)
|
||||
),
|
||||
]);
|
||||
|
||||
println!("{}", table);
|
||||
for info in &status.indexed_paths {
|
||||
paths_table.add_row(vec![
|
||||
format!("○ {}", info.path.display()),
|
||||
info.child_count.to_string(),
|
||||
]);
|
||||
}
|
||||
println!("{}", paths_table);
|
||||
}
|
||||
}
|
||||
println!();
|
||||
|
||||
@@ -4,18 +4,60 @@ use serde::{Deserialize, Serialize};
|
||||
use specta::Type;
|
||||
use std::path::PathBuf;
|
||||
|
||||
/// Status of the entire ephemeral index cache
|
||||
/// Status of the unified ephemeral index cache
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, Type)]
|
||||
pub struct EphemeralCacheStatus {
|
||||
/// Total number of cached indexes
|
||||
pub total_indexes: usize,
|
||||
/// Number of indexes currently being populated
|
||||
pub indexing_in_progress: usize,
|
||||
/// Details for each cached index
|
||||
/// Number of paths that have been indexed
|
||||
pub indexed_paths_count: usize,
|
||||
/// Number of paths currently being indexed
|
||||
pub indexing_in_progress_count: usize,
|
||||
/// Unified index statistics (shared arena and string interning)
|
||||
pub index_stats: UnifiedIndexStats,
|
||||
/// List of indexed paths (directories whose contents are ready)
|
||||
pub indexed_paths: Vec<IndexedPathInfo>,
|
||||
/// List of paths currently being indexed
|
||||
pub paths_in_progress: Vec<PathBuf>,
|
||||
|
||||
// Legacy fields for backward compatibility
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub total_indexes: Option<usize>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub indexing_in_progress: Option<usize>,
|
||||
#[serde(skip_serializing_if = "Vec::is_empty", default)]
|
||||
pub indexes: Vec<EphemeralIndexInfo>,
|
||||
}
|
||||
|
||||
/// Information about a single ephemeral index
|
||||
/// Statistics for the unified ephemeral index
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, Type)]
|
||||
pub struct UnifiedIndexStats {
|
||||
/// Total entries in the shared arena
|
||||
pub total_entries: usize,
|
||||
/// Number of entries indexed by path
|
||||
pub path_index_count: usize,
|
||||
/// Number of unique interned names (shared across all paths)
|
||||
pub unique_names: usize,
|
||||
/// Number of interned strings in shared cache
|
||||
pub interned_strings: usize,
|
||||
/// Number of content kinds stored
|
||||
pub content_kinds: usize,
|
||||
/// Estimated memory usage in bytes
|
||||
pub memory_bytes: usize,
|
||||
/// Age of the cache in seconds
|
||||
pub age_seconds: f64,
|
||||
/// Seconds since last access
|
||||
pub idle_seconds: f64,
|
||||
}
|
||||
|
||||
/// Information about an indexed path
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, Type)]
|
||||
pub struct IndexedPathInfo {
|
||||
/// The directory path that was indexed
|
||||
pub path: PathBuf,
|
||||
/// Number of direct children in this directory
|
||||
pub child_count: usize,
|
||||
}
|
||||
|
||||
/// Legacy: Information about a single ephemeral index (for backward compatibility)
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, Type)]
|
||||
pub struct EphemeralIndexInfo {
|
||||
/// Root path this index covers
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
//! Ephemeral index cache status query
|
||||
//!
|
||||
//! Provides a snapshot of all cached ephemeral indexes for debugging.
|
||||
//! Provides a snapshot of the unified ephemeral index for debugging.
|
||||
|
||||
use super::output::*;
|
||||
use crate::{
|
||||
@@ -14,7 +14,7 @@ use std::sync::Arc;
|
||||
/// Input for the ephemeral cache status query
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, Type, Default)]
|
||||
pub struct EphemeralCacheStatusInput {
|
||||
/// Optional: only include indexes for paths containing this substring
|
||||
/// Optional: only include indexed paths containing this substring
|
||||
#[serde(default)]
|
||||
pub path_filter: Option<String>,
|
||||
}
|
||||
@@ -39,14 +39,31 @@ impl CoreQuery for EphemeralCacheStatusQuery {
|
||||
) -> QueryResult<Self::Output> {
|
||||
let cache = context.ephemeral_cache();
|
||||
|
||||
// Get basic cache stats
|
||||
// Get cache stats
|
||||
let cache_stats = cache.stats();
|
||||
let cached_paths = cache.cached_paths();
|
||||
let all_indexed_paths = cache.indexed_paths();
|
||||
let paths_in_progress = cache.paths_in_progress();
|
||||
|
||||
// Gather detailed info for each index
|
||||
let mut indexes = Vec::new();
|
||||
// Get the global index for detailed stats
|
||||
let global_index = cache.get_global_index();
|
||||
let index = global_index.read().await;
|
||||
let stats = index.get_stats();
|
||||
|
||||
for path in cached_paths {
|
||||
// Build unified index stats
|
||||
let index_stats = UnifiedIndexStats {
|
||||
total_entries: stats.total_entries,
|
||||
path_index_count: index.path_index_count(),
|
||||
unique_names: stats.unique_names,
|
||||
interned_strings: stats.interned_strings,
|
||||
content_kinds: index.content_kinds_count(),
|
||||
memory_bytes: stats.memory_bytes,
|
||||
age_seconds: cache.age().as_secs_f64(),
|
||||
idle_seconds: index.idle_time().as_secs_f64(),
|
||||
};
|
||||
|
||||
// Build indexed paths info with child counts
|
||||
let mut indexed_paths = Vec::new();
|
||||
for path in all_indexed_paths {
|
||||
// Apply path filter if provided
|
||||
if let Some(ref filter) = self.input.path_filter {
|
||||
if !path.to_string_lossy().contains(filter) {
|
||||
@@ -54,44 +71,35 @@ impl CoreQuery for EphemeralCacheStatusQuery {
|
||||
}
|
||||
}
|
||||
|
||||
// Check if indexing is in progress
|
||||
let indexing_in_progress = cache.is_indexing(&path);
|
||||
// Get child count for this directory
|
||||
let child_count = index.list_directory(&path).map(|c| c.len()).unwrap_or(0);
|
||||
|
||||
// Try to get the index to read its internal stats
|
||||
if let Some(index_arc) = cache.get(&path) {
|
||||
let index = index_arc.read().await;
|
||||
let stats = index.get_stats();
|
||||
|
||||
let info = EphemeralIndexInfo {
|
||||
root_path: index.root_path.clone(),
|
||||
indexing_in_progress,
|
||||
total_entries: stats.total_entries,
|
||||
path_index_count: index.path_index_count(),
|
||||
unique_names: stats.unique_names,
|
||||
interned_strings: stats.interned_strings,
|
||||
content_kinds: index.content_kinds_count(),
|
||||
memory_bytes: stats.memory_bytes,
|
||||
age_seconds: index.age().as_secs_f64(),
|
||||
idle_seconds: index.idle_time().as_secs_f64(),
|
||||
job_stats: JobStats {
|
||||
files: index.stats.files,
|
||||
dirs: index.stats.dirs,
|
||||
symlinks: index.stats.symlinks,
|
||||
bytes: index.stats.bytes,
|
||||
},
|
||||
};
|
||||
|
||||
indexes.push(info);
|
||||
}
|
||||
indexed_paths.push(IndexedPathInfo { path, child_count });
|
||||
}
|
||||
|
||||
// Sort by root path for consistent output
|
||||
indexes.sort_by(|a, b| a.root_path.cmp(&b.root_path));
|
||||
// Sort by path for consistent output
|
||||
indexed_paths.sort_by(|a, b| a.path.cmp(&b.path));
|
||||
|
||||
// Filter paths in progress
|
||||
let filtered_in_progress: Vec<_> = if let Some(ref filter) = self.input.path_filter {
|
||||
paths_in_progress
|
||||
.into_iter()
|
||||
.filter(|p| p.to_string_lossy().contains(filter))
|
||||
.collect()
|
||||
} else {
|
||||
paths_in_progress
|
||||
};
|
||||
|
||||
Ok(EphemeralCacheStatus {
|
||||
total_indexes: cache_stats.total_entries,
|
||||
indexing_in_progress: cache_stats.indexing_count,
|
||||
indexes,
|
||||
indexed_paths_count: cache_stats.indexed_paths,
|
||||
indexing_in_progress_count: cache_stats.indexing_in_progress,
|
||||
index_stats,
|
||||
indexed_paths,
|
||||
paths_in_progress: filtered_in_progress,
|
||||
// Legacy fields
|
||||
total_indexes: None,
|
||||
indexing_in_progress: None,
|
||||
indexes: Vec::new(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,159 +1,181 @@
|
||||
//! Global cache for ephemeral indexes
|
||||
//!
|
||||
//! This module provides a thread-safe cache for storing ephemeral indexes
|
||||
//! by their root path. This allows directory listing queries to reuse
|
||||
//! existing indexes instead of spawning new indexer jobs.
|
||||
//! This module provides a thread-safe cache with a SINGLE global ephemeral index.
|
||||
//! All browsed directories share the same arena and string interning pool,
|
||||
//! providing efficient memory usage through deduplication.
|
||||
//!
|
||||
//! The cache is permanent in memory (no TTL or expiration). Entries persist
|
||||
//! until the daemon restarts or they are explicitly removed. This ensures
|
||||
//! UUIDs from ephemeral indexing can be preserved when regular indexing is enabled.
|
||||
//! Key benefits of unified index:
|
||||
//! - String interning shared across all paths (common names like .git, README.md)
|
||||
//! - Single arena for all entries (~50 bytes per entry vs ~200 with HashMap)
|
||||
//! - Hierarchical structure preserved for efficient directory listings
|
||||
//!
|
||||
//! The cache tracks which paths have been indexed (ready) vs are currently
|
||||
//! being indexed (in progress).
|
||||
|
||||
use crate::ops::indexing::EphemeralIndex;
|
||||
use parking_lot::RwLock;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
collections::HashSet,
|
||||
path::{Path, PathBuf},
|
||||
sync::Arc,
|
||||
time::Instant,
|
||||
};
|
||||
use tokio::sync::RwLock as TokioRwLock;
|
||||
|
||||
/// Cache entry wrapping an ephemeral index with metadata
|
||||
struct CacheEntry {
|
||||
/// The ephemeral index
|
||||
index: Arc<TokioRwLock<EphemeralIndex>>,
|
||||
/// When this entry was created
|
||||
created_at: Instant,
|
||||
/// Whether an indexer job is currently running for this path
|
||||
indexing_in_progress: bool,
|
||||
}
|
||||
|
||||
impl CacheEntry {
|
||||
fn new(index: Arc<TokioRwLock<EphemeralIndex>>) -> Self {
|
||||
Self {
|
||||
index,
|
||||
created_at: Instant::now(),
|
||||
indexing_in_progress: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Global cache for ephemeral indexes
|
||||
/// Global cache with a single unified ephemeral index
|
||||
///
|
||||
/// Stores ephemeral indexes by their root path for reuse across queries.
|
||||
/// Indexes persist in memory until the daemon restarts or they are explicitly removed.
|
||||
/// Instead of separate indexes per path, all entries live in one shared index.
|
||||
/// This maximizes memory efficiency through shared string interning and arena.
|
||||
pub struct EphemeralIndexCache {
|
||||
/// Map of root path to cache entry
|
||||
entries: RwLock<HashMap<PathBuf, CacheEntry>>,
|
||||
/// Single global index containing all browsed entries
|
||||
index: Arc<TokioRwLock<EphemeralIndex>>,
|
||||
|
||||
/// Paths whose immediate children have been indexed (ready for queries)
|
||||
indexed_paths: RwLock<HashSet<PathBuf>>,
|
||||
|
||||
/// Paths currently being indexed
|
||||
indexing_in_progress: RwLock<HashSet<PathBuf>>,
|
||||
|
||||
/// When the cache was created
|
||||
created_at: Instant,
|
||||
}
|
||||
|
||||
impl EphemeralIndexCache {
|
||||
/// Create a new cache
|
||||
/// Create a new cache with an empty global index
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
entries: RwLock::new(HashMap::new()),
|
||||
index: Arc::new(TokioRwLock::new(EphemeralIndex::new())),
|
||||
indexed_paths: RwLock::new(HashSet::new()),
|
||||
indexing_in_progress: RwLock::new(HashSet::new()),
|
||||
created_at: Instant::now(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get an existing index for a path, or None if not cached
|
||||
pub fn get(&self, path: &Path) -> Option<Arc<TokioRwLock<EphemeralIndex>>> {
|
||||
let entries = self.entries.read();
|
||||
entries.get(path).map(|entry| entry.index.clone())
|
||||
/// Get the global index if the given path has been indexed
|
||||
///
|
||||
/// Returns Some(index) if this path's contents are available,
|
||||
/// None if the path hasn't been browsed yet.
|
||||
pub fn get_for_path(&self, path: &Path) -> Option<Arc<TokioRwLock<EphemeralIndex>>> {
|
||||
let indexed = self.indexed_paths.read();
|
||||
if indexed.contains(path) {
|
||||
Some(self.index.clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Get an existing index for a path (exact match only)
|
||||
///
|
||||
/// Returns the index if an index exists for this exact path.
|
||||
///
|
||||
/// Note: We only use exact matches because ephemeral indexing uses
|
||||
/// IndexScope::Current (single level), so an ancestor index doesn't
|
||||
/// contain the contents of subdirectories.
|
||||
pub fn get_for_path(&self, path: &Path) -> Option<Arc<TokioRwLock<EphemeralIndex>>> {
|
||||
self.get(path)
|
||||
/// Get the global index unconditionally (for internal use)
|
||||
pub fn get_global_index(&self) -> Arc<TokioRwLock<EphemeralIndex>> {
|
||||
self.index.clone()
|
||||
}
|
||||
|
||||
/// Check if a path has been fully indexed
|
||||
pub fn is_indexed(&self, path: &Path) -> bool {
|
||||
self.indexed_paths.read().contains(path)
|
||||
}
|
||||
|
||||
/// Check if indexing is in progress for a path
|
||||
pub fn is_indexing(&self, path: &Path) -> bool {
|
||||
let entries = self.entries.read();
|
||||
entries
|
||||
.get(path)
|
||||
.map(|e| e.indexing_in_progress)
|
||||
.unwrap_or(false)
|
||||
self.indexing_in_progress.read().contains(path)
|
||||
}
|
||||
|
||||
/// Insert or update an index in the cache
|
||||
pub fn insert(&self, path: PathBuf, index: Arc<TokioRwLock<EphemeralIndex>>) {
|
||||
let mut entries = self.entries.write();
|
||||
entries.insert(path, CacheEntry::new(index));
|
||||
}
|
||||
|
||||
/// Create a new index for a path and mark it as indexing in progress
|
||||
/// Prepare the global index for indexing a new path
|
||||
///
|
||||
/// Returns the index to be used by the indexer job.
|
||||
/// Marks the path as indexing-in-progress and returns the global index.
|
||||
/// The indexer job should add entries to this shared index.
|
||||
pub fn create_for_indexing(&self, path: PathBuf) -> Arc<TokioRwLock<EphemeralIndex>> {
|
||||
let mut entries = self.entries.write();
|
||||
|
||||
// Check if entry already exists
|
||||
if let Some(entry) = entries.get_mut(&path) {
|
||||
entry.indexing_in_progress = true;
|
||||
return entry.index.clone();
|
||||
}
|
||||
|
||||
// Create new entry
|
||||
let index = Arc::new(TokioRwLock::new(EphemeralIndex::new(path.clone())));
|
||||
let mut entry = CacheEntry::new(index.clone());
|
||||
entry.indexing_in_progress = true;
|
||||
entries.insert(path, entry);
|
||||
index
|
||||
let mut in_progress = self.indexing_in_progress.write();
|
||||
in_progress.insert(path);
|
||||
self.index.clone()
|
||||
}
|
||||
|
||||
/// Mark indexing as complete for a path
|
||||
///
|
||||
/// Moves the path from "in progress" to "indexed" state.
|
||||
pub fn mark_indexing_complete(&self, path: &Path) {
|
||||
let mut entries = self.entries.write();
|
||||
if let Some(entry) = entries.get_mut(path) {
|
||||
entry.indexing_in_progress = false;
|
||||
}
|
||||
let mut in_progress = self.indexing_in_progress.write();
|
||||
let mut indexed = self.indexed_paths.write();
|
||||
|
||||
in_progress.remove(path);
|
||||
indexed.insert(path.to_path_buf());
|
||||
}
|
||||
|
||||
/// Remove an index from the cache
|
||||
pub fn remove(&self, path: &Path) {
|
||||
let mut entries = self.entries.write();
|
||||
entries.remove(path);
|
||||
/// Remove a path from the indexed set (e.g., on invalidation)
|
||||
///
|
||||
/// Note: This doesn't remove entries from the index itself,
|
||||
/// just marks the path as needing re-indexing.
|
||||
pub fn invalidate_path(&self, path: &Path) {
|
||||
let mut indexed = self.indexed_paths.write();
|
||||
indexed.remove(path);
|
||||
}
|
||||
|
||||
/// Get the number of cached indexes
|
||||
/// Get the number of indexed paths
|
||||
pub fn len(&self) -> usize {
|
||||
self.entries.read().len()
|
||||
self.indexed_paths.read().len()
|
||||
}
|
||||
|
||||
/// Check if the cache is empty
|
||||
/// Check if no paths have been indexed
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.entries.read().is_empty()
|
||||
self.indexed_paths.read().is_empty()
|
||||
}
|
||||
|
||||
/// Get all cached root paths
|
||||
pub fn cached_paths(&self) -> Vec<PathBuf> {
|
||||
self.entries.read().keys().cloned().collect()
|
||||
/// Get all indexed paths
|
||||
pub fn indexed_paths(&self) -> Vec<PathBuf> {
|
||||
self.indexed_paths.read().iter().cloned().collect()
|
||||
}
|
||||
|
||||
/// Get all paths currently being indexed
|
||||
pub fn paths_in_progress(&self) -> Vec<PathBuf> {
|
||||
self.indexing_in_progress.read().iter().cloned().collect()
|
||||
}
|
||||
|
||||
/// Get cache statistics
|
||||
pub fn stats(&self) -> EphemeralIndexCacheStats {
|
||||
let entries = self.entries.read();
|
||||
let total_entries = entries.len();
|
||||
let indexing_count = entries.values().filter(|e| e.indexing_in_progress).count();
|
||||
let indexed = self.indexed_paths.read();
|
||||
let in_progress = self.indexing_in_progress.read();
|
||||
|
||||
EphemeralIndexCacheStats {
|
||||
total_entries,
|
||||
indexing_count,
|
||||
indexed_paths: indexed.len(),
|
||||
indexing_in_progress: in_progress.len(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the age of a cached index in seconds
|
||||
pub fn get_age(&self, path: &Path) -> Option<f64> {
|
||||
let entries = self.entries.read();
|
||||
entries
|
||||
.get(path)
|
||||
.map(|e| e.created_at.elapsed().as_secs_f64())
|
||||
/// Get how long the cache has existed
|
||||
pub fn age(&self) -> std::time::Duration {
|
||||
self.created_at.elapsed()
|
||||
}
|
||||
|
||||
/// Legacy: Get age for a specific path (returns cache age since all share one index)
|
||||
pub fn get_age(&self, _path: &Path) -> Option<f64> {
|
||||
Some(self.created_at.elapsed().as_secs_f64())
|
||||
}
|
||||
|
||||
// Legacy compatibility methods
|
||||
|
||||
/// Legacy: Get an index by exact path (for backward compatibility)
///
/// Deprecated alias: forwards to `get_for_path` and returns its result
/// unchanged.
|
||||
#[deprecated(note = "Use get_for_path instead")]
|
||||
pub fn get(&self, path: &Path) -> Option<Arc<TokioRwLock<EphemeralIndex>>> {
|
||||
self.get_for_path(path)
|
||||
}
|
||||
|
||||
/// Legacy: Get all cached paths (returns indexed paths)
///
/// Deprecated alias: forwards to `indexed_paths` and returns its result
/// unchanged.
|
||||
#[deprecated(note = "Use indexed_paths instead")]
|
||||
pub fn cached_paths(&self) -> Vec<PathBuf> {
|
||||
self.indexed_paths()
|
||||
}
|
||||
|
||||
/// Legacy: Insert (no-op, entries are added directly to global index)
|
||||
#[deprecated(note = "Entries should be added directly to the global index")]
|
||||
pub fn insert(&self, path: PathBuf, _index: Arc<TokioRwLock<EphemeralIndex>>) {
|
||||
// Mark the path as indexed
|
||||
let mut indexed = self.indexed_paths.write();
|
||||
indexed.insert(path);
|
||||
}
|
||||
|
||||
/// Legacy: Remove (just invalidates the path)
///
/// Deprecated alias: forwards to `invalidate_path`.
|
||||
#[deprecated(note = "Use invalidate_path instead")]
|
||||
pub fn remove(&self, path: &Path) {
|
||||
self.invalidate_path(path);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -166,8 +188,24 @@ impl Default for EphemeralIndexCache {
|
||||
/// Statistics about the ephemeral index cache
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct EphemeralIndexCacheStats {
|
||||
pub total_entries: usize,
|
||||
pub indexing_count: usize,
|
||||
/// Number of paths that have been indexed
|
||||
pub indexed_paths: usize,
|
||||
/// Number of paths currently being indexed
|
||||
pub indexing_in_progress: usize,
|
||||
|
||||
// Legacy field names for compatibility
|
||||
}
|
||||
|
||||
impl EphemeralIndexCacheStats {
|
||||
/// Legacy: total_entries now means indexed_paths
|
||||
pub fn total_entries(&self) -> usize {
|
||||
self.indexed_paths
|
||||
}
|
||||
|
||||
/// Legacy: indexing_count now means indexing_in_progress
|
||||
pub fn indexing_count(&self) -> usize {
|
||||
self.indexing_in_progress
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -175,80 +213,90 @@ mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_insert_and_get() {
|
||||
fn test_single_global_index() {
|
||||
let cache = EphemeralIndexCache::new();
|
||||
let path = PathBuf::from("/test/path");
|
||||
let index = Arc::new(TokioRwLock::new(EphemeralIndex::new(path.clone())));
|
||||
|
||||
cache.insert(path.clone(), index.clone());
|
||||
|
||||
assert!(cache.get(&path).is_some());
|
||||
assert_eq!(cache.len(), 1);
|
||||
// Initially no paths are indexed
|
||||
assert!(cache.is_empty());
|
||||
assert!(cache.get_for_path(Path::new("/test")).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_nonexistent() {
|
||||
let cache = EphemeralIndexCache::new();
|
||||
assert!(cache.get(Path::new("/nonexistent")).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_create_for_indexing() {
|
||||
fn test_indexing_workflow() {
|
||||
let cache = EphemeralIndexCache::new();
|
||||
let path = PathBuf::from("/test/path");
|
||||
|
||||
// Start indexing
|
||||
let _index = cache.create_for_indexing(path.clone());
|
||||
|
||||
assert!(cache.is_indexing(&path));
|
||||
assert!(!cache.is_indexed(&path));
|
||||
|
||||
// Complete indexing
|
||||
cache.mark_indexing_complete(&path);
|
||||
|
||||
assert!(!cache.is_indexing(&path));
|
||||
assert!(cache.is_indexed(&path));
|
||||
|
||||
// Now get_for_path returns the index
|
||||
assert!(cache.get_for_path(&path).is_some());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_remove() {
|
||||
fn test_shared_index_across_paths() {
|
||||
let cache = EphemeralIndexCache::new();
|
||||
|
||||
let path1 = PathBuf::from("/test/path1");
|
||||
let path2 = PathBuf::from("/test/path2");
|
||||
|
||||
// Start indexing both paths
|
||||
let index1 = cache.create_for_indexing(path1.clone());
|
||||
let index2 = cache.create_for_indexing(path2.clone());
|
||||
|
||||
// They should be the same index
|
||||
assert!(Arc::ptr_eq(&index1, &index2));
|
||||
|
||||
// Complete both
|
||||
cache.mark_indexing_complete(&path1);
|
||||
cache.mark_indexing_complete(&path2);
|
||||
|
||||
// Both paths now indexed
|
||||
assert!(cache.is_indexed(&path1));
|
||||
assert!(cache.is_indexed(&path2));
|
||||
assert_eq!(cache.len(), 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_invalidate_path() {
|
||||
let cache = EphemeralIndexCache::new();
|
||||
let path = PathBuf::from("/test/path");
|
||||
let index = Arc::new(TokioRwLock::new(EphemeralIndex::new(path.clone())));
|
||||
|
||||
cache.insert(path.clone(), index);
|
||||
assert_eq!(cache.len(), 1);
|
||||
// Index the path
|
||||
let _index = cache.create_for_indexing(path.clone());
|
||||
cache.mark_indexing_complete(&path);
|
||||
assert!(cache.is_indexed(&path));
|
||||
|
||||
cache.remove(&path);
|
||||
assert_eq!(cache.len(), 0);
|
||||
// Invalidate it
|
||||
cache.invalidate_path(&path);
|
||||
assert!(!cache.is_indexed(&path));
|
||||
|
||||
// get_for_path now returns None
|
||||
assert!(cache.get_for_path(&path).is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_for_path_exact_match_only() {
|
||||
fn test_stats() {
|
||||
let cache = EphemeralIndexCache::new();
|
||||
let root = PathBuf::from("/test");
|
||||
let child = PathBuf::from("/test/subdir/file.txt");
|
||||
let index = Arc::new(TokioRwLock::new(EphemeralIndex::new(root.clone())));
|
||||
|
||||
cache.insert(root.clone(), index);
|
||||
let path1 = PathBuf::from("/ready");
|
||||
let path2 = PathBuf::from("/in_progress");
|
||||
|
||||
// Should NOT find ancestor index - we only use exact matches
|
||||
// because ephemeral indexing is single-level (IndexScope::Current)
|
||||
assert!(cache.get_for_path(&child).is_none());
|
||||
// One indexed, one in progress
|
||||
let _index = cache.create_for_indexing(path1.clone());
|
||||
cache.mark_indexing_complete(&path1);
|
||||
|
||||
// Should find exact match
|
||||
assert!(cache.get_for_path(&root).is_some());
|
||||
}
|
||||
let _index = cache.create_for_indexing(path2.clone());
|
||||
|
||||
#[test]
|
||||
fn test_cache_persists() {
|
||||
// Test that cache entries persist (no TTL expiration)
|
||||
let cache = EphemeralIndexCache::new();
|
||||
let path = PathBuf::from("/test/path");
|
||||
let index = Arc::new(TokioRwLock::new(EphemeralIndex::new(path.clone())));
|
||||
|
||||
cache.insert(path.clone(), index);
|
||||
|
||||
// Wait a bit
|
||||
std::thread::sleep(std::time::Duration::from_millis(100));
|
||||
|
||||
// Should still be available (no expiration)
|
||||
assert!(cache.get(&path).is_some());
|
||||
let stats = cache.stats();
|
||||
assert_eq!(stats.indexed_paths, 1);
|
||||
assert_eq!(stats.indexing_in_progress, 1);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -26,9 +26,10 @@
|
||||
//! ```rust,ignore
|
||||
//! use sd_core::ops::indexing::ephemeral::EphemeralIndex;
|
||||
//!
|
||||
//! let mut index = EphemeralIndex::new("/path/to/root".into());
|
||||
//! // Create a unified index (supports multiple directory trees)
|
||||
//! let mut index = EphemeralIndex::new();
|
||||
//!
|
||||
//! // Add entries
|
||||
//! // Add entries with full paths - parent chain is created automatically
|
||||
//! index.add_entry(path, uuid, metadata);
|
||||
//!
|
||||
//! // Query
|
||||
|
||||
@@ -8,7 +8,12 @@ use crate::{
|
||||
use sea_orm::{ColumnTrait, EntityTrait, QueryFilter};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use specta::Type;
|
||||
use std::{collections::HashMap, path::PathBuf, sync::Arc, time::Duration};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
path::{Path, PathBuf},
|
||||
sync::Arc,
|
||||
time::Duration,
|
||||
};
|
||||
use tokio::sync::RwLock;
|
||||
use tracing::{info, warn};
|
||||
use uuid::Uuid;
|
||||
@@ -155,18 +160,19 @@ impl IndexerJobConfig {
|
||||
///
|
||||
/// This implementation uses efficient data structures for memory optimization:
|
||||
/// - NodeArena: Contiguous storage for file nodes (~48 bytes per node)
|
||||
/// - NameCache: String interning for common filenames
|
||||
/// - NameCache: String interning for common filenames (shared across all entries)
|
||||
/// - NameRegistry: Fast name-based lookups
|
||||
///
|
||||
/// All browsed paths share a single index, maximizing string deduplication
|
||||
/// and memory efficiency. Parent-child relationships are established based
|
||||
/// on path hierarchy.
|
||||
///
|
||||
/// Memory usage: ~50 bytes per entry vs ~200 bytes with HashMap
|
||||
pub struct EphemeralIndex {
|
||||
/// Efficient tree storage
|
||||
arena: super::ephemeral::NodeArena,
|
||||
|
||||
/// Root node
|
||||
root: super::ephemeral::EntryId,
|
||||
|
||||
/// String interning
|
||||
/// String interning (shared across all paths)
|
||||
cache: std::sync::Arc<super::ephemeral::NameCache>,
|
||||
|
||||
/// Fast name lookups
|
||||
@@ -184,66 +190,104 @@ pub struct EphemeralIndex {
|
||||
/// Metadata
|
||||
created_at: std::time::Instant,
|
||||
last_accessed: std::time::Instant,
|
||||
pub root_path: PathBuf,
|
||||
pub stats: IndexerStats,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for EphemeralIndex {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("EphemeralIndex")
|
||||
.field("root_path", &self.root_path)
|
||||
.field("entry_count", &self.arena.len())
|
||||
.field("interned_names", &self.cache.len())
|
||||
.field("path_count", &self.path_index.len())
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl EphemeralIndex {
|
||||
pub fn new(root_path: PathBuf) -> Self {
|
||||
use super::ephemeral::{
|
||||
FileNode, FileType, MaybeEntryId, NameCache, NameRef, NameRegistry, NodeArena,
|
||||
NodeState, PackedMetadata,
|
||||
};
|
||||
/// Create a new empty ephemeral index
|
||||
///
|
||||
/// The index stores entries with their full paths and builds parent-child
|
||||
/// relationships based on path hierarchy. Multiple directory trees can
|
||||
/// coexist in the same index, sharing the arena and string interning pool.
|
||||
pub fn new() -> Self {
|
||||
use super::ephemeral::{NameCache, NameRegistry, NodeArena};
|
||||
|
||||
let cache = std::sync::Arc::new(NameCache::new());
|
||||
let mut arena = NodeArena::new();
|
||||
let arena = NodeArena::new();
|
||||
let registry = NameRegistry::new();
|
||||
|
||||
// Create root node
|
||||
let root_name = cache.intern(
|
||||
root_path
|
||||
.file_name()
|
||||
let now = std::time::Instant::now();
|
||||
|
||||
Self {
|
||||
arena,
|
||||
cache,
|
||||
registry,
|
||||
path_index: HashMap::new(),
|
||||
entry_uuids: HashMap::new(),
|
||||
content_kinds: HashMap::new(),
|
||||
created_at: now,
|
||||
last_accessed: now,
|
||||
stats: IndexerStats::default(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Ensure a directory exists in the index, creating ancestor chain if needed
|
||||
///
|
||||
/// Returns the EntryId of the directory.
|
||||
pub fn ensure_directory(&mut self, path: &Path) -> super::ephemeral::EntryId {
|
||||
use super::ephemeral::{
|
||||
FileNode, FileType, MaybeEntryId, NameRef, NodeState, PackedMetadata,
|
||||
};
|
||||
use super::state::EntryKind;
|
||||
|
||||
// Already exists?
|
||||
if let Some(&id) = self.path_index.get(path) {
|
||||
return id;
|
||||
}
|
||||
|
||||
// Ensure parent exists first (recursive)
|
||||
let parent_id = if let Some(parent_path) = path.parent() {
|
||||
if parent_path.as_os_str().is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(self.ensure_directory(parent_path))
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Create this directory
|
||||
let name = self.cache.intern(
|
||||
path.file_name()
|
||||
.map(|s| s.to_string_lossy())
|
||||
.as_deref()
|
||||
.unwrap_or("/"),
|
||||
);
|
||||
|
||||
let root_node = FileNode::new(
|
||||
NameRef::new(root_name, MaybeEntryId::NONE),
|
||||
PackedMetadata::new(NodeState::Accessible, FileType::Directory, 0),
|
||||
);
|
||||
let parent_ref = parent_id
|
||||
.map(MaybeEntryId::some)
|
||||
.unwrap_or(MaybeEntryId::NONE);
|
||||
let meta = PackedMetadata::new(NodeState::Accessible, FileType::Directory, 0);
|
||||
let node = FileNode::new(NameRef::new(name, parent_ref), meta);
|
||||
|
||||
let root = arena.insert(root_node);
|
||||
let id = self.arena.insert(node);
|
||||
|
||||
let now = std::time::Instant::now();
|
||||
|
||||
// Add root path to path_index so list_directory works for the root
|
||||
let mut path_index = HashMap::new();
|
||||
path_index.insert(root_path.clone(), root);
|
||||
|
||||
Self {
|
||||
arena,
|
||||
root,
|
||||
cache,
|
||||
registry,
|
||||
path_index,
|
||||
entry_uuids: HashMap::new(),
|
||||
content_kinds: HashMap::new(),
|
||||
created_at: now,
|
||||
last_accessed: now,
|
||||
root_path,
|
||||
stats: IndexerStats::default(),
|
||||
// Add to parent's children
|
||||
if let Some(parent_id) = parent_id {
|
||||
if let Some(parent) = self.arena.get_mut(parent_id) {
|
||||
parent.add_child(id);
|
||||
}
|
||||
}
|
||||
|
||||
// Index by path and name
|
||||
self.path_index.insert(path.to_path_buf(), id);
|
||||
self.registry.insert(name, id);
|
||||
|
||||
// Generate UUID for directory
|
||||
let uuid = uuid::Uuid::new_v4();
|
||||
self.entry_uuids.insert(path.to_path_buf(), uuid);
|
||||
|
||||
id
|
||||
}
|
||||
|
||||
/// Add an entry to the index. Returns Some(content_kind) if added, None if duplicate.
|
||||
@@ -265,7 +309,24 @@ impl EphemeralIndex {
|
||||
return None;
|
||||
}
|
||||
|
||||
// Intern the filename
|
||||
// Ensure parent directory exists in the index FIRST (requires &mut self)
|
||||
// This must happen before interning the name to avoid borrow conflicts
|
||||
let parent_id = if let Some(parent_path) = path.parent() {
|
||||
if parent_path.as_os_str().is_empty() {
|
||||
// Root of filesystem, no parent
|
||||
None
|
||||
} else if let Some(&existing_id) = self.path_index.get(parent_path) {
|
||||
// Parent already exists
|
||||
Some(existing_id)
|
||||
} else {
|
||||
// Parent doesn't exist - ensure it (and ancestors) are created
|
||||
Some(self.ensure_directory(parent_path))
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Now intern the filename (borrows self.cache immutably)
|
||||
let name = self.cache.intern(
|
||||
path.file_name()
|
||||
.map(|s| s.to_string_lossy())
|
||||
@@ -273,12 +334,6 @@ impl EphemeralIndex {
|
||||
.unwrap_or("unknown"),
|
||||
);
|
||||
|
||||
// Find parent
|
||||
let parent_id = path
|
||||
.parent()
|
||||
.and_then(|p| self.path_index.get(p).copied())
|
||||
.unwrap_or(self.root);
|
||||
|
||||
// Create metadata
|
||||
let file_type = FileType::from(metadata.kind);
|
||||
|
||||
@@ -286,13 +341,18 @@ impl EphemeralIndex {
|
||||
.with_times(metadata.modified, metadata.created);
|
||||
|
||||
// Create node
|
||||
let node = FileNode::new(NameRef::new(name, MaybeEntryId::some(parent_id)), meta);
|
||||
let parent_ref = parent_id
|
||||
.map(MaybeEntryId::some)
|
||||
.unwrap_or(MaybeEntryId::NONE);
|
||||
let node = FileNode::new(NameRef::new(name, parent_ref), meta);
|
||||
|
||||
let id = self.arena.insert(node);
|
||||
|
||||
// Add to parent's children
|
||||
if let Some(parent) = self.arena.get_mut(parent_id) {
|
||||
parent.add_child(id);
|
||||
if let Some(parent_id) = parent_id {
|
||||
if let Some(parent) = self.arena.get_mut(parent_id) {
|
||||
parent.add_child(id);
|
||||
}
|
||||
}
|
||||
|
||||
// Detect content kind by extension (fast, no I/O)
|
||||
@@ -394,20 +454,23 @@ impl EphemeralIndex {
|
||||
let mut segments = Vec::new();
|
||||
let mut current = id;
|
||||
|
||||
// Walk up the tree collecting path segments
|
||||
while let Some(node) = self.arena.get(current) {
|
||||
segments.push(node.name().to_owned());
|
||||
if let Some(parent) = node.parent() {
|
||||
segments.push(node.name().to_owned());
|
||||
current = parent;
|
||||
} else {
|
||||
// Reached a root node (no parent)
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if segments.is_empty() {
|
||||
return Some(self.root_path.clone());
|
||||
return None;
|
||||
}
|
||||
|
||||
let mut path = self.root_path.clone();
|
||||
// Build absolute path from segments (root to leaf)
|
||||
let mut path = PathBuf::from("/");
|
||||
for segment in segments.into_iter().rev() {
|
||||
path.push(segment);
|
||||
}
|
||||
@@ -519,6 +582,12 @@ impl EphemeralIndex {
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for EphemeralIndex {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
/// Statistics about an ephemeral index
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct EphemeralIndexStats {
|
||||
@@ -575,13 +644,7 @@ impl JobHandler for IndexerJob {
|
||||
|
||||
// Initialize ephemeral index if needed
|
||||
if self.config.is_ephemeral() && self.ephemeral_index.is_none() {
|
||||
let root_path =
|
||||
self.config.path.as_local_path().ok_or_else(|| {
|
||||
JobError::execution("Path not accessible locally".to_string())
|
||||
})?;
|
||||
self.ephemeral_index = Some(Arc::new(RwLock::new(EphemeralIndex::new(
|
||||
root_path.to_path_buf(),
|
||||
))));
|
||||
self.ephemeral_index = Some(Arc::new(RwLock::new(EphemeralIndex::new())));
|
||||
ctx.log("Initialized ephemeral index for non-persistent job");
|
||||
}
|
||||
|
||||
@@ -750,6 +813,7 @@ impl JobHandler for IndexerJob {
|
||||
state,
|
||||
&ctx,
|
||||
ephemeral_index,
|
||||
root_path,
|
||||
volume_backend.as_ref(),
|
||||
)
|
||||
.await?;
|
||||
@@ -879,15 +943,14 @@ impl JobHandler for IndexerJob {
|
||||
|
||||
// Mark ephemeral indexing as complete in the cache
|
||||
if self.config.is_ephemeral() {
|
||||
if let Some(ephemeral_index) = &self.ephemeral_index {
|
||||
let root_path = ephemeral_index.read().await.root_path.clone();
|
||||
if let Some(local_path) = self.config.path.as_local_path() {
|
||||
ctx.library()
|
||||
.core_context()
|
||||
.ephemeral_cache()
|
||||
.mark_indexing_complete(&root_path);
|
||||
.mark_indexing_complete(local_path);
|
||||
ctx.log(format!(
|
||||
"Marked ephemeral indexing complete for: {}",
|
||||
root_path.display()
|
||||
local_path.display()
|
||||
));
|
||||
}
|
||||
}
|
||||
@@ -1076,31 +1139,29 @@ impl IndexerJob {
|
||||
state: &mut IndexerState,
|
||||
ctx: &JobContext<'_>,
|
||||
ephemeral_index: Arc<RwLock<EphemeralIndex>>,
|
||||
volume_backend: Option<&Arc<dyn crate::volume::VolumeBackend>>,
|
||||
root_path: &Path,
|
||||
_volume_backend: Option<&Arc<dyn crate::volume::VolumeBackend>>,
|
||||
) -> JobResult<()> {
|
||||
use super::persistence::PersistenceFactory;
|
||||
|
||||
ctx.log("Starting ephemeral processing");
|
||||
|
||||
// Get root path from ephemeral index
|
||||
let root_path = {
|
||||
let index = ephemeral_index.read().await;
|
||||
index.root_path.clone()
|
||||
};
|
||||
|
||||
// Get event bus from library
|
||||
let event_bus = Some(ctx.library().event_bus().clone());
|
||||
|
||||
// Create ephemeral persistence layer (emits events as entries are stored)
|
||||
let persistence =
|
||||
PersistenceFactory::ephemeral(ephemeral_index.clone(), event_bus, root_path.clone());
|
||||
let persistence = PersistenceFactory::ephemeral(
|
||||
ephemeral_index.clone(),
|
||||
event_bus,
|
||||
root_path.to_path_buf(),
|
||||
);
|
||||
|
||||
// Process all batches through persistence layer
|
||||
while let Some(batch) = state.entry_batches.pop() {
|
||||
for entry in batch {
|
||||
// Store entry (this will emit ResourceChanged events)
|
||||
// Content kind is identified by extension during add_entry, no hashing needed
|
||||
let _entry_id = persistence.store_entry(&entry, None, &root_path).await?;
|
||||
let _entry_id = persistence.store_entry(&entry, None, root_path).await?;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -620,9 +620,7 @@ mod tests {
|
||||
std::fs::write(&test_file, b"test content").unwrap();
|
||||
|
||||
// Create ephemeral index
|
||||
let index = Arc::new(RwLock::new(EphemeralIndex::new(
|
||||
temp_dir.path().to_path_buf(),
|
||||
)));
|
||||
let index = Arc::new(RwLock::new(EphemeralIndex::new()));
|
||||
|
||||
// Create event collector
|
||||
let collected_events = Arc::new(Mutex::new(Vec::new()));
|
||||
|
||||
@@ -104,7 +104,7 @@ impl IndexVerifyAction {
|
||||
tracing::debug!("Running ephemeral indexer job on {}", path.display());
|
||||
|
||||
// Create ephemeral index storage that we'll share with the job
|
||||
let ephemeral_index = Arc::new(RwLock::new(EphemeralIndex::new(path.to_path_buf())));
|
||||
let ephemeral_index = Arc::new(RwLock::new(EphemeralIndex::new()));
|
||||
|
||||
// Subscribe to job events before dispatching
|
||||
let mut event_subscriber = context.events.subscribe();
|
||||
|
||||
Reference in New Issue
Block a user