diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs index 9f52f5acd..cda2bd528 100644 --- a/crates/matrix-sdk/src/event_cache/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/room/mod.rs @@ -535,6 +535,7 @@ mod private { }, linked_chunk::{LinkedChunk, LinkedChunkBuilder, RawChunk, Update}, }; + use matrix_sdk_common::executor::spawn; use once_cell::sync::OnceCell; use ruma::{serde::Raw, OwnedEventId, OwnedRoomId, RoomId}; use tracing::{error, instrument, trace}; @@ -726,8 +727,8 @@ mod private { trace!("propagating {} updates", updates.len()); // Strip relations from updates which insert or replace items. - for up in updates.iter_mut() { - match up { + for update in updates.iter_mut() { + match update { Update::PushItems { items, .. } => Self::strip_relations_from_events(items), Update::ReplaceItem { item, .. } => Self::strip_relations_from_event(item), // Other update kinds don't involve adding new events. @@ -751,8 +752,8 @@ mod private { let store = store.clone(); let room_id = self.room.clone(); - matrix_sdk_common::executor::spawn(async move { - let locked = store.lock().await?; + spawn(async move { + let store = store.lock().await?; if let Err(err) = locked.handle_linked_chunk_updates(&room_id, updates).await { error!("unable to handle linked chunk updates: {err}"); @@ -1289,6 +1290,8 @@ mod tests { #[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support. #[async_test] async fn test_clear() { + use std::ops::ControlFlow; + use eyeball_im::VectorDiff; use matrix_sdk_base::linked_chunk::LinkedChunkBuilder; @@ -1305,6 +1308,153 @@ mod tests { let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event(); let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event(); + // Prefill the store with some data. + event_cache_store + .handle_linked_chunk_updates( + room_id, + vec![ + // An empty items chunk. + Update::NewItemsChunk { + previous: None, + new: ChunkIdentifier::new(0), + next: None, + }, + // A gap chunk. + Update::NewGapChunk { + previous: Some(ChunkIdentifier::new(0)), + // Chunk IDs aren't supposed to be ordered, so use a random value here. + new: ChunkIdentifier::new(42), + next: None, + gap: Gap { prev_token: "comté".to_owned() }, + }, + // Another items chunk, non-empty this time. + Update::NewItemsChunk { + previous: Some(ChunkIdentifier::new(42)), + new: ChunkIdentifier::new(1), + next: None, + }, + Update::PushItems { + at: Position::new(ChunkIdentifier::new(1), 0), + items: vec![ev1.clone()], + }, + // And another items chunk, non-empty again. + Update::NewItemsChunk { + previous: Some(ChunkIdentifier::new(1)), + new: ChunkIdentifier::new(2), + next: None, + }, + Update::PushItems { + at: Position::new(ChunkIdentifier::new(2), 0), + items: vec![ev2.clone()], + }, + ], + ) + .await + .unwrap(); + + let client = MockClientBuilder::new("http://localhost".to_owned()) + .store_config( + StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()), + ) + .build() + .await; + + let event_cache = client.event_cache(); + + // Don't forget to subscribe and like^W enable storage! + event_cache.subscribe().unwrap(); + event_cache.enable_storage().unwrap(); + + client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined); + let room = client.get_room(room_id).unwrap(); + + let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); + + let (items, mut stream) = room_event_cache.subscribe().await; + + // The rooms knows about some cached events. + { + // The chunk containing this event is not loaded yet + assert!(room_event_cache.event(event_id1).await.is_none()); + // The chunk containing this event **is** loaded. + assert!(room_event_cache.event(event_id2).await.is_some()); + + // The reloaded room must contain only one event. + assert_eq!(items.len(), 1); + assert_eq!(items[0].event_id().unwrap(), event_id2); + + assert!(stream.is_empty()); + } + + // Let's load more chunks to get all events. + { + room_event_cache + .pagination() + .run_backwards(20, |outcome, _| async move { ControlFlow::Break(outcome) }) + .await + .unwrap(); + + assert_let_timeout!( + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv() + ); + assert_eq!(diffs.len(), 1); + assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: _ }); + + // The rooms knows about more cached events. + assert!(room_event_cache.event(event_id1).await.is_some()); + assert!(room_event_cache.event(event_id2).await.is_some()); + + assert!(stream.is_empty()); + } + + // After clearing,… + room_event_cache.clear().await.unwrap(); + + //… we get an update that the content has been cleared. + assert_let_timeout!( + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv() + ); + assert_eq!(diffs.len(), 1); + assert_let!(VectorDiff::Clear = &diffs[0]); + + // The room event cache has forgotten about the events. + assert!(room_event_cache.event(event_id1).await.is_none()); + + let (items, _) = room_event_cache.subscribe().await; + assert!(items.is_empty()); + + // The event cache store too. + let raws = event_cache_store.load_all_chunks(room_id).await.unwrap(); + let linked_chunk = LinkedChunkBuilder::<3, _, _>::from_raw_parts(raws).build().unwrap(); + + // Note: while the event cache store could return `None` here, clearing it will + // reset it to its initial form, maintaining the invariant that it + // contains a single items chunk that's empty. + let linked_chunk = linked_chunk.unwrap(); + assert_eq!(linked_chunk.num_items(), 0); + } + + #[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support. + #[async_test] + async fn test_load_from_storage() { + use std::ops::ControlFlow; + + use eyeball_im::VectorDiff; + + use super::RoomEventCacheUpdate; + use crate::assert_let_timeout; + + let room_id = room_id!("!galette:saucisse.bzh"); + let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh")); + + let event_cache_store = Arc::new(MemoryStore::new()); + + let event_id1 = event_id!("$1"); + let event_id2 = event_id!("$2"); + + let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event(); + let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event(); + // Prefill the store with some data. event_cache_store .handle_linked_chunk_updates( @@ -1369,125 +1519,34 @@ mod tests { let (items, mut stream) = room_event_cache.subscribe().await; - // The rooms knows about the cached events. - assert!(room_event_cache.event(event_id1).await.is_some()); - - // The reloaded room must contain the two events. - assert_eq!(items.len(), 2); - assert_eq!(items[0].event_id().unwrap(), event_id1); - assert_eq!(items[1].event_id().unwrap(), event_id2); - + // The initial items contain one event because only the last chunk is loaded by + // default. + assert_eq!(items.len(), 1); + assert_eq!(items[0].event_id().unwrap(), event_id2); assert!(stream.is_empty()); - // After clearing,… - room_event_cache.clear().await.unwrap(); + // The event cache knows only one event. + assert!(room_event_cache.event(event_id1).await.is_none()); + assert!(room_event_cache.event(event_id2).await.is_some()); + + // Let's paginate to load more events. + room_event_cache + .pagination() + .run_backwards(20, |outcome, _| async move { ControlFlow::Break(outcome) }) + .await + .unwrap(); - //… we get an update that the content has been cleared. assert_let_timeout!( Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = stream.recv() ); assert_eq!(diffs.len(), 1); - assert_let!(VectorDiff::Clear = &diffs[0]); + assert_matches!(&diffs[0], VectorDiff::Insert { index: 0, value: _ }); - // The room event cache has forgotten about the events. - assert!(room_event_cache.event(event_id1).await.is_none()); + // The event cache knows about the two events now! + assert!(room_event_cache.event(event_id1).await.is_some()); + assert!(room_event_cache.event(event_id2).await.is_some()); - let (items, _) = room_event_cache.subscribe().await; - assert!(items.is_empty()); - - // The event cache store too. - let raws = event_cache_store.load_all_chunks(room_id).await.unwrap(); - let linked_chunk = LinkedChunkBuilder::<3, _, _>::from_raw_parts(raws).build().unwrap(); - - // Note: while the event cache store could return `None` here, clearing it will - // reset it to its initial form, maintaining the invariant that it - // contains a single items chunk that's empty. - let linked_chunk = linked_chunk.unwrap(); - assert_eq!(linked_chunk.num_items(), 0); - } - - #[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support. - #[async_test] - async fn test_load_from_storage() { - let room_id = room_id!("!galette:saucisse.bzh"); - let f = EventFactory::new().room(room_id).sender(user_id!("@ben:saucisse.bzh")); - - let event_cache_store = Arc::new(MemoryStore::new()); - - let event_id1 = event_id!("$1"); - let event_id2 = event_id!("$2"); - - let ev1 = f.text_msg("hello world").sender(*ALICE).event_id(event_id1).into_event(); - let ev2 = f.text_msg("how's it going").sender(*BOB).event_id(event_id2).into_event(); - - // Prefill the store with some data. - event_cache_store - .handle_linked_chunk_updates( - room_id, - vec![ - // An empty items chunk. - Update::NewItemsChunk { - previous: None, - new: ChunkIdentifier::new(0), - next: None, - }, - // A gap chunk. - Update::NewGapChunk { - previous: Some(ChunkIdentifier::new(0)), - // Chunk IDs aren't supposed to be ordered, so use a random value here. - new: ChunkIdentifier::new(42), - next: None, - gap: Gap { prev_token: "cheddar".to_owned() }, - }, - // Another items chunk, non-empty this time. - Update::NewItemsChunk { - previous: Some(ChunkIdentifier::new(42)), - new: ChunkIdentifier::new(1), - next: None, - }, - Update::PushItems { - at: Position::new(ChunkIdentifier::new(1), 0), - items: vec![ev1.clone()], - }, - // And another items chunk, non-empty again. - Update::NewItemsChunk { - previous: Some(ChunkIdentifier::new(1)), - new: ChunkIdentifier::new(2), - next: None, - }, - Update::PushItems { - at: Position::new(ChunkIdentifier::new(2), 0), - items: vec![ev2.clone()], - }, - ], - ) - .await - .unwrap(); - - let client = MockClientBuilder::new("http://localhost".to_owned()) - .store_config( - StoreConfig::new("hodlor".to_owned()).event_cache_store(event_cache_store.clone()), - ) - .build() - .await; - - let event_cache = client.event_cache(); - - // Don't forget to subscribe and like^W enable storage! - event_cache.subscribe().unwrap(); - event_cache.enable_storage().unwrap(); - - client.base_client().get_or_create_room(room_id, matrix_sdk_base::RoomState::Joined); - let room = client.get_room(room_id).unwrap(); - - let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - - let (items, _stream) = room_event_cache.subscribe().await; - - // The reloaded room must contain the two events. - assert_eq!(items.len(), 2); - assert_eq!(items[0].event_id().unwrap(), event_id1); - assert_eq!(items[1].event_id().unwrap(), event_id2); + assert!(stream.is_empty()); // A new update with one of these events leads to deduplication. let timeline = Timeline { limited: false, prev_batch: None, events: vec![ev2] }; @@ -1510,7 +1569,7 @@ mod tests { #[cfg(not(target_arch = "wasm32"))] // This uses the cross-process lock, so needs time support. #[async_test] async fn test_load_from_storage_resilient_to_failure() { - let room_id = room_id!("!galette:saucisse.bzh"); + let room_id = room_id!("!fondue:patate.ch"); let event_cache_store = Arc::new(MemoryStore::new()); let event = EventFactory::new()