feat(sdk): Add read marker logic to the timeline API

This commit is contained in:
Kévin Commaille
2022-11-08 13:16:09 +01:00
committed by GitHub
parent c014f980cb
commit 93be96c85c
6 changed files with 194 additions and 24 deletions

View File

@@ -120,12 +120,12 @@ impl Room {
}
pub fn add_timeline_listener(&self, listener: Box<dyn TimelineListener>) {
let timeline_signal = self
.timeline
.write()
.unwrap()
.get_or_insert_with(|| Arc::new(self.room.timeline()))
.signal();
let room = self.room.clone();
let timeline = RUNTIME.block_on(async move { room.timeline().await });
let timeline_signal =
self.timeline.write().unwrap().get_or_insert_with(|| Arc::new(timeline)).signal();
let listener: Arc<dyn TimelineListener> = listener.into();
RUNTIME.spawn(timeline_signal.for_each(move |diff| {

View File

@@ -258,8 +258,8 @@ impl Common {
/// like edits and reactions as updates of existing items rather than new
/// independent events.
#[cfg(feature = "experimental-timeline")]
pub fn timeline(&self) -> Timeline {
Timeline::new(self)
pub async fn timeline(&self) -> Timeline {
Timeline::new(self).await
}
/// Fetch the event with the given `EventId` in this room.

View File

@@ -18,6 +18,7 @@ use indexmap::map::Entry;
use matrix_sdk_base::deserialized_responses::EncryptionInfo;
use ruma::{
events::{
fully_read::FullyReadEvent,
reaction::ReactionEventContent,
room::{
encrypted::{self, RoomEncryptedEventContent},
@@ -37,8 +38,8 @@ use tracing::{debug, error, info, warn};
use super::{
event_item::{BundledReactions, TimelineDetails},
find_event, EventTimelineItem, Message, TimelineInner, TimelineItem, TimelineItemContent,
TimelineKey,
find_event, find_fully_read, EventTimelineItem, Message, TimelineInner, TimelineItem,
TimelineItemContent, TimelineKey, VirtualTimelineItem,
};
impl TimelineInner {
@@ -114,6 +115,62 @@ impl TimelineInner {
TimelineEventHandler::new(meta, flow, self).handle_event(event.into())
}
pub(super) fn handle_fully_read(&self, raw: Raw<FullyReadEvent>) {
let fully_read_event = match raw.deserialize() {
Ok(ev) => ev.content.event_id,
Err(error) => {
error!(?error, "Failed to deserialize `m.fully_read` account data");
return;
}
};
self.set_fully_read_event(fully_read_event);
}
pub(super) fn set_fully_read_event(&self, fully_read_event: OwnedEventId) {
{
let mut fully_read_lock = self.fully_read_event.lock().unwrap();
if fully_read_lock.as_ref() == Some(&fully_read_event) {
return;
}
*fully_read_lock = Some(fully_read_event);
}
self.update_fully_read_item();
}
fn update_fully_read_item(&self) {
let fully_read_lock = self.fully_read_event.lock().unwrap();
let fully_read_event = match &*fully_read_lock {
Some(event) => event,
None => return,
};
let mut items_lock = self.items.lock_mut();
let old_idx = find_fully_read(&items_lock);
let new_idx = find_event(&items_lock, fully_read_event).map(|(idx, _)| idx + 1);
match (old_idx, new_idx) {
(None, None) => {}
(None, Some(idx)) => {
*self.fully_read_event_in_timeline.lock().unwrap() = true;
let item = TimelineItem::Virtual(VirtualTimelineItem::ReadMarker);
items_lock.insert_cloned(idx, item.into());
}
(Some(_), None) => {
// Keep the current position of the read marker, hopefully we
// should have a new position later.
*self.fully_read_event_in_timeline.lock().unwrap() = false;
}
(Some(from), Some(to)) => {
items_lock.move_from_to(from, to);
}
}
}
}
enum Flow {
@@ -464,6 +521,15 @@ impl<'a> TimelineEventHandler<'a> {
}
}
}
drop(lock);
// See if we got the event corresponding to the fully read marker now.
let fully_read_event_in_timeline =
*self.timeline.fully_read_event_in_timeline.lock().unwrap();
if !fully_read_event_in_timeline {
self.timeline.update_fully_read_item();
}
}
/// Returns whether an update happened

View File

@@ -26,7 +26,10 @@ use futures_signals::signal_vec::{MutableVec, SignalVec, SignalVecExt, VecDiff};
use matrix_sdk_base::deserialized_responses::EncryptionInfo;
use ruma::{
assign,
events::{reaction::Relation as AnnotationRelation, AnyMessageLikeEventContent},
events::{
fully_read::FullyReadEventContent, reaction::Relation as AnnotationRelation,
AnyMessageLikeEventContent,
},
OwnedEventId, OwnedUserId, TransactionId, UInt,
};
use tracing::{error, instrument, warn};
@@ -61,7 +64,8 @@ pub struct Timeline {
room: room::Common,
start_token: Mutex<Option<String>>,
_end_token: Mutex<Option<String>>,
_event_handler_guard: EventHandlerDropGuard,
_timeline_event_handler_guard: EventHandlerDropGuard,
_fully_read_handler_guard: EventHandlerDropGuard,
}
#[derive(Clone, Debug, Default)]
@@ -69,13 +73,28 @@ struct TimelineInner {
items: MutableVec<Arc<TimelineItem>>,
// Reaction event / txn ID => sender and reaction data
reaction_map: Arc<Mutex<HashMap<TimelineKey, (OwnedUserId, AnnotationRelation)>>>,
fully_read_event: Arc<Mutex<Option<OwnedEventId>>>,
fully_read_event_in_timeline: Arc<Mutex<bool>>,
}
impl Timeline {
pub(super) fn new(room: &room::Common) -> Self {
pub(super) async fn new(room: &room::Common) -> Self {
let inner = TimelineInner::default();
let handle = room.add_event_handler({
match room.account_data_static::<FullyReadEventContent>().await {
Ok(Some(fully_read)) => match fully_read.deserialize() {
Ok(fully_read) => inner.set_fully_read_event(fully_read.content.event_id),
Err(error) => {
error!(?error, "Failed to deserialize `m.fully_read` account data")
}
},
Err(error) => {
error!(?error, "Failed to get `m.fully_read` account data from the store")
}
_ => {}
}
let timeline_event_handle = room.add_event_handler({
let inner = inner.clone();
move |event, encryption_info: Option<EncryptionInfo>, room: Room| {
let inner = inner.clone();
@@ -84,14 +103,27 @@ impl Timeline {
}
}
});
let _event_handler_guard = room.client.event_handler_drop_guard(handle);
let _timeline_event_handler_guard =
room.client.event_handler_drop_guard(timeline_event_handle);
let fully_read_handle = room.add_event_handler({
let inner = inner.clone();
move |event| {
let inner = inner.clone();
async move {
inner.handle_fully_read(event);
}
}
});
let _fully_read_handler_guard = room.client.event_handler_drop_guard(fully_read_handle);
Timeline {
inner,
room: room.clone(),
start_token: Mutex::new(None),
_end_token: Mutex::new(None),
_event_handler_guard,
_timeline_event_handler_guard,
_fully_read_handler_guard,
}
}
@@ -205,6 +237,15 @@ impl TimelineItem {
_ => None,
}
}
/// Get the inner `VirtualTimelineItem`, if this is a
/// `TimelineItem::Virtual`.
pub fn as_virtual(&self) -> Option<&VirtualTimelineItem> {
match self {
Self::Virtual(v) => Some(v),
_ => None,
}
}
}
// FIXME: Put an upper bound on timeline size or add a separate map to look up
@@ -219,6 +260,15 @@ fn find_event(
.rfind(|(_, it)| key == it.key)
}
fn find_fully_read(lock: &[Arc<TimelineItem>]) -> Option<usize> {
lock.iter()
.enumerate()
.rfind(|(_, item)| {
item.as_virtual().filter(|v| matches!(v, VirtualTimelineItem::ReadMarker)).is_some()
})
.map(|(idx, _)| idx)
}
fn add_event_id(items: &TimelineInner, txn_id: &TransactionId, event_id: OwnedEventId) {
let mut lock = items.items.lock_mut();
if let Some((idx, item)) = find_event(&lock, txn_id) {

View File

@@ -7,11 +7,14 @@ use futures_signals::signal_vec::{SignalVecExt, VecDiff};
use futures_util::StreamExt;
use matrix_sdk::{
config::SyncSettings,
room::timeline::{TimelineDetails, TimelineItemContent, TimelineKey},
room::timeline::{TimelineDetails, TimelineItemContent, TimelineKey, VirtualTimelineItem},
ruma::MilliSecondsSinceUnixEpoch,
};
use matrix_sdk_common::executor::spawn;
use matrix_sdk_test::{async_test, test_json, EventBuilder, JoinedRoomBuilder, TimelineTestEvent};
use matrix_sdk_test::{
async_test, test_json, EventBuilder, JoinedRoomBuilder, RoomAccountDataTestEvent,
TimelineTestEvent,
};
use ruma::{
event_id,
events::room::message::{MessageType, RoomMessageEventContent},
@@ -39,7 +42,7 @@ async fn edit() {
server.reset().await;
let room = client.get_room(room_id).unwrap();
let timeline = room.timeline();
let timeline = room.timeline().await;
let mut timeline_stream = timeline.signal().to_stream();
ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_timeline_event(
@@ -148,7 +151,7 @@ async fn echo() {
server.reset().await;
let room = client.get_room(room_id).unwrap();
let timeline = Arc::new(room.timeline());
let timeline = Arc::new(room.timeline().await);
let mut timeline_stream = timeline.signal().to_stream();
let event_id = event_id!("$wWgymRfo7ri1uQx0NXO40vLJ");
@@ -240,7 +243,7 @@ async fn back_pagination() {
server.reset().await;
let room = client.get_room(room_id).unwrap();
let timeline = Arc::new(room.timeline());
let timeline = Arc::new(room.timeline().await);
let mut timeline_stream = timeline.signal().to_stream();
Mock::given(method("GET"))
@@ -291,7 +294,7 @@ async fn reaction() {
server.reset().await;
let room = client.get_room(room_id).unwrap();
let timeline = room.timeline();
let timeline = room.timeline().await;
let mut timeline_stream = timeline.signal().to_stream();
ev_builder.add_joined_room(
@@ -383,7 +386,7 @@ async fn redacted_message() {
server.reset().await;
let room = client.get_room(room_id).unwrap();
let timeline = room.timeline();
let timeline = room.timeline().await;
let mut timeline_stream = timeline.signal().to_stream();
ev_builder.add_joined_room(
@@ -425,3 +428,54 @@ async fn redacted_message() {
// TODO: After adding raw timeline items, check for one here
}
#[async_test]
async fn read_marker() {
let room_id = room_id!("!a98sd12bjh:example.org");
let (client, server) = logged_in_client().await;
let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
let mut ev_builder = EventBuilder::new();
ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id));
mock_sync(&server, ev_builder.build_json_sync_response(), None).await;
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
server.reset().await;
let room = client.get_room(room_id).unwrap();
let timeline = room.timeline().await;
let mut timeline_stream = timeline.signal().to_stream();
ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_timeline_event(
TimelineTestEvent::Custom(json!({
"content": {
"body": "hello",
"msgtype": "m.text",
},
"event_id": "$someplace:example.org",
"origin_server_ts": 152037280,
"sender": "@alice:example.org",
"type": "m.room.message",
})),
));
mock_sync(&server, ev_builder.build_json_sync_response(), None).await;
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
server.reset().await;
let message =
assert_matches!(timeline_stream.next().await, Some(VecDiff::Push { value }) => value);
assert_matches!(message.as_event().unwrap().content(), TimelineItemContent::Message(_));
ev_builder.add_joined_room(
JoinedRoomBuilder::new(room_id).add_account_data(RoomAccountDataTestEvent::FullyRead),
);
mock_sync(&server, ev_builder.build_json_sync_response(), None).await;
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
server.reset().await;
let marker =
assert_matches!(timeline_stream.next().await, Some(VecDiff::Push { value }) => value);
assert_matches!(marker.as_virtual().unwrap(), VirtualTimelineItem::ReadMarker);
}

View File

@@ -68,7 +68,7 @@ async fn main() -> Result<()> {
// Get the timeline stream and listen to it.
let room = client.get_room(&room_id).unwrap();
let timeline = room.timeline();
let timeline = room.timeline().await;
let mut timeline_stream = timeline.signal().to_stream();
tokio::spawn(async move {