diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs index e7b99e456..80c7ff87e 100644 --- a/crates/matrix-sdk/src/event_cache/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/room/mod.rs @@ -1447,7 +1447,9 @@ mod private { fn get_or_reload_thread(&mut self, root_event_id: OwnedEventId) -> &mut ThreadEventCache { // TODO: when there's persistent storage, try to lazily reload from disk, if // missing from memory. - self.threads.entry(root_event_id).or_insert_with(ThreadEventCache::new) + self.threads + .entry(root_event_id.clone()) + .or_insert_with(|| ThreadEventCache::new(root_event_id)) } #[instrument(skip_all)] diff --git a/crates/matrix-sdk/src/event_cache/room/threads.rs b/crates/matrix-sdk/src/event_cache/room/threads.rs index 443266854..c8c9290f8 100644 --- a/crates/matrix-sdk/src/event_cache/room/threads.rs +++ b/crates/matrix-sdk/src/event_cache/room/threads.rs @@ -42,6 +42,10 @@ pub struct ThreadEventCacheUpdate { /// All the information related to a single thread. pub(crate) struct ThreadEventCache { + /// The ID of the thread root event, which is the first event in the thread + /// (and eventually the first in the linked chunk). + thread_root: OwnedEventId, + /// The linked chunk for this thread. chunk: EventLinkedChunk, @@ -51,8 +55,8 @@ pub(crate) struct ThreadEventCache { impl ThreadEventCache { /// Create a new empty thread event cache. - pub fn new() -> Self { - Self { chunk: EventLinkedChunk::new(), sender: Sender::new(32) } + pub fn new(thread_root: OwnedEventId) -> Self { + Self { chunk: EventLinkedChunk::new(), sender: Sender::new(32), thread_root } } /// Subscribe to live events from this thread. @@ -113,10 +117,16 @@ impl ThreadEventCache { return LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(prev_token) }; } - // If we don't have any gap anymore, but we do have events, then we're done. - if self.chunk.events().next().is_some() { - trace!("thread chunk is fully loaded and non-empty: reached_start=true"); - return LoadMoreEventsBackwardsOutcome::StartOfTimeline; + // If we don't have a gap, then the first event should be the the thread's root; + // otherwise, we'll restart a pagination from the end. + if let Some((_pos, event)) = self.chunk.events().next() { + let first_event_id = + event.event_id().expect("a linked chunk only stores events with IDs"); + + if first_event_id == self.thread_root { + trace!("thread chunk is fully loaded and non-empty: reached_start=true"); + return LoadMoreEventsBackwardsOutcome::StartOfTimeline; + } } // Otherwise, we don't have a gap nor events. We don't have anything. Poor us. diff --git a/crates/matrix-sdk/tests/integration/event_cache.rs b/crates/matrix-sdk/tests/integration/event_cache.rs index 60b727ad0..9a7975f48 100644 --- a/crates/matrix-sdk/tests/integration/event_cache.rs +++ b/crates/matrix-sdk/tests/integration/event_cache.rs @@ -10,12 +10,13 @@ use matrix_sdk::{ deserialized_responses::TimelineEvent, event_cache::{ BackPaginationOutcome, EventCacheError, RoomEventCacheUpdate, RoomPaginationStatus, + ThreadEventCacheUpdate, }, linked_chunk::{ChunkIdentifier, LinkedChunkId, Position, Update}, store::StoreConfig, test_utils::{ assert_event_matches_msg, - mocks::{MatrixMockServer, RoomMessagesResponseTemplate}, + mocks::{MatrixMockServer, RoomMessagesResponseTemplate, RoomRelationsResponseTemplate}, }, }; use matrix_sdk_base::event_cache::{ @@ -2682,3 +2683,78 @@ async fn test_relations_ordering() { assert_eq!(relations[2].event_id().unwrap(), edit3); assert_eq!(relations[3].event_id().unwrap(), edit4); } + +#[async_test] +async fn test_thread_can_paginate_even_if_seen_sync_event() { + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; + + let room_id = room_id!("!galette:saucisse.bzh"); + + let event_cache = client.event_cache(); + event_cache.subscribe().unwrap(); + + let thread_root_id = event_id!("$thread_root"); + let thread_resp_id = event_id!("$thread_resp"); + + // Receive an in-thread event. + let f = EventFactory::new().room(room_id).sender(*ALICE); + let room = server + .sync_room( + &client, + JoinedRoomBuilder::new(room_id).add_timeline_event( + f.text_msg("that's a good point") + .in_thread(thread_root_id, thread_root_id) + .event_id(thread_resp_id), + ), + ) + .await; + + let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); + + let (mut thread_events, mut thread_stream) = + room_event_cache.subscribe_to_thread(thread_root_id.to_owned()).await; + + // Sanity check: the sync event is added to the thread. This is racy because the + // update might not have been handled by the event cache yet. + let first_event = if thread_events.is_empty() { + assert_let_timeout!(Ok(ThreadEventCacheUpdate { diffs, .. }) = thread_stream.recv()); + assert_eq!(diffs.len(), 1); + let mut diffs = diffs; + assert_let!(VectorDiff::Append { values } = diffs.remove(0)); + assert_eq!(values.len(), 1); + let mut values = values; + values.remove(0) + } else { + assert_eq!(thread_events.len(), 1); + thread_events.remove(0) + }; + assert_eq!(first_event.event_id().as_deref(), Some(thread_resp_id)); + + // It's possible to paginate the thread, and this will push the thread root + // because there's no prev-batch token. + server + .mock_room_relations() + .match_target_event(thread_root_id.to_owned()) + .ok(RoomRelationsResponseTemplate::default()) + .mock_once() + .mount() + .await; + + server + .mock_room_event() + .match_event_id() + .ok(f.text_msg("Thread root").event_id(thread_root_id).into()) + .mock_once() + .mount() + .await; + + let hit_start = + room_event_cache.paginate_thread_backwards(thread_root_id.to_owned(), 42).await.unwrap(); + assert!(hit_start); + + assert_let_timeout!(Ok(ThreadEventCacheUpdate { diffs, .. }) = thread_stream.recv()); + assert_eq!(diffs.len(), 1); + assert_let!(VectorDiff::Insert { index: 0, value } = &diffs[0]); + assert_eq!(value.event_id().as_deref(), Some(thread_root_id)); +}