diff --git a/bindings/matrix-sdk-ffi/src/client_builder.rs b/bindings/matrix-sdk-ffi/src/client_builder.rs index 535d33479..254c9392a 100644 --- a/bindings/matrix-sdk-ffi/src/client_builder.rs +++ b/bindings/matrix-sdk-ffi/src/client_builder.rs @@ -288,10 +288,6 @@ pub struct ClientBuilder { room_key_recipient_strategy: CollectStrategy, decryption_trust_requirement: TrustRequirement, request_config: Option, - - /// Whether to enable use of the event cache store, for reloading events - /// when building timelines et al. - use_event_cache_persistent_storage: bool, } #[matrix_sdk_ffi_macros::export] @@ -326,27 +322,9 @@ impl ClientBuilder { room_key_recipient_strategy: Default::default(), decryption_trust_requirement: TrustRequirement::Untrusted, request_config: Default::default(), - use_event_cache_persistent_storage: false, }) } - /// Whether to use the event cache persistent storage or not. - /// - /// This is a temporary feature flag, for testing the event cache's - /// persistent storage. Follow new developments in https://github.com/matrix-org/matrix-rust-sdk/issues/3280. - /// - /// This is disabled by default. When disabled, a one-time cleanup is - /// performed when creating the client, and it will clear all the events - /// previously stored in the event cache. - /// - /// When enabled, it will attempt to store events in the event cache as - /// they're received, and reuse them when reconstructing timelines. - pub fn use_event_cache_persistent_storage(self: Arc, value: bool) -> Arc { - let mut builder = unwrap_or_clone_arc(self); - builder.use_event_cache_persistent_storage = value; - Arc::new(builder) - } - pub fn cross_process_store_locks_holder_name( self: Arc, holder_name: String, @@ -739,19 +717,6 @@ impl ClientBuilder { let sdk_client = inner_builder.build().await?; - if builder.use_event_cache_persistent_storage { - // Enable the persistent storage \o/ - sdk_client.event_cache().enable_storage()?; - } else { - // Get rid of all the previous events, if any. - let store = sdk_client - .event_cache_store() - .lock() - .await - .map_err(EventCacheError::LockingStorage)?; - store.clear_all_rooms_chunks().await.map_err(EventCacheError::Storage)?; - } - Ok(Arc::new( Client::new(sdk_client, builder.enable_oidc_refresh_lock, builder.session_delegate) .await?, diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/decryption.rs b/crates/matrix-sdk-ui/tests/integration/timeline/decryption.rs index 8fe7abcfb..f1b4de4a9 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/decryption.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/decryption.rs @@ -134,7 +134,6 @@ async fn test_an_utd_from_the_event_cache_as_an_initial_item_is_decrypted() { // Set up the event cache. let event_cache = client.event_cache(); event_cache.subscribe().unwrap(); - event_cache.enable_storage().unwrap(); let room = mock_server.sync_joined_room(&client, room_id).await; let timeline = room.timeline().await.unwrap(); @@ -289,7 +288,6 @@ async fn test_an_utd_from_the_event_cache_as_a_paginated_item_is_decrypted() { // Set up the event cache. let event_cache = client.event_cache(); event_cache.subscribe().unwrap(); - event_cache.enable_storage().unwrap(); let room = mock_server.sync_joined_room(&client, room_id).await; let timeline = room.timeline().await.unwrap(); diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/mod.rs b/crates/matrix-sdk-ui/tests/integration/timeline/mod.rs index 898cb04b4..592a1ab99 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/mod.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/mod.rs @@ -749,7 +749,6 @@ async fn test_timeline_receives_a_limited_number_of_events_when_subscribing() { // Set up the event cache. let event_cache = client.event_cache(); event_cache.subscribe().unwrap(); - event_cache.enable_storage().unwrap(); let room = client.get_room(room_id).unwrap(); diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/pagination.rs b/crates/matrix-sdk-ui/tests/integration/timeline/pagination.rs index 001ef621d..ffadca16a 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/pagination.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/pagination.rs @@ -267,9 +267,6 @@ async fn test_wait_for_token() { let room_id = room_id!("!a98sd12bjh:example.org"); let (client, server) = logged_in_client_with_server().await; - client.event_cache().subscribe().unwrap(); - client.event_cache().enable_storage().unwrap(); - let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); let f = EventFactory::new(); @@ -309,6 +306,7 @@ async fn test_wait_for_token() { let paginate = async { timeline.paginate_backwards(10).await.unwrap(); }; + let observe_paginating = async { assert_eq!(back_pagination_status.next().await, Some(RoomPaginationStatus::Paginating)); assert_eq!( @@ -316,11 +314,13 @@ async fn test_wait_for_token() { Some(RoomPaginationStatus::Idle { hit_timeline_start: false }) ); }; + let sync = async { // Make sure syncing starts a little bit later than pagination sleep(Duration::from_millis(100)).await; client.sync_once(sync_settings.clone()).await.unwrap(); }; + timeout(Duration::from_secs(4), join3(paginate, observe_paginating, sync)).await.unwrap(); // Make sure pagination was called (with the right parameters) @@ -405,9 +405,10 @@ async fn test_timeline_reset_while_paginating() { let room = client.get_room(room_id).unwrap(); let timeline = Arc::new(room.timeline().await.unwrap()); + let alice_event = f.text_msg("live event!").sender(&ALICE).room(room_id).into_raw(); sync_builder.add_joined_room( JoinedRoomBuilder::new(room_id) - .add_timeline_event(f.text_msg("live event!").sender(&ALICE)) + .add_timeline_event(alice_event.clone()) .set_timeline_prev_batch("pagination_1".to_owned()) .set_timeline_limited(), ); @@ -425,7 +426,10 @@ async fn test_timeline_reset_while_paginating() { ); mock_sync(&server, sync_builder.build_json_sync_response(), None).await; - // pagination with first token + // The pagination with the first token will be hit twice: + // - first, before the sync response comes, then the gap is stored in the cache. + // - second, after all other gaps have been resolved, we get back to resolving + // this one. Mock::given(method("GET")) .and(path_regex(r"^/_matrix/client/r0/rooms/.*/messages$")) .and(header("authorization", "Bearer 1234")) @@ -435,23 +439,25 @@ async fn test_timeline_reset_while_paginating() { .set_body_json(json!({ "chunk": [], "start": "pagination_1", - "end": "some_other_token", + "end": "pagination_3", })) // Make sure the concurrent sync request returns first .set_delay(Duration::from_millis(200)), ) - .expect(1) + .expect(2) .named("pagination_1") .mount(&server) .await; - // pagination with second token + // The pagination with the second token must return Alice's event, to be + // consistent with the room's timeline (Alice's event was observed before + // Bob's event). Mock::given(method("GET")) .and(path_regex(r"^/_matrix/client/r0/rooms/.*/messages$")) .and(header("authorization", "Bearer 1234")) .and(query_param("from", "pagination_2")) .respond_with(ResponseTemplate::new(200).set_body_json(json!({ - "chunk": [], + "chunk": [alice_event], "start": "pagination_2", }))) .expect(1) @@ -459,12 +465,36 @@ async fn test_timeline_reset_while_paginating() { .mount(&server) .await; + // pagination with third token + Mock::given(method("GET")) + .and(path_regex(r"^/_matrix/client/r0/rooms/.*/messages$")) + .and(header("authorization", "Bearer 1234")) + .and(query_param("from", "pagination_3")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "chunk": [], + "start": "pagination_3", + }))) + .expect(1) + .named("pagination_3") + .mount(&server) + .await; + let (_, mut back_pagination_status) = timeline.live_back_pagination_status().await.unwrap(); - let paginate = async { timeline.paginate_backwards(10).await.unwrap() }; + let paginate = async { + let mut hit_start; + loop { + hit_start = timeline.paginate_backwards(10).await.unwrap(); + if hit_start { + break; + } + } + hit_start + }; let observe_paginating = async { let mut seen_paginating = false; + let mut seen_idle_no_start = false; // Observe paginating updates: we want to make sure we see at least once // Paginating, and that it settles with Idle. @@ -476,12 +506,16 @@ async fn test_timeline_reset_while_paginating() { if state == RoomPaginationStatus::Paginating { seen_paginating = true; } + if matches!(state, RoomPaginationStatus::Idle { hit_timeline_start: false }) { + seen_idle_no_start = true; + } } None => break, } } assert!(seen_paginating); + assert!(seen_idle_no_start); let (status, _) = timeline.live_back_pagination_status().await.unwrap(); @@ -502,10 +536,10 @@ async fn test_timeline_reset_while_paginating() { assert!(hit_start); // No events in back-pagination responses, start of timeline + date divider + - // event from latest sync is present - assert_eq!(timeline.items().await.len(), 3); + // all events from the previous syncs are present. + assert_eq!(timeline.items().await.len(), 4); - // Make sure both pagination mocks were called + // Make sure both pagination mocks were called. server.verify().await; } @@ -1085,6 +1119,8 @@ async fn test_lazy_back_pagination() { let hit_end_of_timeline = timeline.paginate_backwards(5).await.unwrap(); + // There was no previous-batch token in the previous /messages response, so + // we've hit the start of the timeline. assert!(hit_end_of_timeline); // The start of the timeline is inserted as its own timeline update. @@ -1097,12 +1133,12 @@ async fn test_lazy_back_pagination() { // // They are inserted after the date divider and start of timeline, hence the // indices 2 and 3. - assert_timeline_stream! { [timeline_stream] insert[2] "$ev201"; insert[3] "$ev200"; }; + // So cool. assert_pending!(timeline_stream); diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/pinned_event.rs b/crates/matrix-sdk-ui/tests/integration/timeline/pinned_event.rs index 280067060..19119e926 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/pinned_event.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/pinned_event.rs @@ -242,7 +242,6 @@ async fn test_cached_events_are_kept_for_different_room_instances() { // Subscribe to the event cache. let event_cache = client.event_cache(); event_cache.subscribe().unwrap(); - event_cache.enable_storage().unwrap(); let f = EventFactory::new().room(room_id).sender(*BOB); let pinned_event = f diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index fadc5bc45..5e05438a1 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -168,23 +168,6 @@ impl EventCache { } } - /// Enable storing updates to storage, and reload events from storage. - /// - /// Has an effect only the first time it's called. It's safe to call it - /// multiple times. - pub fn enable_storage(&self) -> Result<()> { - let _ = self.inner.store.get_or_try_init::<_, EventCacheError>(|| { - let client = self.inner.client()?; - Ok(client.event_cache_store().clone()) - })?; - Ok(()) - } - - /// Check whether the storage is enabled or not. - pub fn has_storage(&self) -> bool { - self.inner.has_storage() - } - /// Starts subscribing the [`EventCache`] to sync responses, if not done /// before. /// @@ -193,6 +176,13 @@ impl EventCache { pub fn subscribe(&self) -> Result<()> { let client = self.inner.client()?; + // Initialize storage. + let _ = self.inner.store.get_or_try_init::<_, EventCacheError>(|| { + let client = self.inner.client()?; + Ok(client.event_cache_store().clone()) + })?; + + // Initialize the drop handles. let _ = self.inner.drop_handles.get_or_init(|| { // Spawn the task that will listen to all the room updates at once. let listen_updates_task = spawn(Self::listen_task( @@ -411,11 +401,6 @@ impl EventCacheInner { self.client.get().ok_or(EventCacheError::ClientDropped) } - /// Has persistent storage been enabled for the event cache? - fn has_storage(&self) -> bool { - self.store.get().is_some() - } - /// Clears all the room's data. async fn clear_all_rooms(&self) -> Result<()> { // Okay, here's where things get complicated. @@ -499,9 +484,7 @@ impl EventCacheInner { for (room_id, left_room_update) in updates.left { let room = self.for_room(&room_id).await?; - if let Err(err) = - room.inner.handle_left_room_update(self.has_storage(), left_room_update).await - { + if let Err(err) = room.inner.handle_left_room_update(left_room_update).await { // Non-fatal error, try to continue to the next room. error!("handling left room update: {err}"); } @@ -511,9 +494,7 @@ impl EventCacheInner { for (room_id, joined_room_update) in updates.joined { let room = self.for_room(&room_id).await?; - if let Err(err) = - room.inner.handle_joined_room_update(self.has_storage(), joined_room_update).await - { + if let Err(err) = room.inner.handle_joined_room_update(joined_room_update).await { // Non-fatal error, try to continue to the next room. error!(%room_id, "handling joined room update: {err}"); } @@ -716,10 +697,7 @@ mod tests { room_event_cache .inner - .handle_joined_room_update( - event_cache.inner.has_storage(), - JoinedRoomUpdate { account_data, ..Default::default() }, - ) + .handle_joined_room_update(JoinedRoomUpdate { account_data, ..Default::default() }) .await .unwrap(); @@ -798,7 +776,6 @@ mod tests { let event_cache = client.event_cache(); event_cache.subscribe().unwrap(); - event_cache.enable_storage().unwrap(); let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh")); let event_id = event_id!("$1"); diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs index 4d5ebbbde..585276d84 100644 --- a/crates/matrix-sdk/src/event_cache/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/room/mod.rs @@ -337,13 +337,8 @@ impl RoomEventCacheInner { } #[instrument(skip_all, fields(room_id = %self.room_id))] - pub(super) async fn handle_joined_room_update( - &self, - has_storage: bool, - updates: JoinedRoomUpdate, - ) -> Result<()> { + pub(super) async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> { self.handle_timeline( - has_storage, updates.timeline, updates.ephemeral.clone(), updates.ambiguity_changes, @@ -355,106 +350,14 @@ impl RoomEventCacheInner { Ok(()) } + #[instrument(skip_all, fields(room_id = %self.room_id))] + pub(super) async fn handle_left_room_update(&self, updates: LeftRoomUpdate) -> Result<()> { + self.handle_timeline(updates.timeline, Vec::new(), updates.ambiguity_changes).await?; + Ok(()) + } + async fn handle_timeline( &self, - has_storage: bool, - timeline: Timeline, - ephemeral_events: Vec>, - ambiguity_changes: BTreeMap, - ) -> Result<()> { - if !has_storage && timeline.limited { - // Ideally we'd try to reconcile existing events against those received in the - // timeline, but we're not there yet. In the meanwhile, clear the - // items from the room. TODO: implement Smart Matching™. - trace!("limited timeline, clearing all previous events and pushing new events"); - - self.replace_all_events_by( - timeline.events, - timeline.prev_batch, - ephemeral_events, - ambiguity_changes, - EventsOrigin::Sync, - ) - .await?; - } else { - // Add all the events to the backend. - trace!("adding new events"); - - let mut state = self.state.write().await; - self.append_events_locked( - has_storage, - &mut state, - timeline, - ephemeral_events, - ambiguity_changes, - ) - .await?; - } - - Ok(()) - } - - #[instrument(skip_all, fields(room_id = %self.room_id))] - pub(super) async fn handle_left_room_update( - &self, - has_storage: bool, - updates: LeftRoomUpdate, - ) -> Result<()> { - self.handle_timeline(has_storage, updates.timeline, Vec::new(), updates.ambiguity_changes) - .await?; - Ok(()) - } - - /// Remove existing events, and append a set of events to the room cache and - /// storage, notifying observers. - pub(super) async fn replace_all_events_by( - &self, - timeline_events: Vec, - prev_batch: Option, - ephemeral_events: Vec>, - ambiguity_changes: BTreeMap, - events_origin: EventsOrigin, - ) -> Result<()> { - // Acquire the lock. - let mut state = self.state.write().await; - - // Reset the room's state. - let updates_as_vector_diffs = state.reset().await?; - - // Propagate to observers. - let _ = self.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents { - diffs: updates_as_vector_diffs, - origin: events_origin, - }); - - // Push the new events. - - // This method is only used when we don't have storage, and - // it's conservative to consider that this new timeline is "limited", - // since we don't know if we have a gap or not. - let has_storage = false; - let limited = true; - - self.append_events_locked( - has_storage, - &mut state, - Timeline { limited, prev_batch, events: timeline_events }, - ephemeral_events, - ambiguity_changes, - ) - .await?; - - Ok(()) - } - - /// Append a set of events to the room cache and storage, notifying - /// observers. - /// - /// This is a private implementation. It must not be exposed publicly. - async fn append_events_locked( - &self, - has_storage: bool, - state: &mut RoomEventCacheState, timeline: Timeline, ephemeral_events: Vec>, ambiguity_changes: BTreeMap, @@ -468,11 +371,18 @@ impl RoomEventCacheInner { return Ok(()); } - // Ditch the previous-batch token if we have storage, the sync isn't limited and - // we've seen at least one event in the past. In this case (and only this one), - // we should definitely know what the head of the timeline is (either we - // know about all the events, or we have a gap somewhere). - if has_storage && !timeline.limited && state.events().events().next().is_some() { + // Add all the events to the backend. + trace!("adding new events"); + + let mut state = self.state.write().await; + + // Ditch the previous-batch token if the sync isn't limited and we've seen at + // least one event in the past. + // + // In this case (and only this one), we should definitely know what the head of + // the timeline is (either we know about all the events, or we have a + // gap somewhere), since storage is enabled by default. + if !timeline.limited && state.events().events().next().is_some() { prev_batch = None; } @@ -1577,7 +1487,6 @@ mod tests { let event_cache = client.event_cache(); event_cache.subscribe().unwrap(); - event_cache.enable_storage().unwrap(); client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined); let room = client.get_room(room_id).unwrap(); @@ -1638,7 +1547,6 @@ mod tests { let event_cache = client.event_cache(); event_cache.subscribe().unwrap(); - event_cache.enable_storage().unwrap(); client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined); let room = client.get_room(room_id).unwrap(); @@ -1690,7 +1598,6 @@ mod tests { // Don't forget to subscribe and like^W enable storage! event_cache.subscribe().unwrap(); - event_cache.enable_storage().unwrap(); client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined); let room = client.get_room(room_id).unwrap(); @@ -1706,7 +1613,7 @@ mod tests { room_event_cache .inner - .handle_joined_room_update(true, JoinedRoomUpdate { timeline, ..Default::default() }) + .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() }) .await .unwrap(); @@ -1758,7 +1665,6 @@ mod tests { // Don't forget to subscribe and like^W enable storage! event_cache.subscribe().unwrap(); - event_cache.enable_storage().unwrap(); client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined); let room = client.get_room(room_id).unwrap(); @@ -1775,7 +1681,7 @@ mod tests { room_event_cache .inner - .handle_joined_room_update(true, JoinedRoomUpdate { timeline, ..Default::default() }) + .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() }) .await .unwrap(); @@ -1893,7 +1799,6 @@ mod tests { // Don't forget to subscribe and like^W enable storage! event_cache.subscribe().unwrap(); - event_cache.enable_storage().unwrap(); client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined); let room = client.get_room(room_id).unwrap(); @@ -2037,7 +1942,6 @@ mod tests { // Don't forget to subscribe and like^W enable storage! event_cache.subscribe().unwrap(); - event_cache.enable_storage().unwrap(); client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined); let room = client.get_room(room_id).unwrap(); @@ -2073,7 +1977,7 @@ mod tests { let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev2] }; room_event_cache .inner - .handle_joined_room_update(true, JoinedRoomUpdate { timeline, ..Default::default() }) + .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() }) .await .unwrap(); @@ -2135,7 +2039,6 @@ mod tests { // Don't forget to subscribe and like^W enable storage! event_cache.subscribe().unwrap(); - event_cache.enable_storage().unwrap(); client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined); let room = client.get_room(room_id).unwrap(); @@ -2166,9 +2069,6 @@ mod tests { let event_cache = client.event_cache(); event_cache.subscribe().unwrap(); - let has_storage = true; // for testing purposes only - event_cache.enable_storage().unwrap(); - client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined); let room = client.get_room(room_id).unwrap(); let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); @@ -2179,17 +2079,14 @@ mod tests { // prev-batch token. room_event_cache .inner - .handle_joined_room_update( - has_storage, - JoinedRoomUpdate { - timeline: Timeline { - limited: true, - prev_batch: Some("raclette".to_owned()), - events: vec![f.text_msg("hey yo").into_event()], - }, - ..Default::default() + .handle_joined_room_update(JoinedRoomUpdate { + timeline: Timeline { + limited: true, + prev_batch: Some("raclette".to_owned()), + events: vec![f.text_msg("hey yo").into_event()], }, - ) + ..Default::default() + }) .await .unwrap(); @@ -2235,17 +2132,14 @@ mod tests { // this time. room_event_cache .inner - .handle_joined_room_update( - has_storage, - JoinedRoomUpdate { - timeline: Timeline { - limited: false, - prev_batch: Some("fondue".to_owned()), - events: vec![f.text_msg("sup").into_event()], - }, - ..Default::default() + .handle_joined_room_update(JoinedRoomUpdate { + timeline: Timeline { + limited: false, + prev_batch: Some("fondue".to_owned()), + events: vec![f.text_msg("sup").into_event()], }, - ) + ..Default::default() + }) .await .unwrap(); @@ -2281,7 +2175,6 @@ mod tests { let event_cache = client.event_cache(); event_cache.subscribe().unwrap(); - event_cache.enable_storage().unwrap(); client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined); let room = client.get_room(room_id).unwrap(); @@ -2369,7 +2262,6 @@ mod tests { let event_cache = client.event_cache(); event_cache.subscribe().unwrap(); - event_cache.enable_storage().unwrap(); client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined); let room = client.get_room(room_id).unwrap(); @@ -2486,7 +2378,6 @@ mod tests { let event_cache = client.event_cache(); event_cache.subscribe().unwrap(); - event_cache.enable_storage().unwrap(); client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined); let room = client.get_room(room_id).unwrap(); diff --git a/crates/matrix-sdk/tests/integration/event_cache.rs b/crates/matrix-sdk/tests/integration/event_cache.rs index e8330e2eb..7831ebfe0 100644 --- a/crates/matrix-sdk/tests/integration/event_cache.rs +++ b/crates/matrix-sdk/tests/integration/event_cache.rs @@ -32,7 +32,7 @@ use ruma::{ room_id, user_id, EventId, RoomVersionId, }; use serde_json::json; -use tokio::{spawn, sync::broadcast, task::yield_now, time::sleep}; +use tokio::{spawn, sync::broadcast, time::sleep}; macro_rules! assert_event_id { ($timeline_event:expr, $event_id:literal) => { @@ -285,6 +285,9 @@ async fn test_backpaginate_once() { // I'll get all the previous events, in "reverse" order (same as the response). let BackPaginationOutcome { events, reached_start } = outcome; + + // The event cache figures this is the last chunk of events in the room, because + // there's no prior gap this time. assert!(reached_start); assert_eq!(events.len(), 2); @@ -307,6 +310,11 @@ async fn test_backpaginate_once() { }); assert!(room_stream.is_empty()); + + // Another back-pagination doesn't return any new information. + let outcome = room_event_cache.pagination().run_backwards_once(20).await.unwrap(); + assert!(outcome.events.is_empty()); + assert!(outcome.reached_start); } #[async_test] @@ -382,8 +390,9 @@ async fn test_backpaginate_many_times_with_many_iterations() { } } - // I'll get all the previous events, - assert_eq!(num_iterations, 2); // in two iterations + // I'll get all the previous events, in two iterations, one for each + // back-pagination. + assert_eq!(num_iterations, 2); assert_event_matches_msg(&global_events[0], "world"); assert_event_matches_msg(&global_events[1], "hello"); @@ -446,7 +455,7 @@ async fn test_backpaginate_many_times_with_one_iteration() { let f = EventFactory::new().room(room_id).sender(user_id!("@a:b.c")); - server + let room = server .sync_room( &client, JoinedRoomBuilder::new(room_id) @@ -458,8 +467,7 @@ async fn test_backpaginate_many_times_with_one_iteration() { ) .await; - let (room_event_cache, _drop_handles) = - client.get_room(room_id).unwrap().event_cache().await.unwrap(); + let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); let (events, mut room_stream) = room_event_cache.subscribe().await; @@ -647,32 +655,26 @@ async fn test_reset_while_backpaginating() { // Assert the updates as diffs. { + // The room shrinks the linked chunk to the last event chunk, so it clears and + // re-adds the latest event. assert_let_timeout!( Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = room_stream.recv() ); - assert_eq!(diffs.len(), 1); - // The clear. + assert_eq!(diffs.len(), 2); assert_matches!(&diffs[0], VectorDiff::Clear); - } - - { - assert_let_timeout!( - Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = room_stream.recv() - ); - assert_eq!(diffs.len(), 1); - // The event from the sync. - assert_matches!(&diffs[0], VectorDiff::Append { values: events } => { - assert_eq!(events.len(), 1); - assert_event_matches_msg(&events[0], "heyo"); + assert_matches!(&diffs[1], VectorDiff::Append { values } => { + assert_eq!(values.len(), 1); + assert_event_matches_msg(&values[0], "heyo"); }); } { + // Then we receive the event from the restarted back-pagination with + // `second_backpagination`. assert_let_timeout!( Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = room_stream.recv() ); assert_eq!(diffs.len(), 1); - // The event from the pagination. assert_matches!(&diffs[0], VectorDiff::Insert { index, value: event } => { assert_eq!(*index, 0); assert_event_matches_msg(event, "finally!"); @@ -800,14 +802,21 @@ async fn test_limited_timeline_resets_pagination() { ); // When a limited sync comes back from the server, - server.sync_room(&client, JoinedRoomBuilder::new(room_id).set_timeline_limited()).await; + server + .sync_room( + &client, + JoinedRoomBuilder::new(room_id) + .set_timeline_limited() + .set_timeline_prev_batch("prev-batch".to_owned()), + ) + .await; - // We receive an update about the limited timeline. + // We have a limited sync, which triggers a shrink to the latest chunk: a gap. assert_let_timeout!( Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = room_stream.recv() ); assert_eq!(diffs.len(), 1); - assert_let!(VectorDiff::Clear = &diffs[0]); + assert_matches!(&diffs[0], VectorDiff::Clear); // The paginator state is reset: status set to Idle, hasn't hit the timeline // start. @@ -819,72 +828,6 @@ async fn test_limited_timeline_resets_pagination() { assert!(room_stream.is_empty()); } -#[async_test] -async fn test_persistent_storage_waits_for_pagination_token() { - let server = MatrixMockServer::new().await; - let client = server.client_builder().build().await; - - let event_cache = client.event_cache(); - - // Immediately subscribe the event cache to sync updates. - event_cache.subscribe().unwrap(); - // TODO: remove this test when persistent storage is enabled by default, as it's - // doing the same as the one above. - event_cache.enable_storage().unwrap(); - - // If I sync and get informed I've joined The Room, without a previous batch - // token, - let room_id = room_id!("!omelette:fromage.fr"); - let f = EventFactory::new().room(room_id).sender(user_id!("@a:b.c")); - let room = server.sync_joined_room(&client, room_id).await; - - let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - - let (events, mut room_stream) = room_event_cache.subscribe().await; - - assert!(events.is_empty()); - assert!(room_stream.is_empty()); - - server - .mock_room_messages() - .ok(RoomMessagesResponseTemplate::default() - .events(vec![f.text_msg("hi").event_id(event_id!("$2")).into_raw_timeline()])) - .mock_once() - .mount() - .await; - - // At the beginning, the paginator is in the initial state. - let pagination = room_event_cache.pagination(); - let mut pagination_status = pagination.status(); - assert_eq!(pagination_status.get(), RoomPaginationStatus::Idle { hit_timeline_start: false }); - - // If we try to back-paginate with a token, it will hit the end of the timeline - // and give us the resulting event. - let BackPaginationOutcome { events, reached_start } = - pagination.run_backwards_once(20).await.unwrap(); - - assert_eq!(events.len(), 1); - assert!(reached_start); - - assert_let_timeout!( - Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = room_stream.recv() - ); - assert_eq!(diffs.len(), 1); - assert_matches!(&diffs[0], VectorDiff::Append { values: events } => { - assert_eq!(events.len(), 1); - assert_event_matches_msg(&events[0], "hi"); - }); - - // And the paginator state delivers this as an update, and is internally - // consistent with it: - assert_next_matches_with_timeout!( - pagination_status, - RoomPaginationStatus::Idle { hit_timeline_start: true } - ); - - assert!(room_stream.is_empty()); -} - #[async_test] async fn test_limited_timeline_with_storage() { let server = MatrixMockServer::new().await; @@ -894,7 +837,6 @@ async fn test_limited_timeline_with_storage() { // Don't forget to subscribe and like^W enable storage! event_cache.subscribe().unwrap(); - event_cache.enable_storage().unwrap(); let room_id = room_id!("!galette:saucisse.bzh"); let room = server.sync_joined_room(&client, room_id).await; @@ -953,80 +895,6 @@ async fn test_limited_timeline_with_storage() { assert!(subscriber.is_empty()); } -#[async_test] -async fn test_limited_timeline_without_storage() { - let server = MatrixMockServer::new().await; - let client = server.client_builder().build().await; - - let event_cache = client.event_cache(); - - event_cache.subscribe().unwrap(); - - let room_id = room_id!("!galette:saucisse.bzh"); - let room = server.sync_joined_room(&client, room_id).await; - - let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - - let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh")); - - // Get a sync for a non-limited timeline, but with a prev-batch token. - // - // When we don't have storage, we should still keep this prev-batch around, - // because the previous sync events may not have been saved to disk in the - // first place. - server - .sync_room( - &client, - JoinedRoomBuilder::new(room_id) - .add_timeline_event(f.text_msg("hey yo")) - .set_timeline_prev_batch("prev-batch".to_owned()), - ) - .await; - - let (initial_events, mut subscriber) = room_event_cache.subscribe().await; - - // This is racy: either the sync has been handled, or it hasn't yet. - if initial_events.is_empty() { - assert_let_timeout!( - Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = subscriber.recv() - ); - assert_eq!(diffs.len(), 1); - - assert_let!(VectorDiff::Append { values: events } = &diffs[0]); - assert_eq!(events.len(), 1); - assert_event_matches_msg(&events[0], "hey yo"); - } else { - assert_eq!(initial_events.len(), 1); - assert_event_matches_msg(&initial_events[0], "hey yo"); - } - - // Back-pagination should thus work. The real assertion is in the "mock_once" - // call, which checks this endpoint is called once. - server - .mock_room_messages() - .match_from("prev-batch") - .ok(RoomMessagesResponseTemplate::default() - .events(vec![f.text_msg("oh well").event_id(event_id!("$1"))])) - .mock_once() - .mount() - .await; - - // We run back-pagination with success. - room_event_cache.pagination().run_backwards_once(20).await.unwrap(); - - // And we get the back-paginated event. - assert_let_timeout!( - Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = subscriber.recv() - ); - assert_eq!(diffs.len(), 1); - - assert_let!(VectorDiff::Insert { index: 0, value: event } = &diffs[0]); - assert_event_matches_msg(event, "oh well"); - - // That's all, folks! - assert!(subscriber.is_empty()); -} - #[async_test] async fn test_backpaginate_with_no_initial_events() { let server = MatrixMockServer::new().await; @@ -1195,7 +1063,6 @@ async fn test_no_gap_stored_after_deduplicated_sync() { // Immediately subscribe the event cache to sync updates. event_cache.subscribe().unwrap(); - event_cache.enable_storage().unwrap(); let room_id = room_id!("!omelette:fromage.fr"); @@ -1299,7 +1166,6 @@ async fn test_no_gap_stored_after_deduplicated_backpagination() { // Immediately subscribe the event cache to sync updates. event_cache.subscribe().unwrap(); - event_cache.enable_storage().unwrap(); let room_id = room_id!("!omelette:fromage.fr"); @@ -1424,7 +1290,6 @@ async fn test_dont_delete_gap_that_wasnt_inserted() { // Immediately subscribe the event cache to sync updates. event_cache.subscribe().unwrap(); - event_cache.enable_storage().unwrap(); let room_id = room_id!("!omelette:fromage.fr"); @@ -1488,7 +1353,6 @@ async fn test_apply_redaction_when_redaction_comes_later() { // Immediately subscribe the event cache to sync updates. event_cache.subscribe().unwrap(); - event_cache.enable_storage().unwrap(); let room_id = room_id!("!omelette:fromage.fr"); @@ -1611,7 +1475,6 @@ async fn test_apply_redaction_on_an_in_store_event() { // Set up the event cache. let event_cache = client.event_cache(); event_cache.subscribe().unwrap(); - event_cache.enable_storage().unwrap(); let room = mock_server.sync_joined_room(&client, room_id).await; let (room_event_cache, _room_event_cache_drop_handle) = room.event_cache().await.unwrap(); @@ -1703,7 +1566,6 @@ async fn test_apply_redaction_when_redacted_and_redaction_are_in_same_sync() { // Immediately subscribe the event cache to sync updates. event_cache.subscribe().unwrap(); - event_cache.enable_storage().unwrap(); let room_id = room_id!("!omelette:fromage.fr"); let room = server.sync_joined_room(&client, room_id).await; @@ -1854,7 +1716,6 @@ async fn test_lazy_loading() { // Set up the event cache. let event_cache = client.event_cache(); event_cache.subscribe().unwrap(); - event_cache.enable_storage().unwrap(); let room = mock_server.sync_joined_room(&client, room_id).await; let (room_event_cache, _room_event_cache_drop_handle) = room.event_cache().await.unwrap(); @@ -2194,7 +2055,6 @@ async fn test_deduplication() { // Set up the event cache. let event_cache = client.event_cache(); event_cache.subscribe().unwrap(); - event_cache.enable_storage().unwrap(); let room = mock_server.sync_joined_room(&client, room_id).await; let (room_event_cache, _room_event_cache_drop_handle) = room.event_cache().await.unwrap(); @@ -2300,118 +2160,12 @@ async fn test_deduplication() { } } -#[async_test] -async fn test_timeline_then_empty_timeline_then_deduplication() { - let server = MatrixMockServer::new().await; - let client = server.client_builder().build().await; - - client.event_cache().subscribe().unwrap(); - - let room_id = room_id!("!galette:saucisse.bzh"); - let room = server.sync_joined_room(&client, room_id).await; - - let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh")); - - // Previous batch of events which will be received via /messages, in - // chronological order. - let previous_events = [ - f.text_msg("previous1").event_id(event_id!("$prev1")).into_raw_timeline(), - f.text_msg("previous2").event_id(event_id!("$prev2")).into_raw_timeline(), - f.text_msg("previous3").event_id(event_id!("$prev3")).into_raw_timeline(), - ]; - - // Latest events which will be received via /sync, in chronological order. - let latest_events = [ - f.text_msg("latest3").event_id(event_id!("$latest3")).into_raw_timeline(), - f.text_msg("latest2").event_id(event_id!("$latest2")).into_raw_timeline(), - f.text_msg("latest1").event_id(event_id!("$latest1")).into_raw_timeline(), - ]; - - let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - let (initial_events, mut subscriber) = room_event_cache.subscribe().await; - assert!(initial_events.is_empty()); - - // Receive a sync with only the latest events. - server - .sync_room( - &client, - JoinedRoomBuilder::new(room_id) - .set_timeline_prev_batch("token-before-latest") - .add_timeline_bulk(latest_events.clone().into_iter().map(ruma::serde::Raw::cast)), - ) - .await; - - assert_let_timeout!( - Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = subscriber.recv() - ); - assert_eq!(diffs.len(), 1); - assert_let!(VectorDiff::Append { values } = &diffs[0]); - - assert_eq!(values.len(), 3); - assert_event_matches_msg(&values[0], "latest3"); - assert_event_matches_msg(&values[1], "latest2"); - assert_event_matches_msg(&values[2], "latest1"); - - // Receive a timeline without any items. - server - .sync_room( - &client, - JoinedRoomBuilder::new(room_id).set_timeline_prev_batch("token-after-latest"), - ) - .await; - - // Let the event cache handle the timeline result. - yield_now().await; - - // No update happened. - assert!(subscriber.is_empty()); - - // Back-paginate. - let all_events = previous_events.into_iter().chain(latest_events).rev().collect::>(); - - server - .mock_room_messages() - // The prev_batch from the second sync. - .match_from("token-after-latest") - .ok(RoomMessagesResponseTemplate::default().end_token("messages-end-2").events(all_events)) - .named("messages-since-after-latest") - .mount() - .await; - - room_event_cache.pagination().run_backwards_once(10).await.unwrap(); - - assert_let_timeout!( - Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = subscriber.recv() - ); - assert_eq!(diffs.len(), 4); - - // Yay. - assert_matches!(&diffs[0], VectorDiff::Remove { index: 2 }); - assert_matches!(&diffs[1], VectorDiff::Remove { index: 1 }); - assert_matches!(&diffs[2], VectorDiff::Remove { index: 0 }); - - assert_let!(VectorDiff::Append { values } = &diffs[3]); - assert_eq!(values.len(), 6); - assert_event_matches_msg(&values[0], "previous1"); - assert_event_matches_msg(&values[1], "previous2"); - assert_event_matches_msg(&values[2], "previous3"); - assert_event_matches_msg(&values[3], "latest3"); - assert_event_matches_msg(&values[4], "latest2"); - assert_event_matches_msg(&values[5], "latest1"); - - // That's all, folks! - assert!(subscriber.is_empty()); -} - #[async_test] async fn test_timeline_then_empty_timeline_then_deduplication_with_storage() { let server = MatrixMockServer::new().await; let client = server.client_builder().build().await; client.event_cache().subscribe().unwrap(); - // TODO: remove the other test above, which doesn't enable storage, when - // persistent storage's enabled by default. - client.event_cache().enable_storage().unwrap(); let room_id = room_id!("!galette:saucisse.bzh"); let room = server.sync_joined_room(&client, room_id).await; @@ -2520,7 +2274,6 @@ async fn test_dont_remove_only_gap() { let client = server.client_builder().build().await; client.event_cache().subscribe().unwrap(); - client.event_cache().enable_storage().unwrap(); let room_id = room_id!("!galette:saucisse.bzh"); let room = server @@ -2581,7 +2334,6 @@ async fn test_clear_all_rooms() { .await; client.event_cache().subscribe().unwrap(); - client.event_cache().enable_storage().unwrap(); // Another room gets a live event: it's loaded in the event cache now, while // sleeping_room_id is not. @@ -2669,7 +2421,6 @@ async fn test_sync_while_back_paginate() { let room = client.get_room(room_id).unwrap(); client.event_cache().subscribe().unwrap(); - client.event_cache().enable_storage().unwrap(); let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); let (initial_events, mut subscriber) = room_event_cache.subscribe().await; @@ -2721,7 +2472,7 @@ async fn test_sync_while_back_paginate() { assert_event_matches_msg(&values[1], "sync2"); assert_event_matches_msg(&values[2], "sync3"); - // Back pagination should succeed, and we don't have reached the start. + // Back pagination should succeed, and we haven't reached the start. let outcome = back_pagination_handle.await.unwrap(); assert!(outcome.reached_start.not()); assert_eq!(outcome.events.len(), 3); diff --git a/crates/matrix-sdk/tests/integration/room/common.rs b/crates/matrix-sdk/tests/integration/room/common.rs index 0a4ed5fb4..edbe57fce 100644 --- a/crates/matrix-sdk/tests/integration/room/common.rs +++ b/crates/matrix-sdk/tests/integration/room/common.rs @@ -612,7 +612,6 @@ async fn test_event() { let cache = client.event_cache(); let _ = cache.subscribe(); - cache.enable_storage().unwrap(); let room = server .sync_room( @@ -674,9 +673,7 @@ async fn test_event_with_context() { let next_event_id = event_id!("$next_1234"); let (client, server) = logged_in_client_with_server().await; - let cache = client.event_cache(); - let _ = cache.subscribe(); - cache.enable_storage().unwrap(); + client.event_cache().subscribe().unwrap(); let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); diff --git a/crates/matrix-sdk/tests/integration/room/left.rs b/crates/matrix-sdk/tests/integration/room/left.rs index 7a757bea6..0966b5257 100644 --- a/crates/matrix-sdk/tests/integration/room/left.rs +++ b/crates/matrix-sdk/tests/integration/room/left.rs @@ -27,7 +27,6 @@ async fn test_forget_non_direct_room() { let event_cache = client.event_cache(); event_cache.subscribe().unwrap(); - event_cache.enable_storage().unwrap(); Mock::given(method("POST")) .and(path_regex(r"^/_matrix/client/r0/rooms/.*/forget$")) @@ -84,7 +83,6 @@ async fn test_forget_banned_room() { let event_cache = client.event_cache(); event_cache.subscribe().unwrap(); - event_cache.enable_storage().unwrap(); Mock::given(method("POST")) .and(path_regex(r"^/_matrix/client/r0/rooms/.*/forget$")) diff --git a/labs/multiverse/src/main.rs b/labs/multiverse/src/main.rs index 0f92be497..eee8ebe18 100644 --- a/labs/multiverse/src/main.rs +++ b/labs/multiverse/src/main.rs @@ -112,7 +112,6 @@ async fn main() -> Result<()> { let event_cache = client.event_cache(); event_cache.subscribe()?; - event_cache.enable_storage()?; let terminal = ratatui::init(); execute!(stdout(), EnableMouseCapture)?;