mirror of
https://github.com/matrix-org/matrix-rust-sdk.git
synced 2026-05-09 08:27:32 -04:00
fix(sdk): Disable OrderTracker for the moment.
This patch removes the use of `OrderTracker` because the implementation of `EventCacheStore::load_all_chunks_metadata` for `SqliteEventCacheStore` is the cause of severe slownesses (up to 100s for some account). We are going to undo this patch once the problem has been solved.
This commit is contained in:
@@ -19,7 +19,7 @@ use matrix_sdk_base::{
|
||||
event_cache::store::DEFAULT_CHUNK_CAPACITY,
|
||||
linked_chunk::{
|
||||
lazy_loader::{self, LazyLoaderError},
|
||||
ChunkContent, ChunkIdentifierGenerator, ChunkMetadata, OrderTracker, RawChunk,
|
||||
ChunkContent, ChunkIdentifierGenerator, RawChunk,
|
||||
},
|
||||
};
|
||||
use matrix_sdk_common::linked_chunk::{
|
||||
@@ -38,9 +38,6 @@ pub(in crate::event_cache) struct EventLinkedChunk {
|
||||
///
|
||||
/// [`Update`]: matrix_sdk_base::linked_chunk::Update
|
||||
chunks_updates_as_vectordiffs: AsVector<Event, Gap>,
|
||||
|
||||
/// Tracker of the events ordering in this room.
|
||||
pub order_tracker: OrderTracker<Event, Gap>,
|
||||
}
|
||||
|
||||
impl Default for EventLinkedChunk {
|
||||
@@ -52,7 +49,7 @@ impl Default for EventLinkedChunk {
|
||||
impl EventLinkedChunk {
|
||||
/// Build a new [`EventLinkedChunk`] struct with zero events.
|
||||
pub fn new() -> Self {
|
||||
Self::with_initial_linked_chunk(None, None)
|
||||
Self::with_initial_linked_chunk(None)
|
||||
}
|
||||
|
||||
/// Build a new [`EventLinkedChunk`] struct with prior chunks knowledge.
|
||||
@@ -60,7 +57,6 @@ impl EventLinkedChunk {
|
||||
/// The provided [`LinkedChunk`] must have been built with update history.
|
||||
pub fn with_initial_linked_chunk(
|
||||
linked_chunk: Option<LinkedChunk<DEFAULT_CHUNK_CAPACITY, Event, Gap>>,
|
||||
full_linked_chunk_metadata: Option<Vec<ChunkMetadata>>,
|
||||
) -> Self {
|
||||
let mut linked_chunk = linked_chunk.unwrap_or_else(LinkedChunk::new_with_update_history);
|
||||
|
||||
@@ -68,11 +64,7 @@ impl EventLinkedChunk {
|
||||
.as_vector()
|
||||
.expect("`LinkedChunk` must have been built with `new_with_update_history`");
|
||||
|
||||
let order_tracker = linked_chunk
|
||||
.order_tracker(full_linked_chunk_metadata)
|
||||
.expect("`LinkedChunk` must have been built with `new_with_update_history`");
|
||||
|
||||
Self { chunks: linked_chunk, chunks_updates_as_vectordiffs, order_tracker }
|
||||
Self { chunks: linked_chunk, chunks_updates_as_vectordiffs }
|
||||
}
|
||||
|
||||
/// Clear all events.
|
||||
@@ -194,36 +186,6 @@ impl EventLinkedChunk {
|
||||
self.chunks.items()
|
||||
}
|
||||
|
||||
/// Return the order of an event in the room linked chunk.
|
||||
///
|
||||
/// Can return `None` if the event can't be found in the linked chunk.
|
||||
pub fn event_order(&self, event_pos: Position) -> Option<usize> {
|
||||
self.order_tracker.ordering(event_pos)
|
||||
}
|
||||
|
||||
#[cfg(any(test, debug_assertions))]
|
||||
#[allow(dead_code)] // Temporarily, until we figure out why it's crashing production builds.
|
||||
fn assert_event_ordering(&self) {
|
||||
let mut iter = self.chunks.items().enumerate();
|
||||
let Some((i, (first_event_pos, _))) = iter.next() else {
|
||||
return;
|
||||
};
|
||||
|
||||
// Sanity check.
|
||||
assert_eq!(i, 0);
|
||||
|
||||
// That's the offset in the full linked chunk. Will be 0 if the linked chunk is
|
||||
// entirely loaded, may be non-zero otherwise.
|
||||
let offset =
|
||||
self.event_order(first_event_pos).expect("first event's ordering must be known");
|
||||
|
||||
for (i, (next_pos, _)) in iter {
|
||||
let next_index =
|
||||
self.event_order(next_pos).expect("next event's ordering must be known");
|
||||
assert_eq!(offset + i, next_index, "event ordering must be continuous");
|
||||
}
|
||||
}
|
||||
|
||||
/// Get all updates from the room events as [`VectorDiff`].
|
||||
///
|
||||
/// Be careful that each `VectorDiff` is returned only once!
|
||||
@@ -232,11 +194,7 @@ impl EventLinkedChunk {
|
||||
///
|
||||
/// [`Update`]: matrix_sdk_base::linked_chunk::Update
|
||||
pub fn updates_as_vector_diffs(&mut self) -> Vec<VectorDiff<Event>> {
|
||||
let updates = self.chunks_updates_as_vectordiffs.take();
|
||||
|
||||
self.order_tracker.flush_updates(false);
|
||||
|
||||
updates
|
||||
self.chunks_updates_as_vectordiffs.take()
|
||||
}
|
||||
|
||||
/// Get a mutable reference to the [`LinkedChunk`] updates, aka
|
||||
@@ -256,8 +214,7 @@ impl EventLinkedChunk {
|
||||
let mut result = Vec::new();
|
||||
|
||||
for chunk in self.chunks.chunks() {
|
||||
let content =
|
||||
chunk_debug_string(chunk.identifier(), chunk.content(), &self.order_tracker);
|
||||
let content = chunk_debug_string(chunk.content());
|
||||
let lazy_previous = if let Some(cid) = chunk.lazy_previous() {
|
||||
format!(" (lazy previous = {})", cid.index())
|
||||
} else {
|
||||
@@ -400,28 +357,6 @@ impl EventLinkedChunk {
|
||||
|
||||
// Methods related to lazy-loading.
|
||||
impl EventLinkedChunk {
|
||||
/// Inhibits all the linked chunk updates caused by the function `f` on the
|
||||
/// ordering tracker.
|
||||
///
|
||||
/// Updates to the linked chunk that happen because of lazy loading must not
|
||||
/// be taken into account by the order tracker, otherwise the
|
||||
/// fully-loaded state (tracked by the order tracker) wouldn't match
|
||||
/// reality anymore. This provides a facility to help applying such
|
||||
/// updates.
|
||||
fn inhibit_updates_to_ordering_tracker<F: FnOnce(&mut Self) -> R, R>(&mut self, f: F) -> R {
|
||||
// Start by flushing previous pending updates to the chunk ordering, if any.
|
||||
self.order_tracker.flush_updates(false);
|
||||
|
||||
// Call the function.
|
||||
let r = f(self);
|
||||
|
||||
// Now, flush other pending updates which have been caused by the function, and
|
||||
// ignore them.
|
||||
self.order_tracker.flush_updates(true);
|
||||
|
||||
r
|
||||
}
|
||||
|
||||
/// Replace the events with the given last chunk of events and generator.
|
||||
///
|
||||
/// Happens only during lazy loading.
|
||||
@@ -433,11 +368,7 @@ impl EventLinkedChunk {
|
||||
last_chunk: Option<RawChunk<Event, Gap>>,
|
||||
chunk_identifier_generator: ChunkIdentifierGenerator,
|
||||
) -> Result<(), LazyLoaderError> {
|
||||
// Since `replace_with` is used only to unload some chunks, we don't want it to
|
||||
// affect the chunk ordering.
|
||||
self.inhibit_updates_to_ordering_tracker(move |this| {
|
||||
lazy_loader::replace_with(&mut this.chunks, last_chunk, chunk_identifier_generator)
|
||||
})
|
||||
lazy_loader::replace_with(&mut self.chunks, last_chunk, chunk_identifier_generator)
|
||||
}
|
||||
|
||||
/// Prepends a lazily-loaded chunk at the beginning of the linked chunk.
|
||||
@@ -445,20 +376,12 @@ impl EventLinkedChunk {
|
||||
&mut self,
|
||||
raw_new_first_chunk: RawChunk<Event, Gap>,
|
||||
) -> Result<(), LazyLoaderError> {
|
||||
// This is only used when reinserting a chunk that was in persisted storage, so
|
||||
// we don't need to touch the chunk ordering for this.
|
||||
self.inhibit_updates_to_ordering_tracker(move |this| {
|
||||
lazy_loader::insert_new_first_chunk(&mut this.chunks, raw_new_first_chunk)
|
||||
})
|
||||
lazy_loader::insert_new_first_chunk(&mut self.chunks, raw_new_first_chunk)
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a debug string for a [`ChunkContent`] for an event/gap pair.
|
||||
fn chunk_debug_string(
|
||||
chunk_id: ChunkIdentifier,
|
||||
content: &ChunkContent<Event, Gap>,
|
||||
order_tracker: &OrderTracker<Event, Gap>,
|
||||
) -> String {
|
||||
fn chunk_debug_string(content: &ChunkContent<Event, Gap>) -> String {
|
||||
match content {
|
||||
ChunkContent::Gap(Gap { prev_token }) => {
|
||||
format!("gap['{prev_token}']")
|
||||
@@ -466,18 +389,12 @@ fn chunk_debug_string(
|
||||
ChunkContent::Items(vec) => {
|
||||
let items = vec
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(i, event)| {
|
||||
.map(|event| {
|
||||
event.event_id().map_or_else(
|
||||
|| "<no event id>".to_owned(),
|
||||
|id| {
|
||||
let pos = Position::new(chunk_id, i);
|
||||
let order = format!("#{}: ", order_tracker.ordering(pos).unwrap());
|
||||
|
||||
// Limit event ids to 8 chars *after* the $.
|
||||
let event_id = id.as_str().chars().take(1 + 8).collect::<String>();
|
||||
|
||||
format!("{order}{event_id}")
|
||||
id.as_str().chars().take(1 + 8).collect::<String>()
|
||||
},
|
||||
)
|
||||
})
|
||||
@@ -762,13 +679,10 @@ mod tests {
|
||||
]);
|
||||
linked_chunk.chunks.push_gap_back(Gap { prev_token: "raclette".to_owned() });
|
||||
|
||||
// Flush updates to the order tracker.
|
||||
let _ = linked_chunk.updates_as_vector_diffs();
|
||||
|
||||
let output = linked_chunk.debug_string();
|
||||
|
||||
assert_eq!(output.len(), 2);
|
||||
assert_eq!(&output[0], "chunk #0: events[#0: $12345678, #1: $2]");
|
||||
assert_eq!(&output[0], "chunk #0: events[$12345678, $2]");
|
||||
assert_eq!(&output[1], "chunk #1: gap['raclette']");
|
||||
}
|
||||
|
||||
|
||||
@@ -618,13 +618,10 @@ mod private {
|
||||
use matrix_sdk_base::{
|
||||
apply_redaction,
|
||||
deserialized_responses::{ThreadSummary, ThreadSummaryStatus, TimelineEventKind},
|
||||
event_cache::{
|
||||
store::{DynEventCacheStore, EventCacheStoreLock},
|
||||
Event, Gap,
|
||||
},
|
||||
event_cache::{store::EventCacheStoreLock, Event, Gap},
|
||||
linked_chunk::{
|
||||
lazy_loader::{self},
|
||||
ChunkContent, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, Position, Update,
|
||||
ChunkContent, ChunkIdentifierGenerator, LinkedChunkId, Position, Update,
|
||||
},
|
||||
serde_helpers::extract_thread_root,
|
||||
sync::Timeline,
|
||||
@@ -707,42 +704,29 @@ mod private {
|
||||
|
||||
let linked_chunk_id = LinkedChunkId::Room(&room_id);
|
||||
|
||||
// Load the full linked chunk's metadata, so as to feed the order tracker.
|
||||
//
|
||||
// If loading the full linked chunk failed, we'll clear the event cache, as it
|
||||
// indicates that at some point, there's some malformed data.
|
||||
let full_linked_chunk_metadata =
|
||||
match Self::load_linked_chunk_metadata(&*store_lock, linked_chunk_id).await {
|
||||
Ok(metas) => metas,
|
||||
Err(err) => {
|
||||
error!(
|
||||
"error when loading a linked chunk's metadata from the store: {err}"
|
||||
);
|
||||
|
||||
// Clear storage for this room.
|
||||
store_lock
|
||||
.handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
|
||||
.await?;
|
||||
|
||||
// Restart with an empty linked chunk.
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
let linked_chunk = store_lock
|
||||
let linked_chunk = match store_lock
|
||||
.load_last_chunk(linked_chunk_id)
|
||||
.await
|
||||
.map_err(EventCacheError::from)
|
||||
.and_then(|(last_chunk, chunk_identifier_generator)| {
|
||||
lazy_loader::from_last_chunk(last_chunk, chunk_identifier_generator)
|
||||
.map_err(EventCacheError::from)
|
||||
})
|
||||
.expect("fully loading the linked chunk just worked, so loading it partially should also work");
|
||||
}) {
|
||||
Ok(linked_chunk) => linked_chunk,
|
||||
Err(err) => {
|
||||
error!("error when loading the linked chunk from the store: {err}");
|
||||
|
||||
let room_linked_chunk = EventLinkedChunk::with_initial_linked_chunk(
|
||||
linked_chunk,
|
||||
full_linked_chunk_metadata,
|
||||
);
|
||||
// Clear storage for this room.
|
||||
store_lock
|
||||
.handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear])
|
||||
.await?;
|
||||
|
||||
// Restart with an empty linked chunk.
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
let room_linked_chunk = EventLinkedChunk::with_initial_linked_chunk(linked_chunk);
|
||||
|
||||
// The threads mapping is intentionally empty at start, since we're going to
|
||||
// reload threads lazily, as soon as we need to (based on external
|
||||
@@ -762,131 +746,6 @@ mod private {
|
||||
})
|
||||
}
|
||||
|
||||
/// Load a linked chunk's full metadata, making sure the chunks are
|
||||
/// according to their their links.
|
||||
///
|
||||
/// Returns `None` if there's no such linked chunk in the store, or an
|
||||
/// error if the linked chunk is malformed.
|
||||
async fn load_linked_chunk_metadata(
|
||||
store: &DynEventCacheStore,
|
||||
linked_chunk_id: LinkedChunkId<'_>,
|
||||
) -> Result<Option<Vec<ChunkMetadata>>, EventCacheError> {
|
||||
let mut all_chunks = store
|
||||
.load_all_chunks_metadata(linked_chunk_id)
|
||||
.await
|
||||
.map_err(EventCacheError::from)?;
|
||||
|
||||
if all_chunks.is_empty() {
|
||||
// There are no chunks, so there's nothing to do.
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
// Transform the vector into a hashmap, for quick lookup of the predecessors.
|
||||
let chunk_map: HashMap<_, _> =
|
||||
all_chunks.iter().map(|meta| (meta.identifier, meta)).collect();
|
||||
|
||||
// Find a last chunk.
|
||||
let mut iter = all_chunks.iter().filter(|meta| meta.next.is_none());
|
||||
let Some(last) = iter.next() else {
|
||||
return Err(EventCacheError::InvalidLinkedChunkMetadata {
|
||||
details: "no last chunk found".to_owned(),
|
||||
});
|
||||
};
|
||||
|
||||
// There must at most one last chunk.
|
||||
if let Some(other_last) = iter.next() {
|
||||
return Err(EventCacheError::InvalidLinkedChunkMetadata {
|
||||
details: format!(
|
||||
"chunks {} and {} both claim to be last chunks",
|
||||
last.identifier.index(),
|
||||
other_last.identifier.index()
|
||||
),
|
||||
});
|
||||
}
|
||||
|
||||
// Rewind the chain back to the first chunk, and do some checks at the same
|
||||
// time.
|
||||
let mut seen = HashSet::new();
|
||||
let mut current = last;
|
||||
loop {
|
||||
// If we've already seen this chunk, there's a cycle somewhere.
|
||||
if !seen.insert(current.identifier) {
|
||||
return Err(EventCacheError::InvalidLinkedChunkMetadata {
|
||||
details: format!(
|
||||
"cycle detected in linked chunk at {}",
|
||||
current.identifier.index()
|
||||
),
|
||||
});
|
||||
}
|
||||
|
||||
let Some(prev_id) = current.previous else {
|
||||
// If there's no previous chunk, we're done.
|
||||
if seen.len() != all_chunks.len() {
|
||||
return Err(EventCacheError::InvalidLinkedChunkMetadata {
|
||||
details: format!(
|
||||
"linked chunk likely has multiple components: {} chunks seen through the chain of predecessors, but {} expected",
|
||||
seen.len(),
|
||||
all_chunks.len()
|
||||
),
|
||||
});
|
||||
}
|
||||
break;
|
||||
};
|
||||
|
||||
// If the previous chunk is not in the map, then it's unknown
|
||||
// and missing.
|
||||
let Some(pred_meta) = chunk_map.get(&prev_id) else {
|
||||
return Err(EventCacheError::InvalidLinkedChunkMetadata {
|
||||
details: format!(
|
||||
"missing predecessor {} chunk for {}",
|
||||
prev_id.index(),
|
||||
current.identifier.index()
|
||||
),
|
||||
});
|
||||
};
|
||||
|
||||
// If the previous chunk isn't connected to the next, then the link is invalid.
|
||||
if pred_meta.next != Some(current.identifier) {
|
||||
return Err(EventCacheError::InvalidLinkedChunkMetadata {
|
||||
details: format!(
|
||||
"chunk {}'s next ({:?}) doesn't match the current chunk ({})",
|
||||
pred_meta.identifier.index(),
|
||||
pred_meta.next.map(|chunk_id| chunk_id.index()),
|
||||
current.identifier.index()
|
||||
),
|
||||
});
|
||||
}
|
||||
|
||||
current = *pred_meta;
|
||||
}
|
||||
|
||||
// At this point, `current` is the identifier of the first chunk.
|
||||
//
|
||||
// Reorder the resulting vector, by going through the chain of `next` links, and
|
||||
// swapping items into their final position.
|
||||
//
|
||||
// Invariant in this loop: all items in [0..i[ are in their final, correct
|
||||
// position.
|
||||
let mut current = current.identifier;
|
||||
for i in 0..all_chunks.len() {
|
||||
// Find the target metadata.
|
||||
let j = all_chunks
|
||||
.iter()
|
||||
.rev()
|
||||
.position(|meta| meta.identifier == current)
|
||||
.map(|j| all_chunks.len() - 1 - j)
|
||||
.expect("the target chunk must be present in the metadata");
|
||||
if i != j {
|
||||
all_chunks.swap(i, j);
|
||||
}
|
||||
if let Some(next) = all_chunks[i].next {
|
||||
current = next;
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Some(all_chunks))
|
||||
}
|
||||
|
||||
/// Given a fully-loaded linked chunk with no gaps, return the
|
||||
/// [`LoadMoreEventsBackwardsOutcome`] expected for this room's cache.
|
||||
fn conclude_load_more_for_fully_loaded_chunk(&mut self) -> LoadMoreEventsBackwardsOutcome {
|
||||
@@ -1083,10 +942,6 @@ mod private {
|
||||
Ok(self.room_linked_chunk.updates_as_vector_diffs())
|
||||
}
|
||||
|
||||
pub(crate) fn room_event_order(&self, event_pos: Position) -> Option<usize> {
|
||||
self.room_linked_chunk.event_order(event_pos)
|
||||
}
|
||||
|
||||
/// Removes the bundled relations from an event, if they were present.
|
||||
///
|
||||
/// Only replaces the present if it contained bundled relations.
|
||||
@@ -1191,7 +1046,6 @@ mod private {
|
||||
&mut self,
|
||||
updates: Vec<Update<Event, Gap>>,
|
||||
) -> Result<(), EventCacheError> {
|
||||
self.room_linked_chunk.order_tracker.map_updates(&updates);
|
||||
self.send_updates_to_store(updates).await
|
||||
}
|
||||
|
||||
@@ -1388,32 +1242,6 @@ mod private {
|
||||
|
||||
trace!(num_related = %related.len(), num_iters, "computed transitive closure of related events");
|
||||
|
||||
// Sort the results by their positions in the linked chunk, if available.
|
||||
//
|
||||
// If an event doesn't have a known position, it goes to the start of the array.
|
||||
related.sort_by(|(_, lhs), (_, rhs)| {
|
||||
use std::cmp::Ordering;
|
||||
match (lhs, rhs) {
|
||||
(None, None) => Ordering::Equal,
|
||||
(None, Some(_)) => Ordering::Less,
|
||||
(Some(_), None) => Ordering::Greater,
|
||||
(Some(lhs), Some(rhs)) => {
|
||||
let lhs = self.room_event_order(*lhs);
|
||||
let rhs = self.room_event_order(*rhs);
|
||||
|
||||
// The events should have a definite position, but in the case they don't,
|
||||
// still consider that not having a position means you'll end at the start
|
||||
// of the array.
|
||||
match (lhs, rhs) {
|
||||
(None, None) => Ordering::Equal,
|
||||
(None, Some(_)) => Ordering::Less,
|
||||
(Some(_), None) => Ordering::Greater,
|
||||
(Some(lhs), Some(rhs)) => lhs.cmp(&rhs),
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Keep only the events, not their positions.
|
||||
let related = related.into_iter().map(|(event, _pos)| event).collect();
|
||||
|
||||
@@ -2977,141 +2805,6 @@ mod timed_tests {
|
||||
assert!(outcome.reached_start);
|
||||
}
|
||||
|
||||
#[async_test]
|
||||
async fn test_room_ordering() {
|
||||
let room_id = room_id!("!galette:saucisse.bzh");
|
||||
|
||||
let client = MockClientBuilder::new(None).build().await;
|
||||
|
||||
let f = EventFactory::new().room(room_id).sender(*ALICE);
|
||||
|
||||
let evid1 = event_id!("$1");
|
||||
let evid2 = event_id!("$2");
|
||||
let evid3 = event_id!("$3");
|
||||
|
||||
let ev1 = f.text_msg("hello world").event_id(evid1).into_event();
|
||||
let ev2 = f.text_msg("howdy").sender(*BOB).event_id(evid2).into_event();
|
||||
let ev3 = f.text_msg("yo").event_id(evid3).into_event();
|
||||
|
||||
// Fill the event cache store with an initial linked chunk with 2 events chunks.
|
||||
{
|
||||
let store = client.event_cache_store();
|
||||
let store = store.lock().await.unwrap();
|
||||
store
|
||||
.handle_linked_chunk_updates(
|
||||
LinkedChunkId::Room(room_id),
|
||||
vec![
|
||||
Update::NewItemsChunk {
|
||||
previous: None,
|
||||
new: ChunkIdentifier::new(0),
|
||||
next: None,
|
||||
},
|
||||
Update::PushItems {
|
||||
at: Position::new(ChunkIdentifier::new(0), 0),
|
||||
items: vec![ev1, ev2],
|
||||
},
|
||||
Update::NewItemsChunk {
|
||||
previous: Some(ChunkIdentifier::new(0)),
|
||||
new: ChunkIdentifier::new(1),
|
||||
next: None,
|
||||
},
|
||||
Update::PushItems {
|
||||
at: Position::new(ChunkIdentifier::new(1), 0),
|
||||
items: vec![ev3.clone()],
|
||||
},
|
||||
],
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
let event_cache = client.event_cache();
|
||||
event_cache.subscribe().unwrap();
|
||||
|
||||
client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined);
|
||||
let room = client.get_room(room_id).unwrap();
|
||||
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
|
||||
|
||||
// Initially, the linked chunk only contains the last chunk, so only ev3 is
|
||||
// loaded.
|
||||
{
|
||||
let state = room_event_cache.inner.state.read().await;
|
||||
|
||||
// But we can get the order of ev1.
|
||||
assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(0), 0)), Some(0));
|
||||
|
||||
// And that of ev2 as well.
|
||||
assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(0), 1)), Some(1));
|
||||
|
||||
// ev3, which is loaded, also has a known ordering.
|
||||
let mut events = state.room_linked_chunk().events();
|
||||
let (pos, ev) = events.next().unwrap();
|
||||
assert_eq!(pos, Position::new(ChunkIdentifier::new(1), 0));
|
||||
assert_eq!(ev.event_id().as_deref(), Some(evid3));
|
||||
assert_eq!(state.room_event_order(pos), Some(2));
|
||||
|
||||
// No other loaded events.
|
||||
assert!(events.next().is_none());
|
||||
}
|
||||
|
||||
// Force loading the full linked chunk by back-paginating.
|
||||
let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap();
|
||||
assert!(outcome.reached_start);
|
||||
|
||||
// All events are now loaded, so their order is precisely their enumerated index
|
||||
// in a linear iteration.
|
||||
{
|
||||
let state = room_event_cache.inner.state.read().await;
|
||||
for (i, (pos, _)) in state.room_linked_chunk().events().enumerate() {
|
||||
assert_eq!(state.room_event_order(pos), Some(i));
|
||||
}
|
||||
}
|
||||
|
||||
// Handle a gappy sync with two events (including one duplicate, so
|
||||
// deduplication kicks in), so that the linked chunk is shrunk to the
|
||||
// last chunk, and that the linked chunk only contains the last two
|
||||
// events.
|
||||
let evid4 = event_id!("$4");
|
||||
room_event_cache
|
||||
.inner
|
||||
.handle_joined_room_update(JoinedRoomUpdate {
|
||||
timeline: Timeline {
|
||||
limited: true,
|
||||
prev_batch: Some("fondue".to_owned()),
|
||||
events: vec![ev3, f.text_msg("sup").event_id(evid4).into_event()],
|
||||
},
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
{
|
||||
let state = room_event_cache.inner.state.read().await;
|
||||
|
||||
// After the shrink, only evid3 and evid4 are loaded.
|
||||
let mut events = state.room_linked_chunk().events();
|
||||
|
||||
let (pos, ev) = events.next().unwrap();
|
||||
assert_eq!(ev.event_id().as_deref(), Some(evid3));
|
||||
assert_eq!(state.room_event_order(pos), Some(2));
|
||||
|
||||
let (pos, ev) = events.next().unwrap();
|
||||
assert_eq!(ev.event_id().as_deref(), Some(evid4));
|
||||
assert_eq!(state.room_event_order(pos), Some(3));
|
||||
|
||||
// No other loaded events.
|
||||
assert!(events.next().is_none());
|
||||
|
||||
// But we can still get the order of previous events.
|
||||
assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(0), 0)), Some(0));
|
||||
assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(0), 1)), Some(1));
|
||||
|
||||
// ev3 doesn't have an order with its previous position, since it's been
|
||||
// deduplicated.
|
||||
assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(1), 0)), None);
|
||||
}
|
||||
}
|
||||
|
||||
#[async_test]
|
||||
async fn test_auto_shrink_after_all_subscribers_are_gone() {
|
||||
let room_id = room_id!("!galette:saucisse.bzh");
|
||||
|
||||
@@ -28,10 +28,7 @@ use matrix_sdk_test::{
|
||||
};
|
||||
use ruma::{
|
||||
event_id,
|
||||
events::{
|
||||
room::message::RoomMessageEventContentWithoutRelation, AnySyncMessageLikeEvent,
|
||||
AnySyncTimelineEvent, TimelineEventType,
|
||||
},
|
||||
events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent, TimelineEventType},
|
||||
room_id, user_id, EventId, RoomVersionId,
|
||||
};
|
||||
use serde_json::json;
|
||||
@@ -2525,162 +2522,3 @@ async fn test_sync_while_back_paginate() {
|
||||
|
||||
assert!(subscriber.is_empty());
|
||||
}
|
||||
|
||||
#[async_test]
|
||||
async fn test_relations_ordering() {
|
||||
let server = MatrixMockServer::new().await;
|
||||
|
||||
let room_id = room_id!("!galette:saucisse.bzh");
|
||||
let f = EventFactory::new().room(room_id).sender(*ALICE);
|
||||
|
||||
let target_event_id = event_id!("$1");
|
||||
|
||||
// Start with a prefilled event cache store that includes the target event.
|
||||
let ev1 = f.text_msg("bonjour monde").event_id(target_event_id).into_event();
|
||||
|
||||
let event_cache_store = Arc::new(MemoryStore::new());
|
||||
event_cache_store
|
||||
.handle_linked_chunk_updates(
|
||||
LinkedChunkId::Room(room_id),
|
||||
vec![
|
||||
// An empty items chunk.
|
||||
Update::NewItemsChunk { previous: None, new: ChunkIdentifier::new(0), next: None },
|
||||
Update::PushItems {
|
||||
at: Position::new(ChunkIdentifier::new(0), 0),
|
||||
items: vec![ev1.clone()],
|
||||
},
|
||||
],
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let client = server
|
||||
.client_builder()
|
||||
.store_config(
|
||||
StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()),
|
||||
)
|
||||
.build()
|
||||
.await;
|
||||
|
||||
let event_cache = client.event_cache();
|
||||
event_cache.subscribe().unwrap();
|
||||
|
||||
let room = server.sync_joined_room(&client, room_id).await;
|
||||
|
||||
let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap();
|
||||
|
||||
let (initial_events, mut listener) = room_event_cache.subscribe().await;
|
||||
assert_eq!(initial_events.len(), 1);
|
||||
assert!(listener.recv().now_or_never().is_none());
|
||||
|
||||
// Sanity check: there are no relations for the target event yet.
|
||||
let (_, relations) =
|
||||
room_event_cache.find_event_with_relations(target_event_id, None).await.unwrap();
|
||||
assert!(relations.is_empty());
|
||||
|
||||
let edit2 = event_id!("$edit2");
|
||||
let ev2 = f
|
||||
.text_msg("* hola mundo")
|
||||
.edit(target_event_id, RoomMessageEventContentWithoutRelation::text_plain("hola mundo"))
|
||||
.event_id(edit2)
|
||||
.into_raw();
|
||||
|
||||
let edit3 = event_id!("$edit3");
|
||||
let ev3 = f
|
||||
.text_msg("* ciao mondo")
|
||||
.edit(target_event_id, RoomMessageEventContentWithoutRelation::text_plain("ciao mondo"))
|
||||
.event_id(edit3)
|
||||
.into_raw();
|
||||
|
||||
let edit4 = event_id!("$edit4");
|
||||
let ev4 = f
|
||||
.text_msg("* hello world")
|
||||
.edit(target_event_id, RoomMessageEventContentWithoutRelation::text_plain("hello world"))
|
||||
.event_id(edit4)
|
||||
.into_raw();
|
||||
|
||||
// We receive two edit events via sync, as well as a gap; this will shrink the
|
||||
// linked chunk.
|
||||
server
|
||||
.sync_room(
|
||||
&client,
|
||||
JoinedRoomBuilder::new(room_id)
|
||||
.add_timeline_event(ev3.clone())
|
||||
.add_timeline_event(ev4.clone())
|
||||
.set_timeline_limited()
|
||||
.set_timeline_prev_batch("prev_batch"),
|
||||
)
|
||||
.await;
|
||||
|
||||
// Wait for the listener to tell us we've received something.
|
||||
loop {
|
||||
assert_let_timeout!(
|
||||
Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = listener.recv()
|
||||
);
|
||||
// We've received the shrink.
|
||||
if diffs.iter().any(|diff| matches!(diff, VectorDiff::Clear)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// At this point, relations are known for the target event.
|
||||
let (_, relations) =
|
||||
room_event_cache.find_event_with_relations(target_event_id, None).await.unwrap();
|
||||
assert_eq!(relations.len(), 2);
|
||||
// And the edit events are correctly ordered according to their position in the
|
||||
// linked chunk.
|
||||
assert_eq!(relations[0].event_id().unwrap(), edit3);
|
||||
assert_eq!(relations[1].event_id().unwrap(), edit4);
|
||||
|
||||
// Now, we resolve the gap; this returns ev2, another edit.
|
||||
server
|
||||
.mock_room_messages()
|
||||
.match_from("prev_batch")
|
||||
.ok(RoomMessagesResponseTemplate::default().events(vec![ev2.clone()]))
|
||||
.named("room/messages")
|
||||
.mock_once()
|
||||
.mount()
|
||||
.await;
|
||||
|
||||
// Run the pagination.
|
||||
let outcome = room_event_cache.pagination().run_backwards_once(1).await.unwrap();
|
||||
assert!(outcome.reached_start.not());
|
||||
assert_eq!(outcome.events.len(), 1);
|
||||
|
||||
{
|
||||
// Sanity check: we load the first chunk with the first event, from disk, and
|
||||
// reach the start of the timeline.
|
||||
let outcome = room_event_cache.pagination().run_backwards_once(1).await.unwrap();
|
||||
assert!(outcome.reached_start);
|
||||
}
|
||||
|
||||
// Relations are returned accordingly.
|
||||
let (_, relations) =
|
||||
room_event_cache.find_event_with_relations(target_event_id, None).await.unwrap();
|
||||
assert_eq!(relations.len(), 3);
|
||||
assert_eq!(relations[0].event_id().unwrap(), edit2);
|
||||
assert_eq!(relations[1].event_id().unwrap(), edit3);
|
||||
assert_eq!(relations[2].event_id().unwrap(), edit4);
|
||||
|
||||
// If I save an additional event without storing it in the linked chunk, it will
|
||||
// be present at the start of the relations list.
|
||||
let edit5 = event_id!("$edit5");
|
||||
let ev5 = f
|
||||
.text_msg("* hallo Welt")
|
||||
.edit(target_event_id, RoomMessageEventContentWithoutRelation::text_plain("hallo Welt"))
|
||||
.event_id(edit5)
|
||||
.into_event();
|
||||
|
||||
server.mock_room_event().ok(ev5).mock_once().mount().await;
|
||||
|
||||
// This saves the event, but without a position.
|
||||
room.event(edit5, None).await.unwrap();
|
||||
|
||||
let (_, relations) =
|
||||
room_event_cache.find_event_with_relations(target_event_id, None).await.unwrap();
|
||||
assert_eq!(relations.len(), 4);
|
||||
assert_eq!(relations[0].event_id().unwrap(), edit5);
|
||||
assert_eq!(relations[1].event_id().unwrap(), edit2);
|
||||
assert_eq!(relations[2].event_id().unwrap(), edit3);
|
||||
assert_eq!(relations[3].event_id().unwrap(), edit4);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user