diff --git a/Cargo.lock b/Cargo.lock index f4e01b012..f6089c197 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -179,6 +179,12 @@ dependencies = [ "serde_json", ] +[[package]] +name = "assert_matches" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" + [[package]] name = "assign" version = "1.1.1" @@ -1364,6 +1370,7 @@ version = "0.1.0" dependencies = [ "anyhow", "futures", + "futures-signals", "matrix-sdk", "tokio", "tracing-subscriber", @@ -2302,6 +2309,7 @@ version = "0.6.0" dependencies = [ "anyhow", "anymap2", + "assert_matches", "async-once-cell", "async-stream", "async-trait", @@ -2320,6 +2328,7 @@ dependencies = [ "getrandom 0.2.7", "http", "image 0.24.3", + "indexmap", "matches", "matrix-sdk-base", "matrix-sdk-common", @@ -3660,9 +3669,9 @@ dependencies = [ [[package]] name = "ruma" -version = "0.7.1" +version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3daa593bddbe225bc78760329afaba54d0c653e015f18ce6405fa723ec0f34d5" +checksum = "8dc348e3a4a18abc4e97fffa5e2e623f6edd50ba3a1dd5f47eb249fea713b69f" dependencies = [ "assign", "js_int", @@ -3686,9 +3695,9 @@ dependencies = [ [[package]] name = "ruma-client-api" -version = "0.15.0" +version = "0.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2709c891d277ef94d56657c3ec92ed464779dbfff055e518425eedf11d9ecb7" +checksum = "5bcfd3a3853ffdd151fc228441dd9c9e3d835ac85560dface7abda50b3888791" dependencies = [ "assign", "bytes", @@ -3703,9 +3712,9 @@ dependencies = [ [[package]] name = "ruma-common" -version = "0.10.1" +version = "0.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67dab5e934f2e280875cf3a863c14d876265bda169e4fd18334058e7307142d6" +checksum = "a1e629a01f359234798531a99ba83997abd4c15a65b5bcb8354c4171b59c25be" dependencies = [ "base64", "bytes", @@ -3756,9 +3765,9 @@ dependencies = [ [[package]] name = "ruma-macros" -version = "0.10.1" +version = "0.10.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b3e5a61180840ebfdeb4bcc4dc4a0d0c21aa22f587360b16b785c79058d99f3" +checksum = "9f7cd8cf8771aaca36042fb7659f4647b05e74a2058d843474dde5e51a56cd85" dependencies = [ "once_cell", "proc-macro-crate", diff --git a/bindings/matrix-sdk-ffi/Cargo.toml b/bindings/matrix-sdk-ffi/Cargo.toml index 846c98800..23d029860 100644 --- a/bindings/matrix-sdk-ffi/Cargo.toml +++ b/bindings/matrix-sdk-ffi/Cargo.toml @@ -23,7 +23,7 @@ futures-signals = { version = "0.3.28" } futures-util = { version = "0.3.17", default-features = false } # FIXME: we currently can't feature flag anything in the api.udl, therefore we must enforce sliding-sync being exposed here.. # see https://github.com/matrix-org/matrix-rust-sdk/issues/1014 -matrix-sdk = { path = "../../crates/matrix-sdk", features = ["anyhow", "markdown", "sliding-sync", "socks"], version = "0.6.0" } +matrix-sdk = { path = "../../crates/matrix-sdk", features = ["anyhow", "experimental-timeline", "markdown", "sliding-sync", "socks"], version = "0.6.0" } once_cell = "1.10.0" sanitize-filename-reader-friendly = "2.2.1" serde = { version = "1", features = ["derive"] } diff --git a/crates/matrix-sdk/Cargo.toml b/crates/matrix-sdk/Cargo.toml index 5ee456e48..d26985028 100644 --- a/crates/matrix-sdk/Cargo.toml +++ b/crates/matrix-sdk/Cargo.toml @@ -42,6 +42,8 @@ appservice = ["ruma/appservice-api-s"] image-proc = ["dep:image"] image-rayon = ["image-proc", "image?/jpeg_rayon"] +experimental-timeline = [] + sliding-sync = [ "matrix-sdk-base/sliding-sync", "anyhow", @@ -70,6 +72,7 @@ futures-core = "0.3.21" futures-signals = { version = "0.3.30", default-features = false } futures-util = { version = "0.3.21", default-features = false } http = "0.2.6" +indexmap = "1.9.1" matrix-sdk-base = { version = "0.6.0", path = "../matrix-sdk-base", default_features = false } matrix-sdk-common = { version = "0.6.0", path = "../matrix-sdk-common" } matrix-sdk-indexeddb = { version = "0.2.0", path = "../matrix-sdk-indexeddb", default-features = false, optional = true } @@ -107,7 +110,7 @@ features = [ optional = true [dependencies.ruma] -version = "0.7.0" +version = "0.7.4" features = ["client-api-c", "compat", "rand", "unstable-msc2448", "unstable-msc2965"] [target.'cfg(target_arch = "wasm32")'.dependencies] @@ -120,6 +123,7 @@ tokio = { version = "1.17.0", default-features = false, features = ["fs", "rt"] [dev-dependencies] anyhow = "1.0.57" +assert_matches = "1.5.0" dirs = "4.0.0" futures = { version = "0.3.21", default-features = false, features = ["executor"] } matches = "0.1.9" diff --git a/crates/matrix-sdk/src/room/common.rs b/crates/matrix-sdk/src/room/common.rs index 5cc0aa6fc..d9aeead01 100644 --- a/crates/matrix-sdk/src/room/common.rs +++ b/crates/matrix-sdk/src/room/common.rs @@ -37,6 +37,8 @@ use ruma::{ }; use serde::de::DeserializeOwned; +#[cfg(feature = "experimental-timeline")] +use super::timeline::Timeline; use crate::{ event_handler::{EventHandler, EventHandlerHandle, EventHandlerResult, SyncEvent}, media::{MediaFormat, MediaRequest}, @@ -251,6 +253,16 @@ impl Common { self.client.add_room_event_handler(self.room_id(), handler) } + /// Get a [`Timeline`] for this room. + /// + /// This offers a higher-level API than event handlers, in treating things + /// like edits and reactions as updates of existing items rather than new + /// independent events. + #[cfg(feature = "experimental-timeline")] + pub fn timeline(&self) -> Timeline { + Timeline::new(self) + } + /// Fetch the event with the given `EventId` in this room. pub async fn event(&self, event_id: &EventId) -> Result { let request = get_room_event::v3::Request::new(self.room_id(), event_id); diff --git a/crates/matrix-sdk/src/room/mod.rs b/crates/matrix-sdk/src/room/mod.rs index 13905c733..633213bec 100644 --- a/crates/matrix-sdk/src/room/mod.rs +++ b/crates/matrix-sdk/src/room/mod.rs @@ -9,6 +9,8 @@ mod invited; mod joined; mod left; mod member; +#[cfg(feature = "experimental-timeline")] +pub mod timeline; pub use self::{ common::{Common, Messages, MessagesOptions}, diff --git a/crates/matrix-sdk/src/room/timeline/event_handler.rs b/crates/matrix-sdk/src/room/timeline/event_handler.rs new file mode 100644 index 000000000..bc228d2f5 --- /dev/null +++ b/crates/matrix-sdk/src/room/timeline/event_handler.rs @@ -0,0 +1,486 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use indexmap::map::Entry; +use matrix_sdk_base::deserialized_responses::EncryptionInfo; +use ruma::{ + events::{ + reaction::ReactionEventContent, + room::{ + message::{Relation, Replacement, RoomMessageEventContent}, + redaction::{ + OriginalSyncRoomRedactionEvent, RoomRedactionEventContent, SyncRoomRedactionEvent, + }, + }, + AnyMessageLikeEventContent, AnyStateEventContent, AnySyncMessageLikeEvent, + AnySyncTimelineEvent, Relations, + }, + serde::Raw, + uint, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId, + UserId, +}; +use tracing::{debug, error, info, warn}; + +use super::{ + event_item::{BundledReactions, TimelineDetails}, + find_event, EventTimelineItem, Message, TimelineInner, TimelineItem, TimelineItemContent, + TimelineKey, +}; + +pub(super) fn handle_live_event( + raw: Raw, + encryption_info: Option, + own_user_id: &UserId, + timeline: &TimelineInner, +) { + handle_remote_event(raw, encryption_info, own_user_id, TimelineItemPosition::End, timeline) +} + +pub(super) fn handle_local_event( + txn_id: OwnedTransactionId, + content: AnyMessageLikeEventContent, + own_user_id: &UserId, + timeline: &TimelineInner, +) { + let meta = TimelineEventMetadata { + sender: own_user_id.to_owned(), + origin_server_ts: None, + raw_event: None, + is_own_event: true, + relations: None, + // FIXME: Should we supply something here for encrypted rooms? + encryption_info: None, + }; + + let flow = Flow::Local { txn_id }; + let kind = TimelineEventKind::Message { content }; + + TimelineEventHandler::new(meta, flow, timeline).handle_event(kind) +} + +pub(super) fn handle_back_paginated_event( + raw: Raw, + encryption_info: Option, + own_user_id: &UserId, + timeline: &TimelineInner, +) { + handle_remote_event(raw, encryption_info, own_user_id, TimelineItemPosition::Start, timeline) +} + +fn handle_remote_event( + raw: Raw, + encryption_info: Option, + own_user_id: &UserId, + position: TimelineItemPosition, + timeline: &TimelineInner, +) { + let event = match raw.deserialize() { + Ok(ev) => ev, + Err(_e) => { + // TODO: Add some sort of error timeline item + return; + } + }; + + let sender = event.sender().to_owned(); + let is_own_event = sender == own_user_id; + + let meta = TimelineEventMetadata { + raw_event: Some(raw), + sender, + origin_server_ts: Some(event.origin_server_ts()), + is_own_event, + relations: event.relations().cloned(), + encryption_info, + }; + let flow = Flow::Remote { + event_id: event.event_id().to_owned(), + txn_id: event.transaction_id().map(ToOwned::to_owned), + position, + }; + + TimelineEventHandler::new(meta, flow, timeline).handle_event(event.into()) +} + +enum Flow { + Local { + txn_id: OwnedTransactionId, + }, + Remote { + event_id: OwnedEventId, + txn_id: Option, + position: TimelineItemPosition, + }, +} + +impl Flow { + fn to_key(&self) -> TimelineKey { + match self { + Self::Remote { event_id, .. } => TimelineKey::EventId(event_id.to_owned()), + Self::Local { txn_id } => TimelineKey::TransactionId(txn_id.to_owned()), + } + } +} + +struct TimelineEventMetadata { + raw_event: Option>, + sender: OwnedUserId, + origin_server_ts: Option, + is_own_event: bool, + relations: Option, + encryption_info: Option, +} + +impl From for TimelineEventKind { + fn from(event: AnySyncTimelineEvent) -> Self { + match event { + AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction( + SyncRoomRedactionEvent::Original(OriginalSyncRoomRedactionEvent { + redacts, + content, + .. + }), + )) => Self::Redaction { redacts, content }, + AnySyncTimelineEvent::MessageLike(ev) => match ev.original_content() { + Some(content) => Self::Message { content }, + None => Self::RedactedMessage, + }, + AnySyncTimelineEvent::State(ev) => match ev.original_content() { + Some(_content) => Self::State { _content }, + None => Self::RedactedState, + }, + } + } +} + +#[derive(Clone)] +enum TimelineEventKind { + Message { content: AnyMessageLikeEventContent }, + RedactedMessage, + Redaction { redacts: OwnedEventId, content: RoomRedactionEventContent }, + // FIXME: Split further for state keys of different type + State { _content: AnyStateEventContent }, + RedactedState, // AnyRedactedStateEventContent +} + +enum TimelineItemPosition { + Start, + End, +} + +// Bundles together a few things that are needed throughout the different stages +// of handling an event (figuring out whether it should update an existing +// timeline item, transforming that item or creating a new one, updating the +// reactive Vec). +struct TimelineEventHandler<'a> { + meta: TimelineEventMetadata, + flow: Flow, + timeline: &'a TimelineInner, + event_added: bool, +} + +impl<'a> TimelineEventHandler<'a> { + fn new(meta: TimelineEventMetadata, flow: Flow, timeline: &'a TimelineInner) -> Self { + Self { meta, flow, timeline, event_added: false } + } + + fn handle_event(mut self, event_kind: TimelineEventKind) { + match event_kind { + TimelineEventKind::Message { content } => match content { + AnyMessageLikeEventContent::Reaction(c) => self.handle_reaction(c), + AnyMessageLikeEventContent::RoomMessage(c) => self.handle_room_message(c), + // TODO + _ => {} + }, + TimelineEventKind::RedactedMessage => { + self.add(NewEventTimelineItem::redacted_message()); + } + TimelineEventKind::Redaction { redacts, content } => { + self.handle_redaction(redacts, content) + } + // TODO: State events + _ => {} + } + + if !self.event_added { + // TODO: Add event as raw + } + } + + fn handle_room_message(&mut self, content: RoomMessageEventContent) { + match content.relates_to { + Some(Relation::Replacement(re)) => { + self.handle_room_message_edit(re); + } + _ => { + self.add(NewEventTimelineItem::message(content, self.meta.relations.clone())); + } + } + } + + fn handle_room_message_edit(&mut self, replacement: Replacement) { + let event_id = &replacement.event_id; + + self.maybe_update_timeline_item(event_id, "edit", |item| { + if self.meta.sender != item.sender() { + info!( + %event_id, original_sender = %item.sender(), edit_sender = %self.meta.sender, + "Event tries to edit another user's timeline item, discarding" + ); + return None; + } + + let msg = match &item.content { + TimelineItemContent::Message(msg) => msg, + TimelineItemContent::RedactedMessage => { + info!( + %event_id, + "Event tries to edit a non-editable timeline item, discarding" + ); + return None; + } + }; + + let content = TimelineItemContent::Message(Message { + msgtype: replacement.new_content.msgtype, + in_reply_to: msg.in_reply_to.clone(), + edited: true, + }); + + Some(item.with_content(content)) + }); + } + + // Redacted reaction events are no-ops so don't need to be handled + fn handle_reaction(&mut self, c: ReactionEventContent) { + let event_id: &EventId = &c.relates_to.event_id; + + // This lock should never be contended, same as the timeline item lock. + // If this is ever run in parallel for some reason though, make sure the + // reaction lock is held for the entire time of the timeline items being + // locked so these two things can't get out of sync. + let mut lock = self.timeline.reaction_map.lock().unwrap(); + + let did_update = self.maybe_update_timeline_item(event_id, "reaction", |item| { + // Handling of reactions on redacted events is an open question. + // For now, ignore reactions on redacted events like Element does. + if let TimelineItemContent::RedactedMessage = item.content { + debug!(%event_id, "Ignoring reaction on redacted event"); + None + } else { + let mut reactions = item.reactions.clone(); + let reaction_details = + reactions.bundled.entry(c.relates_to.key.clone()).or_default(); + + reaction_details.count += uint!(1); + if let TimelineDetails::Ready(senders) = &mut reaction_details.senders { + senders.push(self.meta.sender.clone()); + } + + Some(item.with_reactions(reactions)) + } + }); + + if did_update { + lock.insert(self.flow.to_key(), (self.meta.sender.clone(), c.relates_to)); + } + } + + // Redacted redactions are no-ops (unfortunately) + fn handle_redaction(&mut self, redacts: OwnedEventId, _content: RoomRedactionEventContent) { + let mut did_update = false; + + // Don't release this lock until after update_timeline_item. + // See first comment in handle_reaction for why. + let mut lock = self.timeline.reaction_map.lock().unwrap(); + if let Some((sender, rel)) = lock.remove(&TimelineKey::EventId(redacts.clone())) { + did_update = self.maybe_update_timeline_item(&rel.event_id, "redaction", |item| { + let mut reactions = item.reactions.clone(); + + let mut details_entry = match reactions.bundled.entry(rel.key) { + Entry::Occupied(o) => o, + Entry::Vacant(_) => return None, + }; + let details = details_entry.get_mut(); + details.count -= uint!(1); + + if details.count == uint!(0) { + details_entry.remove(); + return Some(item.with_reactions(reactions)); + } + + let senders = match &mut details.senders { + TimelineDetails::Ready(senders) => senders, + _ => { + // FIXME: We probably want to support this somehow in + // the future, but right now it's not possible. + warn!( + "inconsistent state: shouldn't have a reaction_map entry for a \ + timeline item with incomplete reactions" + ); + return None; + } + }; + + if let Some(idx) = senders.iter().position(|s| *s == sender) { + senders.remove(idx); + } else { + error!( + "inconsistent state: sender from reaction_map not in reaction sender list \ + of timeline item" + ); + return None; + } + + if u64::from(details.count) != senders.len() as u64 { + error!("inconsistent state: reaction count differs from number of senders"); + // Can't make things worse by updating the item, so no early + // return here. + } + + Some(item.with_reactions(reactions)) + }); + + if !did_update { + warn!("reaction_map out of sync with timeline items"); + } + } + + // Even if the event being redacted is a reaction (found in + // `reaction_map`), it can still be present in the timeline items + // directly with the raw event timeline feature (not yet implemented). + did_update |= self.update_timeline_item(&redacts, "redaction", |item| item.to_redacted()); + + if !did_update { + // We will want to know this when debugging redaction issues. + debug!(redaction_key = ?self.flow.to_key(), %redacts, "redaction affected no event"); + } + } + + fn add(&mut self, item: NewEventTimelineItem) { + self.event_added = true; + + let NewEventTimelineItem { content, reactions } = item; + let item = EventTimelineItem { + key: self.flow.to_key(), + event_id: None, + sender: self.meta.sender.to_owned(), + content, + reactions, + origin_server_ts: self.meta.origin_server_ts, + is_own: self.meta.is_own_event, + encryption_info: self.meta.encryption_info.clone(), + raw: self.meta.raw_event.clone(), + }; + + let item = Arc::new(TimelineItem::Event(item)); + let mut lock = self.timeline.items.lock_mut(); + match &self.flow { + Flow::Local { .. } + | Flow::Remote { position: TimelineItemPosition::End, txn_id: None, .. } => { + lock.push_cloned(item); + } + Flow::Remote { position: TimelineItemPosition::Start, txn_id: None, .. } => { + lock.insert_cloned(0, item); + } + Flow::Remote { txn_id: Some(txn_id), event_id, position } => { + if let Some((idx, _old_item)) = find_event(&lock, txn_id) { + // TODO: Check whether anything is different about the old and new item? + lock.set_cloned(idx, item); + } else { + debug!( + %txn_id, %event_id, + "Received event with transaction ID, but didn't find matching timeline item" + ); + + match position { + TimelineItemPosition::Start => lock.insert_cloned(0, item), + TimelineItemPosition::End => lock.push_cloned(item), + } + } + } + } + } + + /// Returns whether an update happened + fn maybe_update_timeline_item( + &self, + event_id: &EventId, + action: &str, + update: impl FnOnce(&EventTimelineItem) -> Option, + ) -> bool { + // No point in trying to update items with relations when back- + // paginating, the event the relation applies to can't be processed yet. + if matches!(self.flow, Flow::Remote { position: TimelineItemPosition::Start, .. }) { + return false; + } + + let mut lock = self.timeline.items.lock_mut(); + if let Some((idx, item)) = find_event(&lock, event_id) { + if let Some(new_item) = update(item) { + lock.set_cloned(idx, Arc::new(TimelineItem::Event(new_item))); + return true; + } + } else { + debug!(%event_id, "Timeline item not found, discarding {action}"); + } + + false + } + + /// Returns whether an update happened + fn update_timeline_item( + &self, + event_id: &EventId, + action: &str, + update: impl FnOnce(&EventTimelineItem) -> EventTimelineItem, + ) -> bool { + self.maybe_update_timeline_item(event_id, action, move |item| Some(update(item))) + } +} + +struct NewEventTimelineItem { + content: TimelineItemContent, + reactions: BundledReactions, +} + +impl NewEventTimelineItem { + // These constructors could also be `From` implementations, but that would + // allow users to call them directly, which should not be supported + pub(crate) fn message(c: RoomMessageEventContent, relations: Option) -> Self { + let edited = relations.as_ref().map_or(false, |r| r.replace.is_some()); + let content = TimelineItemContent::Message(Message { + msgtype: c.msgtype, + in_reply_to: c.relates_to.and_then(|rel| match rel { + Relation::Reply { in_reply_to } => Some(in_reply_to.event_id), + _ => None, + }), + edited, + }); + + let reactions = + relations.and_then(|r| r.annotation).map(BundledReactions::from).unwrap_or_default(); + + Self { content, reactions } + } + + pub(crate) fn redacted_message() -> Self { + Self { + content: TimelineItemContent::RedactedMessage, + reactions: BundledReactions::default(), + } + } +} diff --git a/crates/matrix-sdk/src/room/timeline/event_item.rs b/crates/matrix-sdk/src/room/timeline/event_item.rs new file mode 100644 index 000000000..9300195f3 --- /dev/null +++ b/crates/matrix-sdk/src/room/timeline/event_item.rs @@ -0,0 +1,319 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use indexmap::IndexMap; +use matrix_sdk_base::deserialized_responses::EncryptionInfo; +use ruma::{ + events::{ + relation::{AnnotationChunk, AnnotationType}, + room::message::MessageType, + AnySyncTimelineEvent, + }, + serde::Raw, + uint, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId, + TransactionId, UInt, UserId, +}; + +/// An item in the timeline that represents at least one event. +/// +/// There is always one main event that gives the `EventTimelineItem` its +/// identity (see [key](Self::key)) but in many cases, additional events like +/// reactions and edits are also part of the item. +#[derive(Clone, Debug)] +pub struct EventTimelineItem { + pub(super) key: TimelineKey, + // If this item is a local echo that has been acknowledged by the server + // but not remote-echoed yet, this field holds the event ID from the send + // response. + pub(super) event_id: Option, + pub(super) sender: OwnedUserId, + pub(super) content: TimelineItemContent, + pub(super) reactions: BundledReactions, + pub(super) origin_server_ts: Option, + pub(super) is_own: bool, + pub(super) encryption_info: Option, + // FIXME: Expose the raw JSON of aggregated events somehow + pub(super) raw: Option>, +} + +macro_rules! build { + ( + $ty:ident { + $( $field:ident $(: $value:expr)?, )* + ..$this:ident( $($this_field:ident),* $(,)? ) + } + ) => { + $ty { + $( $field $(: $value)?, )* + $( $this_field: $this.$this_field.clone() ),* + } + } +} + +impl EventTimelineItem { + /// Get the [`TimelineKey`] of this item. + pub fn key(&self) -> &TimelineKey { + &self.key + } + + /// Get the event ID of this item. + /// + /// If this returns `Some(_)`, the event was successfully created by the + /// server. + /// + /// Even if the [`key()`](Self::key) of this timeline item holds a + /// transaction ID, this can be `Some(_)` as the event ID can be known not + /// just from the remote echo via `sync_events`, but also from the response + /// of the send request that created the event. + pub fn event_id(&self) -> Option<&EventId> { + match &self.key { + TimelineKey::TransactionId(_) => self.event_id.as_deref(), + TimelineKey::EventId(id) => Some(id), + } + } + + /// Get the sender of this item. + pub fn sender(&self) -> &UserId { + &self.sender + } + + /// Get the content of this item. + pub fn content(&self) -> &TimelineItemContent { + &self.content + } + + /// Get the reactions of this item. + pub fn reactions(&self) -> &IndexMap { + // FIXME: Find out the state of incomplete bundled reactions, adjust + // Ruma if necessary, return the whole BundledReactions field + &self.reactions.bundled + } + + /// Get the origin server timestamp of this item. + /// + /// Returns `None` if this event hasn't been echoed back by the server yet. + pub fn origin_server_ts(&self) -> Option { + self.origin_server_ts + } + + /// Whether this timeline item was sent by the logged-in user themselves. + pub fn is_own(&self) -> bool { + self.is_own + } + + /// Get the raw JSON representation of the initial event (the one that + /// caused this timeline item to be created). + /// + /// Returns `None` if this event hasn't been echoed back by the server yet. + pub fn raw(&self) -> Option<&Raw> { + self.raw.as_ref() + } + + pub(super) fn to_redacted(&self) -> Self { + build!(Self { + // FIXME: Change when we support state events + content: TimelineItemContent::RedactedMessage, + reactions: BundledReactions::default(), + ..self(key, event_id, sender, origin_server_ts, is_own, encryption_info, raw) + }) + } + + pub(super) fn with_event_id(&self, event_id: Option) -> Self { + build!(Self { + event_id, + ..self(key, sender, content, reactions, origin_server_ts, is_own, encryption_info, raw,) + }) + } + + #[rustfmt::skip] + pub(super) fn with_content(&self, content: TimelineItemContent) -> Self { + build!(Self { + content, + ..self( + key, event_id, sender, reactions, origin_server_ts, is_own, encryption_info, raw, + ) + }) + } + + #[rustfmt::skip] + pub(super) fn with_reactions(&self, reactions: BundledReactions) -> Self { + build!(Self { + reactions, + ..self( + key, event_id, sender, content, origin_server_ts, is_own, encryption_info, raw, + ) + }) + } +} + +/// A unique identifier for a timeline item. +/// +/// This identifier is used to find the item in the timeline in order to update +/// its state. +/// +/// When an event is created locally, the timeline reflects this with an item +/// that has a [`TransactionId`](Self::TransactionId) key. Once the server has +/// acknowledged the event and given it an ID, that item's key is replaced by +/// [`EventId`](Self::EventId) containing the new ID. +/// +/// When an event related to the original event whose ID is stored in a +/// [`TimelineKey`] is received, the key is left untouched, but other parts of +/// the timeline item may be updated. Thus, the current data model is only able +/// to handle relations that reference the initial event that resulted in a +/// timeline item being created, not other related events. At the time of +/// writing, there is no relation that is meant to refer to other events that +/// only exist for their relation (e.g. edits, replies). +#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub enum TimelineKey { + /// Transaction ID, for an event that was created locally and hasn't been + /// acknowledged by the server yet. + TransactionId(OwnedTransactionId), + /// Event ID, for an event that is synced with the server. + EventId(OwnedEventId), +} + +impl PartialEq for &TransactionId { + fn eq(&self, key: &TimelineKey) -> bool { + matches!(key, TimelineKey::TransactionId(txn_id) if txn_id == self) + } +} + +impl PartialEq for &OwnedTransactionId { + fn eq(&self, key: &TimelineKey) -> bool { + matches!(key, TimelineKey::TransactionId(txn_id) if txn_id == *self) + } +} + +impl PartialEq for &EventId { + fn eq(&self, key: &TimelineKey) -> bool { + matches!(key, TimelineKey::EventId(event_id) if event_id == self) + } +} + +/// Some details of an [`EventTimelineItem`] that may require server requests +/// other than just the regular +/// [`sync_events`][ruma::api::client::sync::sync_events]. +#[derive(Clone, Debug)] +pub enum TimelineDetails { + /// The details are not available yet, and have not been request from the + /// server. + Unavailable, + + /// The details are not available yet, but have been requested. + Pending, + + /// The details are available. + Ready(T), +} + +/// The content of an [`EventTimelineItem`]. +#[derive(Clone, Debug)] +pub enum TimelineItemContent { + /// An `m.room.message` event or extensible event, including edits. + Message(Message), + + /// A redacted message. + RedactedMessage, +} + +/// An `m.room.message` event or extensible event, including edits. +#[derive(Clone, Debug)] +pub struct Message { + pub(super) msgtype: MessageType, + // TODO: Add everything required to display the replied-to event, plus a + // 'loading' state that is entered at first, until the user requests the + // reply to be loaded. + pub(super) in_reply_to: Option, + pub(super) edited: bool, +} + +impl Message { + /// Get the `msgtype`-specific data of this message. + pub fn msgtype(&self) -> &MessageType { + &self.msgtype + } + + /// Get the event ID of the event this message is replying to, if any. + pub fn in_reply_to(&self) -> Option<&EventId> { + self.in_reply_to.as_deref() + } + + /// Get the edit state of this message (has been edited: `true` / `false`). + pub fn is_edited(&self) -> bool { + self.edited + } +} + +#[derive(Clone, Debug)] +pub struct BundledReactions { + /// Whether all reactions are known, or some may be missing. + /// + /// If this is `false`, the remaining reactions can be fetched via **TODO**. + pub complete: bool, // FIXME: Unclear whether this is needed + + /// The reactions. + /// + /// Key: The reaction, usually an emoji.\ + /// Value: The count. + pub bundled: IndexMap, +} + +impl From for BundledReactions { + fn from(ann: AnnotationChunk) -> Self { + let bundled = ann + .chunk + .into_iter() + .filter_map(|a| { + (a.annotation_type == AnnotationType::Reaction).then(|| { + let details = + ReactionDetails { count: a.count, senders: TimelineDetails::Unavailable }; + (a.key, details) + }) + }) + .collect(); + + BundledReactions { bundled, complete: ann.next_batch.is_none() } + } +} + +impl Default for BundledReactions { + fn default() -> Self { + Self { complete: true, bundled: IndexMap::new() } + } +} + +/// The details of a group of reaction events on the same event with the same +/// key. +#[derive(Clone, Debug)] +pub struct ReactionDetails { + /// The amount of reactions with this key. + pub count: UInt, + + /// The senders of the reactions. + pub senders: TimelineDetails>, +} + +impl Default for ReactionDetails { + fn default() -> Self { + Self { count: uint!(0), senders: TimelineDetails::Ready(Vec::new()) } + } +} + +/// The result of a successful pagination request. +#[derive(Debug)] +#[non_exhaustive] +pub struct PaginationOutcome { + /// Whether there's more messages to be paginated. + pub more_messages: bool, +} diff --git a/crates/matrix-sdk/src/room/timeline/mod.rs b/crates/matrix-sdk/src/room/timeline/mod.rs new file mode 100644 index 000000000..73ed300d4 --- /dev/null +++ b/crates/matrix-sdk/src/room/timeline/mod.rs @@ -0,0 +1,243 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! A high-level view into a room's contents. +//! +//! See [`Timeline`] for details. + +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; + +use futures_core::Stream; +use futures_signals::signal_vec::{MutableVec, SignalVec, SignalVecExt, VecDiff}; +use matrix_sdk_base::deserialized_responses::EncryptionInfo; +use ruma::{ + assign, + events::{reaction::Relation as AnnotationRelation, AnyMessageLikeEventContent}, + OwnedEventId, OwnedUserId, TransactionId, UInt, +}; +use tracing::{debug, error, instrument}; + +use super::{Joined, Room}; +use crate::{ + event_handler::EventHandlerDropGuard, + room::{self, MessagesOptions}, + Result, +}; + +mod event_handler; +mod event_item; +mod virtual_item; + +use self::event_handler::{handle_back_paginated_event, handle_live_event, handle_local_event}; +pub use self::{ + event_item::{ + EventTimelineItem, Message, PaginationOutcome, ReactionDetails, TimelineDetails, + TimelineItemContent, TimelineKey, + }, + virtual_item::VirtualTimelineItem, +}; + +/// A high-level view into a regular¹ room's contents. +/// +/// ¹ This type is meant to be used in the context of rooms without a +/// `room_type`, that is rooms that are primarily used to exchange text +/// messages. +#[derive(Debug)] +pub struct Timeline { + inner: TimelineInner, + room: room::Common, + start_token: Mutex>, + _end_token: Mutex>, + _event_handler_guard: EventHandlerDropGuard, +} + +#[derive(Clone, Debug, Default)] +struct TimelineInner { + items: MutableVec>, + // Reaction event / txn ID => sender and reaction data + reaction_map: Arc>>, +} + +impl Timeline { + pub(super) fn new(room: &room::Common) -> Self { + let inner = TimelineInner::default(); + + let handle = room.add_event_handler({ + let inner = inner.clone(); + move |event, encryption_info: Option, room: Room| { + let inner = inner.clone(); + async move { + handle_live_event(event, encryption_info, room.own_user_id(), &inner); + } + } + }); + let _event_handler_guard = room.client.event_handler_drop_guard(handle); + + Timeline { + inner, + room: room.clone(), + start_token: Mutex::new(None), + _end_token: Mutex::new(None), + _event_handler_guard, + } + } + + /// Add more events to the start of the timeline. + #[instrument(skip(self), fields(room_id = %self.room.room_id()))] + pub async fn paginate_backwards(&self, limit: UInt) -> Result { + let start = self.start_token.lock().unwrap().clone(); + let messages = self + .room + .messages(assign!(MessagesOptions::backward(), { + from: start.as_deref(), + limit, + })) + .await?; + + let outcome = PaginationOutcome { more_messages: messages.end.is_some() }; + *self.start_token.lock().unwrap() = messages.end; + + let own_user_id = self.room.own_user_id(); + for room_ev in messages.chunk { + handle_back_paginated_event( + room_ev.event.cast(), + room_ev.encryption_info, + own_user_id, + &self.inner, + ); + } + + Ok(outcome) + } + + /// Get a signal of the timeline's items. + /// + /// You can poll this signal to receive updates, the first of which will + /// be the full list of items currently available. + /// + /// See [`SignalVecExt`](futures_signals::signal_vec::SignalVecExt) for a + /// high-level API on top of [`SignalVec`]. + pub fn signal(&self) -> impl SignalVec> { + self.inner.items.signal_vec_cloned() + } + + /// Get a stream of timeline changes. + /// + /// This is a convenience shorthand for `timeline.signal().to_stream()`. + pub fn stream(&self) -> impl Stream>> { + self.signal().to_stream() + } + + /// Send a message to the room, and add it to the timeline as a local echo. + /// + /// For simplicity, this method doesn't currently allow custom message + /// types. + /// + /// If the encryption feature is enabled, this method will transparently + /// encrypt the room message if the room is encrypted. + /// + /// # Arguments + /// + /// * `content` - The content of the message event. + /// + /// * `txn_id` - A locally-unique ID describing a message transaction with + /// the homeserver. Unless you're doing something special, you can pass in + /// `None` which will create a suitable one for you automatically. + /// * On the sending side, this field is used for re-trying earlier + /// failed transactions. Subsequent messages *must never* re-use an + /// earlier transaction ID. + /// * On the receiving side, the field is used for recognizing our own + /// messages when they arrive down the sync: the server includes the + /// ID in the [`MessageLikeUnsigned`] field `transaction_id` of the + /// corresponding [`SyncMessageLikeEvent`], but only for the *sending* + /// device. Other devices will not see it. + /// + /// [`MessageLikeUnsigned`]: ruma::events::MessageLikeUnsigned + /// [`SyncMessageLikeEvent`]: ruma::events::SyncMessageLikeEvent + #[instrument(skip(self, content), fields(room_id = %self.room.room_id()))] + pub async fn send( + &self, + content: AnyMessageLikeEventContent, + txn_id: Option<&TransactionId>, + ) -> Result<()> { + let txn_id = txn_id.map_or_else(TransactionId::new, ToOwned::to_owned); + handle_local_event(txn_id.clone(), content.clone(), self.room.own_user_id(), &self.inner); + + // If this room isn't actually in joined state, we'll get a server error. + // Not ideal, but works for now. + let room = Joined { inner: self.room.clone() }; + + let response = room.send(content, Some(&txn_id)).await?; + add_event_id(&self.inner, &txn_id, response.event_id); + + Ok(()) + } +} + +/// A single entry in timeline. +#[derive(Clone, Debug)] +#[allow(clippy::large_enum_variant)] +pub enum TimelineItem { + /// An event or aggregation of multiple events. + Event(EventTimelineItem), + /// An item that doesn't correspond to an event, for example the user's own + /// read marker. + Virtual(VirtualTimelineItem), +} + +impl TimelineItem { + /// Get the inner `EventTimelineItem`, if this is a `TimelineItem::Event`. + pub fn as_event(&self) -> Option<&EventTimelineItem> { + match self { + Self::Event(v) => Some(v), + _ => None, + } + } +} + +// FIXME: Put an upper bound on timeline size or add a separate map to look up +// the index of a timeline item by its key, to avoid large linear scans. +fn find_event( + lock: &[Arc], + key: impl PartialEq, +) -> Option<(usize, &EventTimelineItem)> { + lock.iter() + .enumerate() + .filter_map(|(idx, item)| Some((idx, item.as_event()?))) + .rfind(|(_, it)| key == it.key) +} + +fn add_event_id(items: &TimelineInner, txn_id: &TransactionId, event_id: OwnedEventId) { + let mut lock = items.items.lock_mut(); + if let Some((idx, item)) = find_event(&lock, txn_id) { + match &item.key { + TimelineKey::TransactionId(_) => { + lock.set_cloned( + idx, + Arc::new(TimelineItem::Event(item.with_event_id(Some(event_id)))), + ); + } + TimelineKey::EventId(ev_id) => { + if *ev_id != event_id { + error!("remote echo and send-event response disagree on the event ID"); + } + } + } + } else { + debug!(%txn_id, "Timeline item not found, can't mark as sent"); + } +} diff --git a/crates/matrix-sdk/src/room/timeline/virtual_item.rs b/crates/matrix-sdk/src/room/timeline/virtual_item.rs new file mode 100644 index 000000000..dedc18bf3 --- /dev/null +++ b/crates/matrix-sdk/src/room/timeline/virtual_item.rs @@ -0,0 +1,22 @@ +// Copyright 2022 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// A [`TimelineItem`](super::TimelineItem) that doesn't correspond to an event. +#[derive(Clone, Debug)] +pub enum VirtualTimelineItem { + /// A divider between messages of two days. + DayDivider, + /// The user's own read marker. + ReadMarker, +} diff --git a/crates/matrix-sdk/tests/integration/main.rs b/crates/matrix-sdk/tests/integration/main.rs index 709342fa9..e33adebbf 100644 --- a/crates/matrix-sdk/tests/integration/main.rs +++ b/crates/matrix-sdk/tests/integration/main.rs @@ -13,6 +13,16 @@ mod client; mod refresh_token; mod room; +#[cfg(all(test, not(target_arch = "wasm32")))] +#[ctor::ctor] +fn init_logging() { + use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + tracing_subscriber::registry() + .with(tracing_subscriber::EnvFilter::from_default_env()) + .with(tracing_subscriber::fmt::layer().with_test_writer()) + .init(); +} + async fn test_client_builder() -> (ClientBuilder, MockServer) { let server = MockServer::start().await; let builder = diff --git a/crates/matrix-sdk/tests/integration/room/mod.rs b/crates/matrix-sdk/tests/integration/room/mod.rs index b9e0e2c78..3325d1320 100644 --- a/crates/matrix-sdk/tests/integration/room/mod.rs +++ b/crates/matrix-sdk/tests/integration/room/mod.rs @@ -1,3 +1,4 @@ mod common; mod joined; mod left; +mod timeline; diff --git a/crates/matrix-sdk/tests/integration/room/timeline.rs b/crates/matrix-sdk/tests/integration/room/timeline.rs new file mode 100644 index 000000000..16844c4fa --- /dev/null +++ b/crates/matrix-sdk/tests/integration/room/timeline.rs @@ -0,0 +1,427 @@ +#![cfg(feature = "experimental-timeline")] + +use std::{sync::Arc, time::Duration}; + +use assert_matches::assert_matches; +use futures_signals::signal_vec::{SignalVecExt, VecDiff}; +use futures_util::StreamExt; +use matrix_sdk::{ + config::SyncSettings, + room::timeline::{TimelineDetails, TimelineItemContent, TimelineKey}, + ruma::MilliSecondsSinceUnixEpoch, +}; +use matrix_sdk_common::executor::spawn; +use matrix_sdk_test::{async_test, test_json, EventBuilder, JoinedRoomBuilder, TimelineTestEvent}; +use ruma::{ + event_id, + events::room::message::{MessageType, RoomMessageEventContent}, + room_id, uint, user_id, TransactionId, +}; +use serde_json::json; +use wiremock::{ + matchers::{header, method, path_regex}, + Mock, ResponseTemplate, +}; + +use crate::{logged_in_client, mock_sync}; + +#[async_test] +async fn edit() { + let room_id = room_id!("!a98sd12bjh:example.org"); + let (client, server) = logged_in_client().await; + let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); + + let mut ev_builder = EventBuilder::new(); + ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id)); + + mock_sync(&server, ev_builder.build_json_sync_response(), None).await; + let _response = client.sync_once(sync_settings.clone()).await.unwrap(); + server.reset().await; + + let room = client.get_room(room_id).unwrap(); + let timeline = room.timeline(); + let mut timeline_stream = timeline.signal().to_stream(); + + ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_timeline_event( + TimelineTestEvent::Custom(json!({ + "content": { + "body": "hello", + "msgtype": "m.text", + }, + "event_id": "$msda7m:localhost", + "origin_server_ts": 152037280, + "sender": "@alice:example.org", + "type": "m.room.message", + })), + )); + + mock_sync(&server, ev_builder.build_json_sync_response(), None).await; + let _response = client.sync_once(sync_settings.clone()).await.unwrap(); + server.reset().await; + + let first = + assert_matches!(timeline_stream.next().await, Some(VecDiff::Push { value }) => value); + let msg = assert_matches!( + first.as_event().unwrap().content(), + TimelineItemContent::Message(msg) => msg + ); + assert_matches!(msg.msgtype(), MessageType::Text(_)); + assert_matches!(msg.in_reply_to(), None); + assert!(!msg.is_edited()); + + ev_builder.add_joined_room( + JoinedRoomBuilder::new(room_id) + .add_timeline_event(TimelineTestEvent::Custom(json!({ + "content": { + "body": "Test", + "formatted_body": "Test", + "msgtype": "m.text", + "format": "org.matrix.custom.html", + }, + "event_id": "$7at8sd:localhost", + "origin_server_ts": 152038280, + "sender": "@bob:example.org", + "type": "m.room.message", + }))) + .add_timeline_event(TimelineTestEvent::Custom(json!({ + "content": { + "body": " * hi", + "m.new_content": { + "body": "hi", + "msgtype": "m.text", + }, + "m.relates_to": { + "event_id": "$msda7m:localhost", + "rel_type": "m.replace", + }, + "msgtype": "m.text", + }, + "event_id": "$msda7m2:localhost", + "origin_server_ts": 159056300, + "sender": "@alice:example.org", + "type": "m.room.message", + }))), + ); + + mock_sync(&server, ev_builder.build_json_sync_response(), None).await; + let _response = client.sync_once(sync_settings.clone()).await.unwrap(); + server.reset().await; + + let second = + assert_matches!(timeline_stream.next().await, Some(VecDiff::Push { value }) => value); + let item = second.as_event().unwrap(); + assert_eq!(item.origin_server_ts(), Some(MilliSecondsSinceUnixEpoch(uint!(152038280)))); + assert!(item.event_id().is_some()); + assert!(!item.is_own()); + assert!(item.raw().is_some()); + + let msg = assert_matches!(item.content(), TimelineItemContent::Message(msg) => msg); + assert_matches!(msg.msgtype(), MessageType::Text(_)); + assert_matches!(msg.in_reply_to(), None); + assert!(!msg.is_edited()); + + let edit = assert_matches!( + timeline_stream.next().await, + Some(VecDiff::UpdateAt { index: 0, value }) => value + ); + let edited = assert_matches!( + edit.as_event().unwrap().content(), + TimelineItemContent::Message(msg) => msg + ); + let text = assert_matches!(edited.msgtype(), MessageType::Text(text) => text); + assert_eq!(text.body, "hi"); + assert_matches!(edited.in_reply_to(), None); + assert!(edited.is_edited()); +} + +#[async_test] +async fn echo() { + let room_id = room_id!("!a98sd12bjh:example.org"); + let (client, server) = logged_in_client().await; + let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); + + let mut ev_builder = EventBuilder::new(); + ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id)); + + mock_sync(&server, ev_builder.build_json_sync_response(), None).await; + let _response = client.sync_once(sync_settings.clone()).await.unwrap(); + server.reset().await; + + let room = client.get_room(room_id).unwrap(); + let timeline = Arc::new(room.timeline()); + let mut timeline_stream = timeline.signal().to_stream(); + + let event_id = event_id!("$wWgymRfo7ri1uQx0NXO40vLJ"); + let txn_id: &TransactionId = "my-txn-id".into(); + + Mock::given(method("PUT")) + .and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/.*")) + .and(header("authorization", "Bearer 1234")) + .respond_with(ResponseTemplate::new(200).set_body_json(&json!({ "event_id": event_id }))) + .mount(&server) + .await; + + // Don't move the original timeline, it must live until the end of the test + let timeline = timeline.clone(); + let send_hdl = spawn(async move { + timeline + .send(RoomMessageEventContent::text_plain("Hello, World!").into(), Some(txn_id)) + .await + }); + + let local_echo = + assert_matches!(timeline_stream.next().await, Some(VecDiff::Push { value }) => value); + let item = local_echo.as_event().unwrap(); + assert!(item.event_id().is_none()); + assert!(item.is_own()); + assert_matches!(item.key(), TimelineKey::TransactionId(_)); + assert_eq!(item.origin_server_ts(), None); + assert_matches!(item.raw(), None); + + let msg = assert_matches!(item.content(), TimelineItemContent::Message(msg) => msg); + let text = assert_matches!(msg.msgtype(), MessageType::Text(text) => text); + assert_eq!(text.body, "Hello, World!"); + + // Wait for the sending to finish and assert everything was successful + send_hdl.await.unwrap().unwrap(); + + let sent_confirmation = assert_matches!( + timeline_stream.next().await, + Some(VecDiff::UpdateAt { index: 0, value }) => value + ); + let item = sent_confirmation.as_event().unwrap(); + assert!(item.event_id().is_some()); + assert!(item.is_own()); + assert_matches!(item.key(), TimelineKey::TransactionId(_)); + assert_eq!(item.origin_server_ts(), None); + assert_matches!(item.raw(), None); + + ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_timeline_event( + TimelineTestEvent::Custom(json!({ + "content": { + "body": "Hello, World!", + "msgtype": "m.text", + }, + "event_id": "$7at8sd:localhost", + "origin_server_ts": 152038280, + "sender": "@example:localhost", + "type": "m.room.message", + "unsigned": { "transaction_id": txn_id, }, + })), + )); + + mock_sync(&server, ev_builder.build_json_sync_response(), None).await; + let _response = client.sync_once(sync_settings.clone()).await.unwrap(); + server.reset().await; + + let remote_echo = assert_matches!( + timeline_stream.next().await, + Some(VecDiff::UpdateAt { index: 0, value }) => value + ); + let item = remote_echo.as_event().unwrap(); + assert!(item.event_id().is_some()); + assert!(item.is_own()); + assert_eq!(item.origin_server_ts(), Some(MilliSecondsSinceUnixEpoch(uint!(152038280)))); + assert_matches!(item.key(), TimelineKey::EventId(_)); + assert_matches!(item.raw(), Some(_)); +} + +#[async_test] +async fn back_pagination() { + let room_id = room_id!("!a98sd12bjh:example.org"); + let (client, server) = logged_in_client().await; + let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); + + let mut ev_builder = EventBuilder::new(); + ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id)); + + mock_sync(&server, ev_builder.build_json_sync_response(), None).await; + let _response = client.sync_once(sync_settings.clone()).await.unwrap(); + server.reset().await; + + let room = client.get_room(room_id).unwrap(); + let timeline = Arc::new(room.timeline()); + let mut timeline_stream = timeline.signal().to_stream(); + + Mock::given(method("GET")) + .and(path_regex(r"^/_matrix/client/r0/rooms/.*/messages$")) + .and(header("authorization", "Bearer 1234")) + .respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::ROOM_MESSAGES_BATCH_1)) + .expect(1) + .named("messages_batch_1") + .mount(&server) + .await; + + timeline.paginate_backwards(uint!(10)).await.unwrap(); + + let message = assert_matches!( + timeline_stream.next().await, + Some(VecDiff::Push { value }) => value + ); + let msg = assert_matches!( + message.as_event().unwrap().content(), + TimelineItemContent::Message(msg) => msg + ); + let text = assert_matches!(msg.msgtype(), MessageType::Text(text) => text); + assert_eq!(text.body, "hello world"); + + let message = assert_matches!( + timeline_stream.next().await, + Some(VecDiff::InsertAt { index: 0, value }) => value + ); + let msg = assert_matches!( + message.as_event().unwrap().content(), + TimelineItemContent::Message(msg) => msg + ); + let text = assert_matches!(msg.msgtype(), MessageType::Text(text) => text); + assert_eq!(text.body, "the world is big"); +} + +#[async_test] +async fn reaction() { + let room_id = room_id!("!a98sd12bjh:example.org"); + let (client, server) = logged_in_client().await; + let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); + + let mut ev_builder = EventBuilder::new(); + ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id)); + + mock_sync(&server, ev_builder.build_json_sync_response(), None).await; + let _response = client.sync_once(sync_settings.clone()).await.unwrap(); + server.reset().await; + + let room = client.get_room(room_id).unwrap(); + let timeline = room.timeline(); + let mut timeline_stream = timeline.signal().to_stream(); + + ev_builder.add_joined_room( + JoinedRoomBuilder::new(room_id) + .add_timeline_event(TimelineTestEvent::Custom(json!({ + "content": { + "body": "hello", + "msgtype": "m.text", + }, + "event_id": "$TTvQUp1e17qkw41rBSjpZ", + "origin_server_ts": 152037280, + "sender": "@alice:example.org", + "type": "m.room.message", + }))) + .add_timeline_event(TimelineTestEvent::Custom(json!({ + "content": { + "m.relates_to": { + "event_id": "$TTvQUp1e17qkw41rBSjpZ", + "key": "👍", + "rel_type": "m.annotation", + }, + }, + "event_id": "$031IXQRi27504", + "origin_server_ts": 152038300, + "sender": "@bob:example.org", + "type": "m.reaction", + }))), + ); + + mock_sync(&server, ev_builder.build_json_sync_response(), None).await; + let _response = client.sync_once(sync_settings.clone()).await.unwrap(); + server.reset().await; + + let message = + assert_matches!(timeline_stream.next().await, Some(VecDiff::Push { value }) => value); + assert_matches!(message.as_event().unwrap().content(), TimelineItemContent::Message(_)); + + let updated_message = assert_matches!( + timeline_stream.next().await, + Some(VecDiff::UpdateAt { index: 0, value }) => value + ); + let event_item = updated_message.as_event().unwrap(); + let msg = assert_matches!(event_item.content(), TimelineItemContent::Message(msg) => msg); + assert!(!msg.is_edited()); + assert_eq!(event_item.reactions().len(), 1); + let details = &event_item.reactions()["👍"]; + assert_eq!(details.count, uint!(1)); + let senders = assert_matches!(&details.senders, TimelineDetails::Ready(s) => s); + assert_eq!(*senders, vec![user_id!("@bob:example.org").to_owned()]); + + // TODO: After adding raw timeline items, check for one here + + ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_timeline_event( + TimelineTestEvent::Custom(json!({ + "content": {}, + "redacts": "$031IXQRi27504", + "event_id": "$N6eUCBc3vu58PL8TobGaVQzM", + "sender": "@bob:example.org", + "origin_server_ts": 152037280, + "type": "m.room.redaction", + })), + )); + + mock_sync(&server, ev_builder.build_json_sync_response(), None).await; + let _response = client.sync_once(sync_settings.clone()).await.unwrap(); + server.reset().await; + + let updated_message = assert_matches!( + timeline_stream.next().await, + Some(VecDiff::UpdateAt { index: 0, value }) => value + ); + let event_item = updated_message.as_event().unwrap(); + let msg = assert_matches!(event_item.content(), TimelineItemContent::Message(msg) => msg); + assert!(!msg.is_edited()); + assert_eq!(event_item.reactions().len(), 0); +} + +#[async_test] +async fn redacted_message() { + let room_id = room_id!("!a98sd12bjh:example.org"); + let (client, server) = logged_in_client().await; + let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); + + let mut ev_builder = EventBuilder::new(); + ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id)); + + mock_sync(&server, ev_builder.build_json_sync_response(), None).await; + let _response = client.sync_once(sync_settings.clone()).await.unwrap(); + server.reset().await; + + let room = client.get_room(room_id).unwrap(); + let timeline = room.timeline(); + let mut timeline_stream = timeline.signal().to_stream(); + + ev_builder.add_joined_room( + JoinedRoomBuilder::new(room_id) + .add_timeline_event(TimelineTestEvent::Custom(json!({ + "content": {}, + "event_id": "$eeG0HA0FAZ37wP8kXlNkxx3I", + "origin_server_ts": 152035910, + "sender": "@alice:example.org", + "type": "m.room.message", + "unsigned": { + "redacted_because": { + "content": {}, + "redacts": "$eeG0HA0FAZ37wP8kXlNkxx3I", + "event_id": "$N6eUCBc3vu58PL8TobGaVQzM", + "sender": "@alice:example.org", + "origin_server_ts": 152037280, + "type": "m.room.redaction", + }, + }, + }))) + .add_timeline_event(TimelineTestEvent::Custom(json!({ + "content": {}, + "redacts": "$eeG0HA0FAZ37wP8kXlNkxx3I", + "event_id": "$N6eUCBc3vu58PL8TobGaVQzM", + "sender": "@alice:example.org", + "origin_server_ts": 152037280, + "type": "m.room.redaction", + }))), + ); + + mock_sync(&server, ev_builder.build_json_sync_response(), None).await; + let _response = client.sync_once(sync_settings.clone()).await.unwrap(); + server.reset().await; + + let first = + assert_matches!(timeline_stream.next().await, Some(VecDiff::Push { value }) => value); + assert_matches!(first.as_event().unwrap().content(), TimelineItemContent::RedactedMessage); + + // TODO: After adding raw timeline items, check for one here +} diff --git a/examples/timeline/Cargo.toml b/examples/timeline/Cargo.toml index 9290178fa..dcd457ff6 100644 --- a/examples/timeline/Cargo.toml +++ b/examples/timeline/Cargo.toml @@ -11,11 +11,12 @@ test = false [dependencies] anyhow = "1" futures = "0.3" +futures-signals = { version = "0.3.30", default-features = false } tokio = { version = "1.20.1", features = ["macros", "rt-multi-thread"] } tracing-subscriber = "0.3.15" url = "2.2.2" [dependencies.matrix-sdk] path = "../../crates/matrix-sdk" -features = ["sled"] +features = ["experimental-timeline", "sled"] version = "0.6.0" diff --git a/examples/timeline/src/main.rs b/examples/timeline/src/main.rs index b92ab877a..a00cf90d7 100644 --- a/examples/timeline/src/main.rs +++ b/examples/timeline/src/main.rs @@ -1,5 +1,7 @@ use std::{env, process::exit, sync::Mutex, time::Duration}; +use futures::StreamExt; +use futures_signals::signal_vec::SignalVecExt; use matrix_sdk::{ self, config::SyncSettings, @@ -7,6 +9,7 @@ use matrix_sdk::{ ruma::{ api::client::filter::{FilterDefinition, LazyLoadOptions}, events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent, SyncMessageLikeEvent}, + uint, }, Client, LoopCtrl, }; @@ -44,8 +47,29 @@ fn _event_content(event: AnySyncTimelineEvent) -> Option { } } -async fn print_timeline(_room: Room) { - // TODO +async fn print_timeline(room: Room) { + let timeline = room.timeline(); + let mut timeline_stream = timeline.signal().to_stream(); + tokio::spawn(async move { + while let Some(_diff) = timeline_stream.next().await { + // Is a straight-forward CLI example of dynamic timeline items + // possible?? let event = event.unwrap(); + //if let Some(content) = + // event_content(event.event.deserialize().unwrap()) { + // println!("{content}"); + //} + } + }); + + loop { + match timeline.paginate_backwards(uint!(10)).await { + Ok(outcome) if !outcome.more_messages => break, + Ok(_) => {} + Err(e) => { + eprintln!("error paginating: {e}"); + } + } + } } #[tokio::main]