diff --git a/crates/matrix-sdk-base/src/store/integration_tests.rs b/crates/matrix-sdk-base/src/store/integration_tests.rs index 01c231bf1..b982ebd2a 100644 --- a/crates/matrix-sdk-base/src/store/integration_tests.rs +++ b/crates/matrix-sdk-base/src/store/integration_tests.rs @@ -4,9 +4,19 @@ macro_rules! statestore_integration_tests { ($($name:ident)*) => { $( mod $name { + + use futures_util::StreamExt; + use http::Response; use matrix_sdk_test::{async_test, test_json}; use ruma::{ - api::client::r0::media::get_content_thumbnail::Method, + api::{ + client::r0::{ + media::get_content_thumbnail::Method, + message::get_message_events::Response as MessageResponse, + sync::sync_events::Response as SyncResponse, + }, + IncomingResponse, + }, device_id, event_id, events::{ presence::PresenceEvent, EventContent, @@ -30,7 +40,7 @@ macro_rules! statestore_integration_tests { use crate::{ RoomType, Session, - deserialized_responses::{MemberEvent, StrippedMemberEvent}, + deserialized_responses::{MemberEvent, StrippedMemberEvent, RoomEvent, SyncRoomEvent, TimelineSlice}, media::{MediaFormat, MediaRequest, MediaThumbnailSize, MediaType}, store::{ Store, @@ -519,7 +529,155 @@ macro_rules! statestore_integration_tests { assert_eq!(store.get_stripped_room_infos().await?.len(), 0); Ok(()) } + + #[async_test] + async fn test_room_timeline() { + let store = get_store().await.unwrap(); + let mut stored_events = Vec::new(); + let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost"); + + // Before the first sync the timeline should be empty + assert!(store.room_timeline(room_id).await.unwrap().is_none()); + + // Add sync response + let sync = SyncResponse::try_from_http_response( + Response::builder().body(serde_json::to_vec(&*test_json::MORE_SYNC).unwrap()).unwrap(), + ) + .unwrap(); + + let timeline = &sync.rooms.join[room_id].timeline; + let events: Vec = timeline.events.iter().cloned().map(Into::into).collect(); + + stored_events.append(&mut events.clone()); + + let timeline_slice = TimelineSlice::new( + events, + sync.next_batch.clone(), + timeline.prev_batch.clone(), + false, + true, + ); + let mut changes = StateChanges::new(sync.next_batch.clone()); + changes.add_timeline(room_id, timeline_slice); + store.save_changes(&changes).await.unwrap(); + + check_timeline_events(room_id, &store, &stored_events, timeline.prev_batch.as_deref()) + .await; + + // Add message response + let messages = MessageResponse::try_from_http_response( + Response::builder() + .body(serde_json::to_vec(&*test_json::SYNC_ROOM_MESSAGES_BATCH_1).unwrap()) + .unwrap(), + ) + .unwrap(); + + let events: Vec = messages + .chunk + .iter() + .cloned() + .map(|event| RoomEvent { event, encryption_info: None }.into()) + .collect(); + + stored_events.append(&mut events.clone()); + + let timeline_slice = + TimelineSlice::new(events, messages.start.clone(), messages.end.clone(), false, false); + let mut changes = StateChanges::default(); + changes.add_timeline(room_id, timeline_slice); + store.save_changes(&changes).await.unwrap(); + + check_timeline_events(room_id, &store, &stored_events, messages.end.as_deref()).await; + + // Add second message response + let messages = MessageResponse::try_from_http_response( + Response::builder() + .body(serde_json::to_vec(&*test_json::SYNC_ROOM_MESSAGES_BATCH_2).unwrap()) + .unwrap(), + ) + .unwrap(); + + let events: Vec = messages + .chunk + .iter() + .cloned() + .map(|event| RoomEvent { event, encryption_info: None }.into()) + .collect(); + + stored_events.append(&mut events.clone()); + + let timeline_slice = + TimelineSlice::new(events, messages.start.clone(), messages.end.clone(), false, false); + let mut changes = StateChanges::default(); + changes.add_timeline(room_id, timeline_slice); + store.save_changes(&changes).await.unwrap(); + + check_timeline_events(room_id, &store, &stored_events, messages.end.as_deref()).await; + + // Add second sync response + let sync = SyncResponse::try_from_http_response( + Response::builder() + .body(serde_json::to_vec(&*test_json::MORE_SYNC_2).unwrap()) + .unwrap(), + ) + .unwrap(); + + let timeline = &sync.rooms.join[room_id].timeline; + let events: Vec = timeline.events.iter().cloned().map(Into::into).collect(); + + let mut prev_stored_events = stored_events; + stored_events = events.clone(); + stored_events.append(&mut prev_stored_events); + + let timeline_slice = TimelineSlice::new( + events, + sync.next_batch.clone(), + timeline.prev_batch.clone(), + false, + true, + ); + let mut changes = StateChanges::new(sync.next_batch.clone()); + changes.add_timeline(room_id, timeline_slice); + store.save_changes(&changes).await.unwrap(); + + check_timeline_events(room_id, &store, &stored_events, messages.end.as_deref()).await; + + // Check if limited sync removes the stored timeline + let end_token = Some("end token".to_string()); + let timeline_slice = TimelineSlice::new( + Vec::new(), + "start token".to_string(), + end_token.clone(), + true, + true, + ); + let mut changes = StateChanges::default(); + changes.add_timeline(room_id, timeline_slice); + store.save_changes(&changes).await.unwrap(); + + check_timeline_events(room_id, &store, &Vec::new(), end_token.as_deref()).await; + } + + async fn check_timeline_events( + room_id: &RoomId, + store: &dyn StateStore, + stored_events: &[SyncRoomEvent], + expected_end_token: Option<&str>, + ) { + let (timeline_iter, end_token) = store.room_timeline(room_id).await.unwrap().unwrap(); + + assert_eq!(end_token.as_deref(), expected_end_token); + + let timeline = timeline_iter.collect::>>().await; + + assert!(timeline + .into_iter() + .zip(stored_events.iter()) + .all(|(a, b)| a.unwrap().event_id() == b.event_id())); + } + } + )* } }