From 93be96c85c8fb94fa275beac80d83232d4fa3320 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?K=C3=A9vin=20Commaille?= <76261501+zecakeh@users.noreply.github.com> Date: Tue, 8 Nov 2022 13:16:09 +0100 Subject: [PATCH] feat(sdk): Add read marker logic to the timeline API --- bindings/matrix-sdk-ffi/src/room.rs | 12 ++-- crates/matrix-sdk/src/room/common.rs | 4 +- .../src/room/timeline/event_handler.rs | 70 ++++++++++++++++++- crates/matrix-sdk/src/room/timeline/mod.rs | 62 ++++++++++++++-- .../tests/integration/room/timeline.rs | 68 ++++++++++++++++-- examples/timeline/src/main.rs | 2 +- 6 files changed, 194 insertions(+), 24 deletions(-) diff --git a/bindings/matrix-sdk-ffi/src/room.rs b/bindings/matrix-sdk-ffi/src/room.rs index f39991bd2..24231822a 100644 --- a/bindings/matrix-sdk-ffi/src/room.rs +++ b/bindings/matrix-sdk-ffi/src/room.rs @@ -120,12 +120,12 @@ impl Room { } pub fn add_timeline_listener(&self, listener: Box) { - let timeline_signal = self - .timeline - .write() - .unwrap() - .get_or_insert_with(|| Arc::new(self.room.timeline())) - .signal(); + let room = self.room.clone(); + + let timeline = RUNTIME.block_on(async move { room.timeline().await }); + + let timeline_signal = + self.timeline.write().unwrap().get_or_insert_with(|| Arc::new(timeline)).signal(); let listener: Arc = listener.into(); RUNTIME.spawn(timeline_signal.for_each(move |diff| { diff --git a/crates/matrix-sdk/src/room/common.rs b/crates/matrix-sdk/src/room/common.rs index 9a26564f1..a6ca62b04 100644 --- a/crates/matrix-sdk/src/room/common.rs +++ b/crates/matrix-sdk/src/room/common.rs @@ -258,8 +258,8 @@ impl Common { /// like edits and reactions as updates of existing items rather than new /// independent events. #[cfg(feature = "experimental-timeline")] - pub fn timeline(&self) -> Timeline { - Timeline::new(self) + pub async fn timeline(&self) -> Timeline { + Timeline::new(self).await } /// Fetch the event with the given `EventId` in this room. diff --git a/crates/matrix-sdk/src/room/timeline/event_handler.rs b/crates/matrix-sdk/src/room/timeline/event_handler.rs index 74c7b832b..b41069b16 100644 --- a/crates/matrix-sdk/src/room/timeline/event_handler.rs +++ b/crates/matrix-sdk/src/room/timeline/event_handler.rs @@ -18,6 +18,7 @@ use indexmap::map::Entry; use matrix_sdk_base::deserialized_responses::EncryptionInfo; use ruma::{ events::{ + fully_read::FullyReadEvent, reaction::ReactionEventContent, room::{ encrypted::{self, RoomEncryptedEventContent}, @@ -37,8 +38,8 @@ use tracing::{debug, error, info, warn}; use super::{ event_item::{BundledReactions, TimelineDetails}, - find_event, EventTimelineItem, Message, TimelineInner, TimelineItem, TimelineItemContent, - TimelineKey, + find_event, find_fully_read, EventTimelineItem, Message, TimelineInner, TimelineItem, + TimelineItemContent, TimelineKey, VirtualTimelineItem, }; impl TimelineInner { @@ -114,6 +115,62 @@ impl TimelineInner { TimelineEventHandler::new(meta, flow, self).handle_event(event.into()) } + + pub(super) fn handle_fully_read(&self, raw: Raw) { + let fully_read_event = match raw.deserialize() { + Ok(ev) => ev.content.event_id, + Err(error) => { + error!(?error, "Failed to deserialize `m.fully_read` account data"); + return; + } + }; + + self.set_fully_read_event(fully_read_event); + } + + pub(super) fn set_fully_read_event(&self, fully_read_event: OwnedEventId) { + { + let mut fully_read_lock = self.fully_read_event.lock().unwrap(); + + if fully_read_lock.as_ref() == Some(&fully_read_event) { + return; + } + + *fully_read_lock = Some(fully_read_event); + } + + self.update_fully_read_item(); + } + + fn update_fully_read_item(&self) { + let fully_read_lock = self.fully_read_event.lock().unwrap(); + + let fully_read_event = match &*fully_read_lock { + Some(event) => event, + None => return, + }; + + let mut items_lock = self.items.lock_mut(); + let old_idx = find_fully_read(&items_lock); + let new_idx = find_event(&items_lock, fully_read_event).map(|(idx, _)| idx + 1); + + match (old_idx, new_idx) { + (None, None) => {} + (None, Some(idx)) => { + *self.fully_read_event_in_timeline.lock().unwrap() = true; + let item = TimelineItem::Virtual(VirtualTimelineItem::ReadMarker); + items_lock.insert_cloned(idx, item.into()); + } + (Some(_), None) => { + // Keep the current position of the read marker, hopefully we + // should have a new position later. + *self.fully_read_event_in_timeline.lock().unwrap() = false; + } + (Some(from), Some(to)) => { + items_lock.move_from_to(from, to); + } + } + } } enum Flow { @@ -464,6 +521,15 @@ impl<'a> TimelineEventHandler<'a> { } } } + + drop(lock); + + // See if we got the event corresponding to the fully read marker now. + let fully_read_event_in_timeline = + *self.timeline.fully_read_event_in_timeline.lock().unwrap(); + if !fully_read_event_in_timeline { + self.timeline.update_fully_read_item(); + } } /// Returns whether an update happened diff --git a/crates/matrix-sdk/src/room/timeline/mod.rs b/crates/matrix-sdk/src/room/timeline/mod.rs index 78ea9ca87..8f9d4b8bd 100644 --- a/crates/matrix-sdk/src/room/timeline/mod.rs +++ b/crates/matrix-sdk/src/room/timeline/mod.rs @@ -26,7 +26,10 @@ use futures_signals::signal_vec::{MutableVec, SignalVec, SignalVecExt, VecDiff}; use matrix_sdk_base::deserialized_responses::EncryptionInfo; use ruma::{ assign, - events::{reaction::Relation as AnnotationRelation, AnyMessageLikeEventContent}, + events::{ + fully_read::FullyReadEventContent, reaction::Relation as AnnotationRelation, + AnyMessageLikeEventContent, + }, OwnedEventId, OwnedUserId, TransactionId, UInt, }; use tracing::{error, instrument, warn}; @@ -61,7 +64,8 @@ pub struct Timeline { room: room::Common, start_token: Mutex>, _end_token: Mutex>, - _event_handler_guard: EventHandlerDropGuard, + _timeline_event_handler_guard: EventHandlerDropGuard, + _fully_read_handler_guard: EventHandlerDropGuard, } #[derive(Clone, Debug, Default)] @@ -69,13 +73,28 @@ struct TimelineInner { items: MutableVec>, // Reaction event / txn ID => sender and reaction data reaction_map: Arc>>, + fully_read_event: Arc>>, + fully_read_event_in_timeline: Arc>, } impl Timeline { - pub(super) fn new(room: &room::Common) -> Self { + pub(super) async fn new(room: &room::Common) -> Self { let inner = TimelineInner::default(); - let handle = room.add_event_handler({ + match room.account_data_static::().await { + Ok(Some(fully_read)) => match fully_read.deserialize() { + Ok(fully_read) => inner.set_fully_read_event(fully_read.content.event_id), + Err(error) => { + error!(?error, "Failed to deserialize `m.fully_read` account data") + } + }, + Err(error) => { + error!(?error, "Failed to get `m.fully_read` account data from the store") + } + _ => {} + } + + let timeline_event_handle = room.add_event_handler({ let inner = inner.clone(); move |event, encryption_info: Option, room: Room| { let inner = inner.clone(); @@ -84,14 +103,27 @@ impl Timeline { } } }); - let _event_handler_guard = room.client.event_handler_drop_guard(handle); + let _timeline_event_handler_guard = + room.client.event_handler_drop_guard(timeline_event_handle); + + let fully_read_handle = room.add_event_handler({ + let inner = inner.clone(); + move |event| { + let inner = inner.clone(); + async move { + inner.handle_fully_read(event); + } + } + }); + let _fully_read_handler_guard = room.client.event_handler_drop_guard(fully_read_handle); Timeline { inner, room: room.clone(), start_token: Mutex::new(None), _end_token: Mutex::new(None), - _event_handler_guard, + _timeline_event_handler_guard, + _fully_read_handler_guard, } } @@ -205,6 +237,15 @@ impl TimelineItem { _ => None, } } + + /// Get the inner `VirtualTimelineItem`, if this is a + /// `TimelineItem::Virtual`. + pub fn as_virtual(&self) -> Option<&VirtualTimelineItem> { + match self { + Self::Virtual(v) => Some(v), + _ => None, + } + } } // FIXME: Put an upper bound on timeline size or add a separate map to look up @@ -219,6 +260,15 @@ fn find_event( .rfind(|(_, it)| key == it.key) } +fn find_fully_read(lock: &[Arc]) -> Option { + lock.iter() + .enumerate() + .rfind(|(_, item)| { + item.as_virtual().filter(|v| matches!(v, VirtualTimelineItem::ReadMarker)).is_some() + }) + .map(|(idx, _)| idx) +} + fn add_event_id(items: &TimelineInner, txn_id: &TransactionId, event_id: OwnedEventId) { let mut lock = items.items.lock_mut(); if let Some((idx, item)) = find_event(&lock, txn_id) { diff --git a/crates/matrix-sdk/tests/integration/room/timeline.rs b/crates/matrix-sdk/tests/integration/room/timeline.rs index 16844c4fa..93a4b9096 100644 --- a/crates/matrix-sdk/tests/integration/room/timeline.rs +++ b/crates/matrix-sdk/tests/integration/room/timeline.rs @@ -7,11 +7,14 @@ use futures_signals::signal_vec::{SignalVecExt, VecDiff}; use futures_util::StreamExt; use matrix_sdk::{ config::SyncSettings, - room::timeline::{TimelineDetails, TimelineItemContent, TimelineKey}, + room::timeline::{TimelineDetails, TimelineItemContent, TimelineKey, VirtualTimelineItem}, ruma::MilliSecondsSinceUnixEpoch, }; use matrix_sdk_common::executor::spawn; -use matrix_sdk_test::{async_test, test_json, EventBuilder, JoinedRoomBuilder, TimelineTestEvent}; +use matrix_sdk_test::{ + async_test, test_json, EventBuilder, JoinedRoomBuilder, RoomAccountDataTestEvent, + TimelineTestEvent, +}; use ruma::{ event_id, events::room::message::{MessageType, RoomMessageEventContent}, @@ -39,7 +42,7 @@ async fn edit() { server.reset().await; let room = client.get_room(room_id).unwrap(); - let timeline = room.timeline(); + let timeline = room.timeline().await; let mut timeline_stream = timeline.signal().to_stream(); ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_timeline_event( @@ -148,7 +151,7 @@ async fn echo() { server.reset().await; let room = client.get_room(room_id).unwrap(); - let timeline = Arc::new(room.timeline()); + let timeline = Arc::new(room.timeline().await); let mut timeline_stream = timeline.signal().to_stream(); let event_id = event_id!("$wWgymRfo7ri1uQx0NXO40vLJ"); @@ -240,7 +243,7 @@ async fn back_pagination() { server.reset().await; let room = client.get_room(room_id).unwrap(); - let timeline = Arc::new(room.timeline()); + let timeline = Arc::new(room.timeline().await); let mut timeline_stream = timeline.signal().to_stream(); Mock::given(method("GET")) @@ -291,7 +294,7 @@ async fn reaction() { server.reset().await; let room = client.get_room(room_id).unwrap(); - let timeline = room.timeline(); + let timeline = room.timeline().await; let mut timeline_stream = timeline.signal().to_stream(); ev_builder.add_joined_room( @@ -383,7 +386,7 @@ async fn redacted_message() { server.reset().await; let room = client.get_room(room_id).unwrap(); - let timeline = room.timeline(); + let timeline = room.timeline().await; let mut timeline_stream = timeline.signal().to_stream(); ev_builder.add_joined_room( @@ -425,3 +428,54 @@ async fn redacted_message() { // TODO: After adding raw timeline items, check for one here } + +#[async_test] +async fn read_marker() { + let room_id = room_id!("!a98sd12bjh:example.org"); + let (client, server) = logged_in_client().await; + let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); + + let mut ev_builder = EventBuilder::new(); + ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id)); + + mock_sync(&server, ev_builder.build_json_sync_response(), None).await; + let _response = client.sync_once(sync_settings.clone()).await.unwrap(); + server.reset().await; + + let room = client.get_room(room_id).unwrap(); + let timeline = room.timeline().await; + let mut timeline_stream = timeline.signal().to_stream(); + + ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_timeline_event( + TimelineTestEvent::Custom(json!({ + "content": { + "body": "hello", + "msgtype": "m.text", + }, + "event_id": "$someplace:example.org", + "origin_server_ts": 152037280, + "sender": "@alice:example.org", + "type": "m.room.message", + })), + )); + + mock_sync(&server, ev_builder.build_json_sync_response(), None).await; + let _response = client.sync_once(sync_settings.clone()).await.unwrap(); + server.reset().await; + + let message = + assert_matches!(timeline_stream.next().await, Some(VecDiff::Push { value }) => value); + assert_matches!(message.as_event().unwrap().content(), TimelineItemContent::Message(_)); + + ev_builder.add_joined_room( + JoinedRoomBuilder::new(room_id).add_account_data(RoomAccountDataTestEvent::FullyRead), + ); + + mock_sync(&server, ev_builder.build_json_sync_response(), None).await; + let _response = client.sync_once(sync_settings.clone()).await.unwrap(); + server.reset().await; + + let marker = + assert_matches!(timeline_stream.next().await, Some(VecDiff::Push { value }) => value); + assert_matches!(marker.as_virtual().unwrap(), VirtualTimelineItem::ReadMarker); +} diff --git a/examples/timeline/src/main.rs b/examples/timeline/src/main.rs index c4f5c92b9..8bf228161 100644 --- a/examples/timeline/src/main.rs +++ b/examples/timeline/src/main.rs @@ -68,7 +68,7 @@ async fn main() -> Result<()> { // Get the timeline stream and listen to it. let room = client.get_room(&room_id).unwrap(); - let timeline = room.timeline(); + let timeline = room.timeline().await; let mut timeline_stream = timeline.signal().to_stream(); tokio::spawn(async move {