diff --git a/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs b/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs index df63210b7..c946294c4 100644 --- a/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs +++ b/crates/matrix-sdk/src/event_cache/caches/thread/mod.rs @@ -22,13 +22,13 @@ use std::{fmt, sync::Arc}; use matrix_sdk_base::event_cache::{Event, store::EventCacheStoreLock}; use ruma::{EventId, OwnedEventId, OwnedRoomId, OwnedUserId}; -pub(super) use state::LockedThreadEventCacheState; use tokio::sync::{ Notify, broadcast::{Receiver, Sender}, }; use tracing::{error, trace}; +pub(super) use self::state::LockedThreadEventCacheState; use self::{pagination::ThreadPagination, updates::ThreadEventCacheUpdateSender}; use super::{ super::Result, EventsOrigin, TimelineVectorDiffs, room::RoomEventCacheLinkedChunkUpdate, @@ -202,3 +202,143 @@ impl ThreadEventCache { .and_then(|(_position, event)| event.event_id())) } } + +#[cfg(all(test, not(target_family = "wasm")))] // This uses the cross-process lock, so needs time support. +mod timed_tests { + use std::sync::Arc; + + use assert_matches::assert_matches; + use eyeball_im::VectorDiff; + use matrix_sdk_base::{ + RoomState, ThreadingSupport, + cross_process_lock::CrossProcessLockConfig, + event_cache::store::{EventCacheStore as _, MemoryStore}, + linked_chunk::{ChunkContent, LinkedChunkId, lazy_loader::from_all_chunks}, + store::StoreConfig, + sync::{JoinedRoomUpdate, Timeline}, + }; + use matrix_sdk_test::{async_test, event_factory::EventFactory}; + use ruma::{event_id, room_id, user_id}; + + use super::super::{ + super::RoomEventCacheGenericUpdate, TimelineVectorDiffs, room::RoomEventCacheUpdate, + }; + use crate::test_utils::client::MockClientBuilder; + + #[async_test] + async fn test_write_to_storage() { + let room_id = room_id!("!r0"); + let thread_root = event_id!("$t0_ev0"); + let thread_event_id_0 = event_id!("$t0_ev1"); + + let f = EventFactory::new().room(room_id).sender(user_id!("@mnt_io:matrix.org")); + + let event_cache_store = Arc::new(MemoryStore::new()); + + let client = MockClientBuilder::new(None) + .on_builder(|builder| { + builder + .store_config( + StoreConfig::new(CrossProcessLockConfig::multi_process("hodor")) + .event_cache_store(event_cache_store.clone()), + ) + .with_threading_support(ThreadingSupport::Enabled { with_subscriptions: true }) + }) + .build() + .await; + + let event_cache = client.event_cache(); + event_cache.subscribe().unwrap(); + + client.base_client().get_or_create_room(room_id, RoomState::Joined); + let room = client.get_room(room_id).unwrap(); + + let mut generic_stream = event_cache.subscribe_to_room_generic_updates(); + let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); + let (room_events, mut room_stream) = room_event_cache.subscribe().await.unwrap(); + let (thread_events, mut thread_stream) = + room_event_cache.subscribe_to_thread(thread_root.to_owned()).await.unwrap(); + + assert!(room_events.is_empty()); + assert!(thread_events.is_empty()); + + // Propagate an update for a message and a prev-batch token. + let timeline = Timeline { + limited: true, + prev_batch: Some("raclette".to_owned()), + events: vec![ + f.text_msg("salut") + .event_id(thread_event_id_0) + .in_thread(thread_root, thread_root) + .into_event(), + ], + }; + + room_event_cache + .handle_joined_room_update(JoinedRoomUpdate { timeline, ..Default::default() }) + .await + .unwrap(); + + // Checking the update are corrects. + assert_matches!( + generic_stream.recv().await, + Ok(RoomEventCacheGenericUpdate { room_id: expected_room_id }) => { + assert_eq!(expected_room_id, room_id); + } + ); + assert!(generic_stream.is_empty()); + + assert_matches!( + room_stream.recv().await, + Ok(RoomEventCacheUpdate::UpdateTimelineEvents(TimelineVectorDiffs { diffs, .. })) => { + assert_eq!(diffs.len(), 2); + assert_matches!(&diffs[0], VectorDiff::Clear); + assert_matches!(&diffs[1], VectorDiff::Append { values: events } => { + assert_eq!(events.len(), 1); + assert_eq!(events[0].event_id().as_deref(), Some(thread_event_id_0)); + }); + } + ); + assert!(room_stream.is_empty()); + + assert_matches!( + thread_stream.recv().await, + Ok(TimelineVectorDiffs { diffs, .. }) => { + assert_eq!(diffs.len(), 1); + assert_matches!(&diffs[0], VectorDiff::Append { values: events } => { + assert_eq!(events.len(), 1); + assert_eq!(events[0].event_id().as_deref(), Some(thread_event_id_0)); + }); + } + ); + assert!(thread_stream.is_empty()); + + // Check the storage. + let linked_chunk = from_all_chunks::<3, _, _>( + event_cache_store + .load_all_chunks(LinkedChunkId::Thread(room_id, thread_root)) + .await + .unwrap(), + ) + .unwrap() + .unwrap(); + + assert_eq!(linked_chunk.chunks().count(), 2); + + let mut chunks = linked_chunk.chunks(); + + // We start with the gap. + assert_matches!(chunks.next().unwrap().content(), ChunkContent::Gap(gap) => { + assert_eq!(gap.token, "raclette"); + }); + + // Then we have the stored event. + assert_matches!(chunks.next().unwrap().content(), ChunkContent::Items(events) => { + assert_eq!(events.len(), 1); + assert_eq!(events[0].event_id().as_deref(), Some(thread_event_id_0)); + }); + + // That's all, folks! + assert!(chunks.next().is_none()); + } +}