feat(sdk): Add new timeline API

This commit is contained in:
Jonas Platte
2022-08-09 17:48:17 +02:00
committed by Jonas Platte
parent 67d968d4fa
commit 54fd40d8f5
14 changed files with 1573 additions and 13 deletions

25
Cargo.lock generated
View File

@@ -179,6 +179,12 @@ dependencies = [
"serde_json",
]
[[package]]
name = "assert_matches"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9"
[[package]]
name = "assign"
version = "1.1.1"
@@ -1364,6 +1370,7 @@ version = "0.1.0"
dependencies = [
"anyhow",
"futures",
"futures-signals",
"matrix-sdk",
"tokio",
"tracing-subscriber",
@@ -2302,6 +2309,7 @@ version = "0.6.0"
dependencies = [
"anyhow",
"anymap2",
"assert_matches",
"async-once-cell",
"async-stream",
"async-trait",
@@ -2320,6 +2328,7 @@ dependencies = [
"getrandom 0.2.7",
"http",
"image 0.24.3",
"indexmap",
"matches",
"matrix-sdk-base",
"matrix-sdk-common",
@@ -3660,9 +3669,9 @@ dependencies = [
[[package]]
name = "ruma"
version = "0.7.1"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3daa593bddbe225bc78760329afaba54d0c653e015f18ce6405fa723ec0f34d5"
checksum = "8dc348e3a4a18abc4e97fffa5e2e623f6edd50ba3a1dd5f47eb249fea713b69f"
dependencies = [
"assign",
"js_int",
@@ -3686,9 +3695,9 @@ dependencies = [
[[package]]
name = "ruma-client-api"
version = "0.15.0"
version = "0.15.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2709c891d277ef94d56657c3ec92ed464779dbfff055e518425eedf11d9ecb7"
checksum = "5bcfd3a3853ffdd151fc228441dd9c9e3d835ac85560dface7abda50b3888791"
dependencies = [
"assign",
"bytes",
@@ -3703,9 +3712,9 @@ dependencies = [
[[package]]
name = "ruma-common"
version = "0.10.1"
version = "0.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67dab5e934f2e280875cf3a863c14d876265bda169e4fd18334058e7307142d6"
checksum = "a1e629a01f359234798531a99ba83997abd4c15a65b5bcb8354c4171b59c25be"
dependencies = [
"base64",
"bytes",
@@ -3756,9 +3765,9 @@ dependencies = [
[[package]]
name = "ruma-macros"
version = "0.10.1"
version = "0.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b3e5a61180840ebfdeb4bcc4dc4a0d0c21aa22f587360b16b785c79058d99f3"
checksum = "9f7cd8cf8771aaca36042fb7659f4647b05e74a2058d843474dde5e51a56cd85"
dependencies = [
"once_cell",
"proc-macro-crate",

View File

@@ -23,7 +23,7 @@ futures-signals = { version = "0.3.28" }
futures-util = { version = "0.3.17", default-features = false }
# FIXME: we currently can't feature flag anything in the api.udl, therefore we must enforce sliding-sync being exposed here..
# see https://github.com/matrix-org/matrix-rust-sdk/issues/1014
matrix-sdk = { path = "../../crates/matrix-sdk", features = ["anyhow", "markdown", "sliding-sync", "socks"], version = "0.6.0" }
matrix-sdk = { path = "../../crates/matrix-sdk", features = ["anyhow", "experimental-timeline", "markdown", "sliding-sync", "socks"], version = "0.6.0" }
once_cell = "1.10.0"
sanitize-filename-reader-friendly = "2.2.1"
serde = { version = "1", features = ["derive"] }

View File

@@ -42,6 +42,8 @@ appservice = ["ruma/appservice-api-s"]
image-proc = ["dep:image"]
image-rayon = ["image-proc", "image?/jpeg_rayon"]
experimental-timeline = []
sliding-sync = [
"matrix-sdk-base/sliding-sync",
"anyhow",
@@ -70,6 +72,7 @@ futures-core = "0.3.21"
futures-signals = { version = "0.3.30", default-features = false }
futures-util = { version = "0.3.21", default-features = false }
http = "0.2.6"
indexmap = "1.9.1"
matrix-sdk-base = { version = "0.6.0", path = "../matrix-sdk-base", default_features = false }
matrix-sdk-common = { version = "0.6.0", path = "../matrix-sdk-common" }
matrix-sdk-indexeddb = { version = "0.2.0", path = "../matrix-sdk-indexeddb", default-features = false, optional = true }
@@ -107,7 +110,7 @@ features = [
optional = true
[dependencies.ruma]
version = "0.7.0"
version = "0.7.4"
features = ["client-api-c", "compat", "rand", "unstable-msc2448", "unstable-msc2965"]
[target.'cfg(target_arch = "wasm32")'.dependencies]
@@ -120,6 +123,7 @@ tokio = { version = "1.17.0", default-features = false, features = ["fs", "rt"]
[dev-dependencies]
anyhow = "1.0.57"
assert_matches = "1.5.0"
dirs = "4.0.0"
futures = { version = "0.3.21", default-features = false, features = ["executor"] }
matches = "0.1.9"

View File

@@ -37,6 +37,8 @@ use ruma::{
};
use serde::de::DeserializeOwned;
#[cfg(feature = "experimental-timeline")]
use super::timeline::Timeline;
use crate::{
event_handler::{EventHandler, EventHandlerHandle, EventHandlerResult, SyncEvent},
media::{MediaFormat, MediaRequest},
@@ -251,6 +253,16 @@ impl Common {
self.client.add_room_event_handler(self.room_id(), handler)
}
/// Get a [`Timeline`] for this room.
///
/// This offers a higher-level API than event handlers, in treating things
/// like edits and reactions as updates of existing items rather than new
/// independent events.
#[cfg(feature = "experimental-timeline")]
pub fn timeline(&self) -> Timeline {
Timeline::new(self)
}
/// Fetch the event with the given `EventId` in this room.
pub async fn event(&self, event_id: &EventId) -> Result<TimelineEvent> {
let request = get_room_event::v3::Request::new(self.room_id(), event_id);

View File

@@ -9,6 +9,8 @@ mod invited;
mod joined;
mod left;
mod member;
#[cfg(feature = "experimental-timeline")]
pub mod timeline;
pub use self::{
common::{Common, Messages, MessagesOptions},

View File

@@ -0,0 +1,486 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use indexmap::map::Entry;
use matrix_sdk_base::deserialized_responses::EncryptionInfo;
use ruma::{
events::{
reaction::ReactionEventContent,
room::{
message::{Relation, Replacement, RoomMessageEventContent},
redaction::{
OriginalSyncRoomRedactionEvent, RoomRedactionEventContent, SyncRoomRedactionEvent,
},
},
AnyMessageLikeEventContent, AnyStateEventContent, AnySyncMessageLikeEvent,
AnySyncTimelineEvent, Relations,
},
serde::Raw,
uint, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId,
UserId,
};
use tracing::{debug, error, info, warn};
use super::{
event_item::{BundledReactions, TimelineDetails},
find_event, EventTimelineItem, Message, TimelineInner, TimelineItem, TimelineItemContent,
TimelineKey,
};
pub(super) fn handle_live_event(
raw: Raw<AnySyncTimelineEvent>,
encryption_info: Option<EncryptionInfo>,
own_user_id: &UserId,
timeline: &TimelineInner,
) {
handle_remote_event(raw, encryption_info, own_user_id, TimelineItemPosition::End, timeline)
}
pub(super) fn handle_local_event(
txn_id: OwnedTransactionId,
content: AnyMessageLikeEventContent,
own_user_id: &UserId,
timeline: &TimelineInner,
) {
let meta = TimelineEventMetadata {
sender: own_user_id.to_owned(),
origin_server_ts: None,
raw_event: None,
is_own_event: true,
relations: None,
// FIXME: Should we supply something here for encrypted rooms?
encryption_info: None,
};
let flow = Flow::Local { txn_id };
let kind = TimelineEventKind::Message { content };
TimelineEventHandler::new(meta, flow, timeline).handle_event(kind)
}
pub(super) fn handle_back_paginated_event(
raw: Raw<AnySyncTimelineEvent>,
encryption_info: Option<EncryptionInfo>,
own_user_id: &UserId,
timeline: &TimelineInner,
) {
handle_remote_event(raw, encryption_info, own_user_id, TimelineItemPosition::Start, timeline)
}
fn handle_remote_event(
raw: Raw<AnySyncTimelineEvent>,
encryption_info: Option<EncryptionInfo>,
own_user_id: &UserId,
position: TimelineItemPosition,
timeline: &TimelineInner,
) {
let event = match raw.deserialize() {
Ok(ev) => ev,
Err(_e) => {
// TODO: Add some sort of error timeline item
return;
}
};
let sender = event.sender().to_owned();
let is_own_event = sender == own_user_id;
let meta = TimelineEventMetadata {
raw_event: Some(raw),
sender,
origin_server_ts: Some(event.origin_server_ts()),
is_own_event,
relations: event.relations().cloned(),
encryption_info,
};
let flow = Flow::Remote {
event_id: event.event_id().to_owned(),
txn_id: event.transaction_id().map(ToOwned::to_owned),
position,
};
TimelineEventHandler::new(meta, flow, timeline).handle_event(event.into())
}
enum Flow {
Local {
txn_id: OwnedTransactionId,
},
Remote {
event_id: OwnedEventId,
txn_id: Option<OwnedTransactionId>,
position: TimelineItemPosition,
},
}
impl Flow {
fn to_key(&self) -> TimelineKey {
match self {
Self::Remote { event_id, .. } => TimelineKey::EventId(event_id.to_owned()),
Self::Local { txn_id } => TimelineKey::TransactionId(txn_id.to_owned()),
}
}
}
struct TimelineEventMetadata {
raw_event: Option<Raw<AnySyncTimelineEvent>>,
sender: OwnedUserId,
origin_server_ts: Option<MilliSecondsSinceUnixEpoch>,
is_own_event: bool,
relations: Option<Relations>,
encryption_info: Option<EncryptionInfo>,
}
impl From<AnySyncTimelineEvent> for TimelineEventKind {
fn from(event: AnySyncTimelineEvent) -> Self {
match event {
AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(
SyncRoomRedactionEvent::Original(OriginalSyncRoomRedactionEvent {
redacts,
content,
..
}),
)) => Self::Redaction { redacts, content },
AnySyncTimelineEvent::MessageLike(ev) => match ev.original_content() {
Some(content) => Self::Message { content },
None => Self::RedactedMessage,
},
AnySyncTimelineEvent::State(ev) => match ev.original_content() {
Some(_content) => Self::State { _content },
None => Self::RedactedState,
},
}
}
}
#[derive(Clone)]
enum TimelineEventKind {
Message { content: AnyMessageLikeEventContent },
RedactedMessage,
Redaction { redacts: OwnedEventId, content: RoomRedactionEventContent },
// FIXME: Split further for state keys of different type
State { _content: AnyStateEventContent },
RedactedState, // AnyRedactedStateEventContent
}
enum TimelineItemPosition {
Start,
End,
}
// Bundles together a few things that are needed throughout the different stages
// of handling an event (figuring out whether it should update an existing
// timeline item, transforming that item or creating a new one, updating the
// reactive Vec).
struct TimelineEventHandler<'a> {
meta: TimelineEventMetadata,
flow: Flow,
timeline: &'a TimelineInner,
event_added: bool,
}
impl<'a> TimelineEventHandler<'a> {
fn new(meta: TimelineEventMetadata, flow: Flow, timeline: &'a TimelineInner) -> Self {
Self { meta, flow, timeline, event_added: false }
}
fn handle_event(mut self, event_kind: TimelineEventKind) {
match event_kind {
TimelineEventKind::Message { content } => match content {
AnyMessageLikeEventContent::Reaction(c) => self.handle_reaction(c),
AnyMessageLikeEventContent::RoomMessage(c) => self.handle_room_message(c),
// TODO
_ => {}
},
TimelineEventKind::RedactedMessage => {
self.add(NewEventTimelineItem::redacted_message());
}
TimelineEventKind::Redaction { redacts, content } => {
self.handle_redaction(redacts, content)
}
// TODO: State events
_ => {}
}
if !self.event_added {
// TODO: Add event as raw
}
}
fn handle_room_message(&mut self, content: RoomMessageEventContent) {
match content.relates_to {
Some(Relation::Replacement(re)) => {
self.handle_room_message_edit(re);
}
_ => {
self.add(NewEventTimelineItem::message(content, self.meta.relations.clone()));
}
}
}
fn handle_room_message_edit(&mut self, replacement: Replacement) {
let event_id = &replacement.event_id;
self.maybe_update_timeline_item(event_id, "edit", |item| {
if self.meta.sender != item.sender() {
info!(
%event_id, original_sender = %item.sender(), edit_sender = %self.meta.sender,
"Event tries to edit another user's timeline item, discarding"
);
return None;
}
let msg = match &item.content {
TimelineItemContent::Message(msg) => msg,
TimelineItemContent::RedactedMessage => {
info!(
%event_id,
"Event tries to edit a non-editable timeline item, discarding"
);
return None;
}
};
let content = TimelineItemContent::Message(Message {
msgtype: replacement.new_content.msgtype,
in_reply_to: msg.in_reply_to.clone(),
edited: true,
});
Some(item.with_content(content))
});
}
// Redacted reaction events are no-ops so don't need to be handled
fn handle_reaction(&mut self, c: ReactionEventContent) {
let event_id: &EventId = &c.relates_to.event_id;
// This lock should never be contended, same as the timeline item lock.
// If this is ever run in parallel for some reason though, make sure the
// reaction lock is held for the entire time of the timeline items being
// locked so these two things can't get out of sync.
let mut lock = self.timeline.reaction_map.lock().unwrap();
let did_update = self.maybe_update_timeline_item(event_id, "reaction", |item| {
// Handling of reactions on redacted events is an open question.
// For now, ignore reactions on redacted events like Element does.
if let TimelineItemContent::RedactedMessage = item.content {
debug!(%event_id, "Ignoring reaction on redacted event");
None
} else {
let mut reactions = item.reactions.clone();
let reaction_details =
reactions.bundled.entry(c.relates_to.key.clone()).or_default();
reaction_details.count += uint!(1);
if let TimelineDetails::Ready(senders) = &mut reaction_details.senders {
senders.push(self.meta.sender.clone());
}
Some(item.with_reactions(reactions))
}
});
if did_update {
lock.insert(self.flow.to_key(), (self.meta.sender.clone(), c.relates_to));
}
}
// Redacted redactions are no-ops (unfortunately)
fn handle_redaction(&mut self, redacts: OwnedEventId, _content: RoomRedactionEventContent) {
let mut did_update = false;
// Don't release this lock until after update_timeline_item.
// See first comment in handle_reaction for why.
let mut lock = self.timeline.reaction_map.lock().unwrap();
if let Some((sender, rel)) = lock.remove(&TimelineKey::EventId(redacts.clone())) {
did_update = self.maybe_update_timeline_item(&rel.event_id, "redaction", |item| {
let mut reactions = item.reactions.clone();
let mut details_entry = match reactions.bundled.entry(rel.key) {
Entry::Occupied(o) => o,
Entry::Vacant(_) => return None,
};
let details = details_entry.get_mut();
details.count -= uint!(1);
if details.count == uint!(0) {
details_entry.remove();
return Some(item.with_reactions(reactions));
}
let senders = match &mut details.senders {
TimelineDetails::Ready(senders) => senders,
_ => {
// FIXME: We probably want to support this somehow in
// the future, but right now it's not possible.
warn!(
"inconsistent state: shouldn't have a reaction_map entry for a \
timeline item with incomplete reactions"
);
return None;
}
};
if let Some(idx) = senders.iter().position(|s| *s == sender) {
senders.remove(idx);
} else {
error!(
"inconsistent state: sender from reaction_map not in reaction sender list \
of timeline item"
);
return None;
}
if u64::from(details.count) != senders.len() as u64 {
error!("inconsistent state: reaction count differs from number of senders");
// Can't make things worse by updating the item, so no early
// return here.
}
Some(item.with_reactions(reactions))
});
if !did_update {
warn!("reaction_map out of sync with timeline items");
}
}
// Even if the event being redacted is a reaction (found in
// `reaction_map`), it can still be present in the timeline items
// directly with the raw event timeline feature (not yet implemented).
did_update |= self.update_timeline_item(&redacts, "redaction", |item| item.to_redacted());
if !did_update {
// We will want to know this when debugging redaction issues.
debug!(redaction_key = ?self.flow.to_key(), %redacts, "redaction affected no event");
}
}
fn add(&mut self, item: NewEventTimelineItem) {
self.event_added = true;
let NewEventTimelineItem { content, reactions } = item;
let item = EventTimelineItem {
key: self.flow.to_key(),
event_id: None,
sender: self.meta.sender.to_owned(),
content,
reactions,
origin_server_ts: self.meta.origin_server_ts,
is_own: self.meta.is_own_event,
encryption_info: self.meta.encryption_info.clone(),
raw: self.meta.raw_event.clone(),
};
let item = Arc::new(TimelineItem::Event(item));
let mut lock = self.timeline.items.lock_mut();
match &self.flow {
Flow::Local { .. }
| Flow::Remote { position: TimelineItemPosition::End, txn_id: None, .. } => {
lock.push_cloned(item);
}
Flow::Remote { position: TimelineItemPosition::Start, txn_id: None, .. } => {
lock.insert_cloned(0, item);
}
Flow::Remote { txn_id: Some(txn_id), event_id, position } => {
if let Some((idx, _old_item)) = find_event(&lock, txn_id) {
// TODO: Check whether anything is different about the old and new item?
lock.set_cloned(idx, item);
} else {
debug!(
%txn_id, %event_id,
"Received event with transaction ID, but didn't find matching timeline item"
);
match position {
TimelineItemPosition::Start => lock.insert_cloned(0, item),
TimelineItemPosition::End => lock.push_cloned(item),
}
}
}
}
}
/// Returns whether an update happened
fn maybe_update_timeline_item(
&self,
event_id: &EventId,
action: &str,
update: impl FnOnce(&EventTimelineItem) -> Option<EventTimelineItem>,
) -> bool {
// No point in trying to update items with relations when back-
// paginating, the event the relation applies to can't be processed yet.
if matches!(self.flow, Flow::Remote { position: TimelineItemPosition::Start, .. }) {
return false;
}
let mut lock = self.timeline.items.lock_mut();
if let Some((idx, item)) = find_event(&lock, event_id) {
if let Some(new_item) = update(item) {
lock.set_cloned(idx, Arc::new(TimelineItem::Event(new_item)));
return true;
}
} else {
debug!(%event_id, "Timeline item not found, discarding {action}");
}
false
}
/// Returns whether an update happened
fn update_timeline_item(
&self,
event_id: &EventId,
action: &str,
update: impl FnOnce(&EventTimelineItem) -> EventTimelineItem,
) -> bool {
self.maybe_update_timeline_item(event_id, action, move |item| Some(update(item)))
}
}
struct NewEventTimelineItem {
content: TimelineItemContent,
reactions: BundledReactions,
}
impl NewEventTimelineItem {
// These constructors could also be `From` implementations, but that would
// allow users to call them directly, which should not be supported
pub(crate) fn message(c: RoomMessageEventContent, relations: Option<Relations>) -> Self {
let edited = relations.as_ref().map_or(false, |r| r.replace.is_some());
let content = TimelineItemContent::Message(Message {
msgtype: c.msgtype,
in_reply_to: c.relates_to.and_then(|rel| match rel {
Relation::Reply { in_reply_to } => Some(in_reply_to.event_id),
_ => None,
}),
edited,
});
let reactions =
relations.and_then(|r| r.annotation).map(BundledReactions::from).unwrap_or_default();
Self { content, reactions }
}
pub(crate) fn redacted_message() -> Self {
Self {
content: TimelineItemContent::RedactedMessage,
reactions: BundledReactions::default(),
}
}
}

View File

@@ -0,0 +1,319 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use indexmap::IndexMap;
use matrix_sdk_base::deserialized_responses::EncryptionInfo;
use ruma::{
events::{
relation::{AnnotationChunk, AnnotationType},
room::message::MessageType,
AnySyncTimelineEvent,
},
serde::Raw,
uint, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId,
TransactionId, UInt, UserId,
};
/// An item in the timeline that represents at least one event.
///
/// There is always one main event that gives the `EventTimelineItem` its
/// identity (see [key](Self::key)) but in many cases, additional events like
/// reactions and edits are also part of the item.
#[derive(Clone, Debug)]
pub struct EventTimelineItem {
pub(super) key: TimelineKey,
// If this item is a local echo that has been acknowledged by the server
// but not remote-echoed yet, this field holds the event ID from the send
// response.
pub(super) event_id: Option<OwnedEventId>,
pub(super) sender: OwnedUserId,
pub(super) content: TimelineItemContent,
pub(super) reactions: BundledReactions,
pub(super) origin_server_ts: Option<MilliSecondsSinceUnixEpoch>,
pub(super) is_own: bool,
pub(super) encryption_info: Option<EncryptionInfo>,
// FIXME: Expose the raw JSON of aggregated events somehow
pub(super) raw: Option<Raw<AnySyncTimelineEvent>>,
}
macro_rules! build {
(
$ty:ident {
$( $field:ident $(: $value:expr)?, )*
..$this:ident( $($this_field:ident),* $(,)? )
}
) => {
$ty {
$( $field $(: $value)?, )*
$( $this_field: $this.$this_field.clone() ),*
}
}
}
impl EventTimelineItem {
/// Get the [`TimelineKey`] of this item.
pub fn key(&self) -> &TimelineKey {
&self.key
}
/// Get the event ID of this item.
///
/// If this returns `Some(_)`, the event was successfully created by the
/// server.
///
/// Even if the [`key()`](Self::key) of this timeline item holds a
/// transaction ID, this can be `Some(_)` as the event ID can be known not
/// just from the remote echo via `sync_events`, but also from the response
/// of the send request that created the event.
pub fn event_id(&self) -> Option<&EventId> {
match &self.key {
TimelineKey::TransactionId(_) => self.event_id.as_deref(),
TimelineKey::EventId(id) => Some(id),
}
}
/// Get the sender of this item.
pub fn sender(&self) -> &UserId {
&self.sender
}
/// Get the content of this item.
pub fn content(&self) -> &TimelineItemContent {
&self.content
}
/// Get the reactions of this item.
pub fn reactions(&self) -> &IndexMap<String, ReactionDetails> {
// FIXME: Find out the state of incomplete bundled reactions, adjust
// Ruma if necessary, return the whole BundledReactions field
&self.reactions.bundled
}
/// Get the origin server timestamp of this item.
///
/// Returns `None` if this event hasn't been echoed back by the server yet.
pub fn origin_server_ts(&self) -> Option<MilliSecondsSinceUnixEpoch> {
self.origin_server_ts
}
/// Whether this timeline item was sent by the logged-in user themselves.
pub fn is_own(&self) -> bool {
self.is_own
}
/// Get the raw JSON representation of the initial event (the one that
/// caused this timeline item to be created).
///
/// Returns `None` if this event hasn't been echoed back by the server yet.
pub fn raw(&self) -> Option<&Raw<AnySyncTimelineEvent>> {
self.raw.as_ref()
}
pub(super) fn to_redacted(&self) -> Self {
build!(Self {
// FIXME: Change when we support state events
content: TimelineItemContent::RedactedMessage,
reactions: BundledReactions::default(),
..self(key, event_id, sender, origin_server_ts, is_own, encryption_info, raw)
})
}
pub(super) fn with_event_id(&self, event_id: Option<OwnedEventId>) -> Self {
build!(Self {
event_id,
..self(key, sender, content, reactions, origin_server_ts, is_own, encryption_info, raw,)
})
}
#[rustfmt::skip]
pub(super) fn with_content(&self, content: TimelineItemContent) -> Self {
build!(Self {
content,
..self(
key, event_id, sender, reactions, origin_server_ts, is_own, encryption_info, raw,
)
})
}
#[rustfmt::skip]
pub(super) fn with_reactions(&self, reactions: BundledReactions) -> Self {
build!(Self {
reactions,
..self(
key, event_id, sender, content, origin_server_ts, is_own, encryption_info, raw,
)
})
}
}
/// A unique identifier for a timeline item.
///
/// This identifier is used to find the item in the timeline in order to update
/// its state.
///
/// When an event is created locally, the timeline reflects this with an item
/// that has a [`TransactionId`](Self::TransactionId) key. Once the server has
/// acknowledged the event and given it an ID, that item's key is replaced by
/// [`EventId`](Self::EventId) containing the new ID.
///
/// When an event related to the original event whose ID is stored in a
/// [`TimelineKey`] is received, the key is left untouched, but other parts of
/// the timeline item may be updated. Thus, the current data model is only able
/// to handle relations that reference the initial event that resulted in a
/// timeline item being created, not other related events. At the time of
/// writing, there is no relation that is meant to refer to other events that
/// only exist for their relation (e.g. edits, replies).
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum TimelineKey {
/// Transaction ID, for an event that was created locally and hasn't been
/// acknowledged by the server yet.
TransactionId(OwnedTransactionId),
/// Event ID, for an event that is synced with the server.
EventId(OwnedEventId),
}
impl PartialEq<TimelineKey> for &TransactionId {
fn eq(&self, key: &TimelineKey) -> bool {
matches!(key, TimelineKey::TransactionId(txn_id) if txn_id == self)
}
}
impl PartialEq<TimelineKey> for &OwnedTransactionId {
fn eq(&self, key: &TimelineKey) -> bool {
matches!(key, TimelineKey::TransactionId(txn_id) if txn_id == *self)
}
}
impl PartialEq<TimelineKey> for &EventId {
fn eq(&self, key: &TimelineKey) -> bool {
matches!(key, TimelineKey::EventId(event_id) if event_id == self)
}
}
/// Some details of an [`EventTimelineItem`] that may require server requests
/// other than just the regular
/// [`sync_events`][ruma::api::client::sync::sync_events].
#[derive(Clone, Debug)]
pub enum TimelineDetails<T> {
/// The details are not available yet, and have not been request from the
/// server.
Unavailable,
/// The details are not available yet, but have been requested.
Pending,
/// The details are available.
Ready(T),
}
/// The content of an [`EventTimelineItem`].
#[derive(Clone, Debug)]
pub enum TimelineItemContent {
/// An `m.room.message` event or extensible event, including edits.
Message(Message),
/// A redacted message.
RedactedMessage,
}
/// An `m.room.message` event or extensible event, including edits.
#[derive(Clone, Debug)]
pub struct Message {
pub(super) msgtype: MessageType,
// TODO: Add everything required to display the replied-to event, plus a
// 'loading' state that is entered at first, until the user requests the
// reply to be loaded.
pub(super) in_reply_to: Option<OwnedEventId>,
pub(super) edited: bool,
}
impl Message {
/// Get the `msgtype`-specific data of this message.
pub fn msgtype(&self) -> &MessageType {
&self.msgtype
}
/// Get the event ID of the event this message is replying to, if any.
pub fn in_reply_to(&self) -> Option<&EventId> {
self.in_reply_to.as_deref()
}
/// Get the edit state of this message (has been edited: `true` / `false`).
pub fn is_edited(&self) -> bool {
self.edited
}
}
#[derive(Clone, Debug)]
pub struct BundledReactions {
/// Whether all reactions are known, or some may be missing.
///
/// If this is `false`, the remaining reactions can be fetched via **TODO**.
pub complete: bool, // FIXME: Unclear whether this is needed
/// The reactions.
///
/// Key: The reaction, usually an emoji.\
/// Value: The count.
pub bundled: IndexMap<String, ReactionDetails>,
}
impl From<AnnotationChunk> for BundledReactions {
fn from(ann: AnnotationChunk) -> Self {
let bundled = ann
.chunk
.into_iter()
.filter_map(|a| {
(a.annotation_type == AnnotationType::Reaction).then(|| {
let details =
ReactionDetails { count: a.count, senders: TimelineDetails::Unavailable };
(a.key, details)
})
})
.collect();
BundledReactions { bundled, complete: ann.next_batch.is_none() }
}
}
impl Default for BundledReactions {
fn default() -> Self {
Self { complete: true, bundled: IndexMap::new() }
}
}
/// The details of a group of reaction events on the same event with the same
/// key.
#[derive(Clone, Debug)]
pub struct ReactionDetails {
/// The amount of reactions with this key.
pub count: UInt,
/// The senders of the reactions.
pub senders: TimelineDetails<Vec<OwnedUserId>>,
}
impl Default for ReactionDetails {
fn default() -> Self {
Self { count: uint!(0), senders: TimelineDetails::Ready(Vec::new()) }
}
}
/// The result of a successful pagination request.
#[derive(Debug)]
#[non_exhaustive]
pub struct PaginationOutcome {
/// Whether there's more messages to be paginated.
pub more_messages: bool,
}

View File

@@ -0,0 +1,243 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! A high-level view into a room's contents.
//!
//! See [`Timeline`] for details.
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use futures_core::Stream;
use futures_signals::signal_vec::{MutableVec, SignalVec, SignalVecExt, VecDiff};
use matrix_sdk_base::deserialized_responses::EncryptionInfo;
use ruma::{
assign,
events::{reaction::Relation as AnnotationRelation, AnyMessageLikeEventContent},
OwnedEventId, OwnedUserId, TransactionId, UInt,
};
use tracing::{debug, error, instrument};
use super::{Joined, Room};
use crate::{
event_handler::EventHandlerDropGuard,
room::{self, MessagesOptions},
Result,
};
mod event_handler;
mod event_item;
mod virtual_item;
use self::event_handler::{handle_back_paginated_event, handle_live_event, handle_local_event};
pub use self::{
event_item::{
EventTimelineItem, Message, PaginationOutcome, ReactionDetails, TimelineDetails,
TimelineItemContent, TimelineKey,
},
virtual_item::VirtualTimelineItem,
};
/// A high-level view into a regular¹ room's contents.
///
/// ¹ This type is meant to be used in the context of rooms without a
/// `room_type`, that is rooms that are primarily used to exchange text
/// messages.
#[derive(Debug)]
pub struct Timeline {
inner: TimelineInner,
room: room::Common,
start_token: Mutex<Option<String>>,
_end_token: Mutex<Option<String>>,
_event_handler_guard: EventHandlerDropGuard,
}
#[derive(Clone, Debug, Default)]
struct TimelineInner {
items: MutableVec<Arc<TimelineItem>>,
// Reaction event / txn ID => sender and reaction data
reaction_map: Arc<Mutex<HashMap<TimelineKey, (OwnedUserId, AnnotationRelation)>>>,
}
impl Timeline {
pub(super) fn new(room: &room::Common) -> Self {
let inner = TimelineInner::default();
let handle = room.add_event_handler({
let inner = inner.clone();
move |event, encryption_info: Option<EncryptionInfo>, room: Room| {
let inner = inner.clone();
async move {
handle_live_event(event, encryption_info, room.own_user_id(), &inner);
}
}
});
let _event_handler_guard = room.client.event_handler_drop_guard(handle);
Timeline {
inner,
room: room.clone(),
start_token: Mutex::new(None),
_end_token: Mutex::new(None),
_event_handler_guard,
}
}
/// Add more events to the start of the timeline.
#[instrument(skip(self), fields(room_id = %self.room.room_id()))]
pub async fn paginate_backwards(&self, limit: UInt) -> Result<PaginationOutcome> {
let start = self.start_token.lock().unwrap().clone();
let messages = self
.room
.messages(assign!(MessagesOptions::backward(), {
from: start.as_deref(),
limit,
}))
.await?;
let outcome = PaginationOutcome { more_messages: messages.end.is_some() };
*self.start_token.lock().unwrap() = messages.end;
let own_user_id = self.room.own_user_id();
for room_ev in messages.chunk {
handle_back_paginated_event(
room_ev.event.cast(),
room_ev.encryption_info,
own_user_id,
&self.inner,
);
}
Ok(outcome)
}
/// Get a signal of the timeline's items.
///
/// You can poll this signal to receive updates, the first of which will
/// be the full list of items currently available.
///
/// See [`SignalVecExt`](futures_signals::signal_vec::SignalVecExt) for a
/// high-level API on top of [`SignalVec`].
pub fn signal(&self) -> impl SignalVec<Item = Arc<TimelineItem>> {
self.inner.items.signal_vec_cloned()
}
/// Get a stream of timeline changes.
///
/// This is a convenience shorthand for `timeline.signal().to_stream()`.
pub fn stream(&self) -> impl Stream<Item = VecDiff<Arc<TimelineItem>>> {
self.signal().to_stream()
}
/// Send a message to the room, and add it to the timeline as a local echo.
///
/// For simplicity, this method doesn't currently allow custom message
/// types.
///
/// If the encryption feature is enabled, this method will transparently
/// encrypt the room message if the room is encrypted.
///
/// # Arguments
///
/// * `content` - The content of the message event.
///
/// * `txn_id` - A locally-unique ID describing a message transaction with
/// the homeserver. Unless you're doing something special, you can pass in
/// `None` which will create a suitable one for you automatically.
/// * On the sending side, this field is used for re-trying earlier
/// failed transactions. Subsequent messages *must never* re-use an
/// earlier transaction ID.
/// * On the receiving side, the field is used for recognizing our own
/// messages when they arrive down the sync: the server includes the
/// ID in the [`MessageLikeUnsigned`] field `transaction_id` of the
/// corresponding [`SyncMessageLikeEvent`], but only for the *sending*
/// device. Other devices will not see it.
///
/// [`MessageLikeUnsigned`]: ruma::events::MessageLikeUnsigned
/// [`SyncMessageLikeEvent`]: ruma::events::SyncMessageLikeEvent
#[instrument(skip(self, content), fields(room_id = %self.room.room_id()))]
pub async fn send(
&self,
content: AnyMessageLikeEventContent,
txn_id: Option<&TransactionId>,
) -> Result<()> {
let txn_id = txn_id.map_or_else(TransactionId::new, ToOwned::to_owned);
handle_local_event(txn_id.clone(), content.clone(), self.room.own_user_id(), &self.inner);
// If this room isn't actually in joined state, we'll get a server error.
// Not ideal, but works for now.
let room = Joined { inner: self.room.clone() };
let response = room.send(content, Some(&txn_id)).await?;
add_event_id(&self.inner, &txn_id, response.event_id);
Ok(())
}
}
/// A single entry in timeline.
#[derive(Clone, Debug)]
#[allow(clippy::large_enum_variant)]
pub enum TimelineItem {
/// An event or aggregation of multiple events.
Event(EventTimelineItem),
/// An item that doesn't correspond to an event, for example the user's own
/// read marker.
Virtual(VirtualTimelineItem),
}
impl TimelineItem {
/// Get the inner `EventTimelineItem`, if this is a `TimelineItem::Event`.
pub fn as_event(&self) -> Option<&EventTimelineItem> {
match self {
Self::Event(v) => Some(v),
_ => None,
}
}
}
// FIXME: Put an upper bound on timeline size or add a separate map to look up
// the index of a timeline item by its key, to avoid large linear scans.
fn find_event(
lock: &[Arc<TimelineItem>],
key: impl PartialEq<TimelineKey>,
) -> Option<(usize, &EventTimelineItem)> {
lock.iter()
.enumerate()
.filter_map(|(idx, item)| Some((idx, item.as_event()?)))
.rfind(|(_, it)| key == it.key)
}
fn add_event_id(items: &TimelineInner, txn_id: &TransactionId, event_id: OwnedEventId) {
let mut lock = items.items.lock_mut();
if let Some((idx, item)) = find_event(&lock, txn_id) {
match &item.key {
TimelineKey::TransactionId(_) => {
lock.set_cloned(
idx,
Arc::new(TimelineItem::Event(item.with_event_id(Some(event_id)))),
);
}
TimelineKey::EventId(ev_id) => {
if *ev_id != event_id {
error!("remote echo and send-event response disagree on the event ID");
}
}
}
} else {
debug!(%txn_id, "Timeline item not found, can't mark as sent");
}
}

View File

@@ -0,0 +1,22 @@
// Copyright 2022 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// A [`TimelineItem`](super::TimelineItem) that doesn't correspond to an event.
#[derive(Clone, Debug)]
pub enum VirtualTimelineItem {
/// A divider between messages of two days.
DayDivider,
/// The user's own read marker.
ReadMarker,
}

View File

@@ -13,6 +13,16 @@ mod client;
mod refresh_token;
mod room;
#[cfg(all(test, not(target_arch = "wasm32")))]
#[ctor::ctor]
fn init_logging() {
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::from_default_env())
.with(tracing_subscriber::fmt::layer().with_test_writer())
.init();
}
async fn test_client_builder() -> (ClientBuilder, MockServer) {
let server = MockServer::start().await;
let builder =

View File

@@ -1,3 +1,4 @@
mod common;
mod joined;
mod left;
mod timeline;

View File

@@ -0,0 +1,427 @@
#![cfg(feature = "experimental-timeline")]
use std::{sync::Arc, time::Duration};
use assert_matches::assert_matches;
use futures_signals::signal_vec::{SignalVecExt, VecDiff};
use futures_util::StreamExt;
use matrix_sdk::{
config::SyncSettings,
room::timeline::{TimelineDetails, TimelineItemContent, TimelineKey},
ruma::MilliSecondsSinceUnixEpoch,
};
use matrix_sdk_common::executor::spawn;
use matrix_sdk_test::{async_test, test_json, EventBuilder, JoinedRoomBuilder, TimelineTestEvent};
use ruma::{
event_id,
events::room::message::{MessageType, RoomMessageEventContent},
room_id, uint, user_id, TransactionId,
};
use serde_json::json;
use wiremock::{
matchers::{header, method, path_regex},
Mock, ResponseTemplate,
};
use crate::{logged_in_client, mock_sync};
#[async_test]
async fn edit() {
let room_id = room_id!("!a98sd12bjh:example.org");
let (client, server) = logged_in_client().await;
let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
let mut ev_builder = EventBuilder::new();
ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id));
mock_sync(&server, ev_builder.build_json_sync_response(), None).await;
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
server.reset().await;
let room = client.get_room(room_id).unwrap();
let timeline = room.timeline();
let mut timeline_stream = timeline.signal().to_stream();
ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_timeline_event(
TimelineTestEvent::Custom(json!({
"content": {
"body": "hello",
"msgtype": "m.text",
},
"event_id": "$msda7m:localhost",
"origin_server_ts": 152037280,
"sender": "@alice:example.org",
"type": "m.room.message",
})),
));
mock_sync(&server, ev_builder.build_json_sync_response(), None).await;
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
server.reset().await;
let first =
assert_matches!(timeline_stream.next().await, Some(VecDiff::Push { value }) => value);
let msg = assert_matches!(
first.as_event().unwrap().content(),
TimelineItemContent::Message(msg) => msg
);
assert_matches!(msg.msgtype(), MessageType::Text(_));
assert_matches!(msg.in_reply_to(), None);
assert!(!msg.is_edited());
ev_builder.add_joined_room(
JoinedRoomBuilder::new(room_id)
.add_timeline_event(TimelineTestEvent::Custom(json!({
"content": {
"body": "Test",
"formatted_body": "<em>Test</em>",
"msgtype": "m.text",
"format": "org.matrix.custom.html",
},
"event_id": "$7at8sd:localhost",
"origin_server_ts": 152038280,
"sender": "@bob:example.org",
"type": "m.room.message",
})))
.add_timeline_event(TimelineTestEvent::Custom(json!({
"content": {
"body": " * hi",
"m.new_content": {
"body": "hi",
"msgtype": "m.text",
},
"m.relates_to": {
"event_id": "$msda7m:localhost",
"rel_type": "m.replace",
},
"msgtype": "m.text",
},
"event_id": "$msda7m2:localhost",
"origin_server_ts": 159056300,
"sender": "@alice:example.org",
"type": "m.room.message",
}))),
);
mock_sync(&server, ev_builder.build_json_sync_response(), None).await;
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
server.reset().await;
let second =
assert_matches!(timeline_stream.next().await, Some(VecDiff::Push { value }) => value);
let item = second.as_event().unwrap();
assert_eq!(item.origin_server_ts(), Some(MilliSecondsSinceUnixEpoch(uint!(152038280))));
assert!(item.event_id().is_some());
assert!(!item.is_own());
assert!(item.raw().is_some());
let msg = assert_matches!(item.content(), TimelineItemContent::Message(msg) => msg);
assert_matches!(msg.msgtype(), MessageType::Text(_));
assert_matches!(msg.in_reply_to(), None);
assert!(!msg.is_edited());
let edit = assert_matches!(
timeline_stream.next().await,
Some(VecDiff::UpdateAt { index: 0, value }) => value
);
let edited = assert_matches!(
edit.as_event().unwrap().content(),
TimelineItemContent::Message(msg) => msg
);
let text = assert_matches!(edited.msgtype(), MessageType::Text(text) => text);
assert_eq!(text.body, "hi");
assert_matches!(edited.in_reply_to(), None);
assert!(edited.is_edited());
}
#[async_test]
async fn echo() {
let room_id = room_id!("!a98sd12bjh:example.org");
let (client, server) = logged_in_client().await;
let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
let mut ev_builder = EventBuilder::new();
ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id));
mock_sync(&server, ev_builder.build_json_sync_response(), None).await;
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
server.reset().await;
let room = client.get_room(room_id).unwrap();
let timeline = Arc::new(room.timeline());
let mut timeline_stream = timeline.signal().to_stream();
let event_id = event_id!("$wWgymRfo7ri1uQx0NXO40vLJ");
let txn_id: &TransactionId = "my-txn-id".into();
Mock::given(method("PUT"))
.and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/.*"))
.and(header("authorization", "Bearer 1234"))
.respond_with(ResponseTemplate::new(200).set_body_json(&json!({ "event_id": event_id })))
.mount(&server)
.await;
// Don't move the original timeline, it must live until the end of the test
let timeline = timeline.clone();
let send_hdl = spawn(async move {
timeline
.send(RoomMessageEventContent::text_plain("Hello, World!").into(), Some(txn_id))
.await
});
let local_echo =
assert_matches!(timeline_stream.next().await, Some(VecDiff::Push { value }) => value);
let item = local_echo.as_event().unwrap();
assert!(item.event_id().is_none());
assert!(item.is_own());
assert_matches!(item.key(), TimelineKey::TransactionId(_));
assert_eq!(item.origin_server_ts(), None);
assert_matches!(item.raw(), None);
let msg = assert_matches!(item.content(), TimelineItemContent::Message(msg) => msg);
let text = assert_matches!(msg.msgtype(), MessageType::Text(text) => text);
assert_eq!(text.body, "Hello, World!");
// Wait for the sending to finish and assert everything was successful
send_hdl.await.unwrap().unwrap();
let sent_confirmation = assert_matches!(
timeline_stream.next().await,
Some(VecDiff::UpdateAt { index: 0, value }) => value
);
let item = sent_confirmation.as_event().unwrap();
assert!(item.event_id().is_some());
assert!(item.is_own());
assert_matches!(item.key(), TimelineKey::TransactionId(_));
assert_eq!(item.origin_server_ts(), None);
assert_matches!(item.raw(), None);
ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_timeline_event(
TimelineTestEvent::Custom(json!({
"content": {
"body": "Hello, World!",
"msgtype": "m.text",
},
"event_id": "$7at8sd:localhost",
"origin_server_ts": 152038280,
"sender": "@example:localhost",
"type": "m.room.message",
"unsigned": { "transaction_id": txn_id, },
})),
));
mock_sync(&server, ev_builder.build_json_sync_response(), None).await;
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
server.reset().await;
let remote_echo = assert_matches!(
timeline_stream.next().await,
Some(VecDiff::UpdateAt { index: 0, value }) => value
);
let item = remote_echo.as_event().unwrap();
assert!(item.event_id().is_some());
assert!(item.is_own());
assert_eq!(item.origin_server_ts(), Some(MilliSecondsSinceUnixEpoch(uint!(152038280))));
assert_matches!(item.key(), TimelineKey::EventId(_));
assert_matches!(item.raw(), Some(_));
}
#[async_test]
async fn back_pagination() {
let room_id = room_id!("!a98sd12bjh:example.org");
let (client, server) = logged_in_client().await;
let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
let mut ev_builder = EventBuilder::new();
ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id));
mock_sync(&server, ev_builder.build_json_sync_response(), None).await;
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
server.reset().await;
let room = client.get_room(room_id).unwrap();
let timeline = Arc::new(room.timeline());
let mut timeline_stream = timeline.signal().to_stream();
Mock::given(method("GET"))
.and(path_regex(r"^/_matrix/client/r0/rooms/.*/messages$"))
.and(header("authorization", "Bearer 1234"))
.respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::ROOM_MESSAGES_BATCH_1))
.expect(1)
.named("messages_batch_1")
.mount(&server)
.await;
timeline.paginate_backwards(uint!(10)).await.unwrap();
let message = assert_matches!(
timeline_stream.next().await,
Some(VecDiff::Push { value }) => value
);
let msg = assert_matches!(
message.as_event().unwrap().content(),
TimelineItemContent::Message(msg) => msg
);
let text = assert_matches!(msg.msgtype(), MessageType::Text(text) => text);
assert_eq!(text.body, "hello world");
let message = assert_matches!(
timeline_stream.next().await,
Some(VecDiff::InsertAt { index: 0, value }) => value
);
let msg = assert_matches!(
message.as_event().unwrap().content(),
TimelineItemContent::Message(msg) => msg
);
let text = assert_matches!(msg.msgtype(), MessageType::Text(text) => text);
assert_eq!(text.body, "the world is big");
}
#[async_test]
async fn reaction() {
let room_id = room_id!("!a98sd12bjh:example.org");
let (client, server) = logged_in_client().await;
let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
let mut ev_builder = EventBuilder::new();
ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id));
mock_sync(&server, ev_builder.build_json_sync_response(), None).await;
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
server.reset().await;
let room = client.get_room(room_id).unwrap();
let timeline = room.timeline();
let mut timeline_stream = timeline.signal().to_stream();
ev_builder.add_joined_room(
JoinedRoomBuilder::new(room_id)
.add_timeline_event(TimelineTestEvent::Custom(json!({
"content": {
"body": "hello",
"msgtype": "m.text",
},
"event_id": "$TTvQUp1e17qkw41rBSjpZ",
"origin_server_ts": 152037280,
"sender": "@alice:example.org",
"type": "m.room.message",
})))
.add_timeline_event(TimelineTestEvent::Custom(json!({
"content": {
"m.relates_to": {
"event_id": "$TTvQUp1e17qkw41rBSjpZ",
"key": "👍",
"rel_type": "m.annotation",
},
},
"event_id": "$031IXQRi27504",
"origin_server_ts": 152038300,
"sender": "@bob:example.org",
"type": "m.reaction",
}))),
);
mock_sync(&server, ev_builder.build_json_sync_response(), None).await;
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
server.reset().await;
let message =
assert_matches!(timeline_stream.next().await, Some(VecDiff::Push { value }) => value);
assert_matches!(message.as_event().unwrap().content(), TimelineItemContent::Message(_));
let updated_message = assert_matches!(
timeline_stream.next().await,
Some(VecDiff::UpdateAt { index: 0, value }) => value
);
let event_item = updated_message.as_event().unwrap();
let msg = assert_matches!(event_item.content(), TimelineItemContent::Message(msg) => msg);
assert!(!msg.is_edited());
assert_eq!(event_item.reactions().len(), 1);
let details = &event_item.reactions()["👍"];
assert_eq!(details.count, uint!(1));
let senders = assert_matches!(&details.senders, TimelineDetails::Ready(s) => s);
assert_eq!(*senders, vec![user_id!("@bob:example.org").to_owned()]);
// TODO: After adding raw timeline items, check for one here
ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_timeline_event(
TimelineTestEvent::Custom(json!({
"content": {},
"redacts": "$031IXQRi27504",
"event_id": "$N6eUCBc3vu58PL8TobGaVQzM",
"sender": "@bob:example.org",
"origin_server_ts": 152037280,
"type": "m.room.redaction",
})),
));
mock_sync(&server, ev_builder.build_json_sync_response(), None).await;
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
server.reset().await;
let updated_message = assert_matches!(
timeline_stream.next().await,
Some(VecDiff::UpdateAt { index: 0, value }) => value
);
let event_item = updated_message.as_event().unwrap();
let msg = assert_matches!(event_item.content(), TimelineItemContent::Message(msg) => msg);
assert!(!msg.is_edited());
assert_eq!(event_item.reactions().len(), 0);
}
#[async_test]
async fn redacted_message() {
let room_id = room_id!("!a98sd12bjh:example.org");
let (client, server) = logged_in_client().await;
let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
let mut ev_builder = EventBuilder::new();
ev_builder.add_joined_room(JoinedRoomBuilder::new(room_id));
mock_sync(&server, ev_builder.build_json_sync_response(), None).await;
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
server.reset().await;
let room = client.get_room(room_id).unwrap();
let timeline = room.timeline();
let mut timeline_stream = timeline.signal().to_stream();
ev_builder.add_joined_room(
JoinedRoomBuilder::new(room_id)
.add_timeline_event(TimelineTestEvent::Custom(json!({
"content": {},
"event_id": "$eeG0HA0FAZ37wP8kXlNkxx3I",
"origin_server_ts": 152035910,
"sender": "@alice:example.org",
"type": "m.room.message",
"unsigned": {
"redacted_because": {
"content": {},
"redacts": "$eeG0HA0FAZ37wP8kXlNkxx3I",
"event_id": "$N6eUCBc3vu58PL8TobGaVQzM",
"sender": "@alice:example.org",
"origin_server_ts": 152037280,
"type": "m.room.redaction",
},
},
})))
.add_timeline_event(TimelineTestEvent::Custom(json!({
"content": {},
"redacts": "$eeG0HA0FAZ37wP8kXlNkxx3I",
"event_id": "$N6eUCBc3vu58PL8TobGaVQzM",
"sender": "@alice:example.org",
"origin_server_ts": 152037280,
"type": "m.room.redaction",
}))),
);
mock_sync(&server, ev_builder.build_json_sync_response(), None).await;
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
server.reset().await;
let first =
assert_matches!(timeline_stream.next().await, Some(VecDiff::Push { value }) => value);
assert_matches!(first.as_event().unwrap().content(), TimelineItemContent::RedactedMessage);
// TODO: After adding raw timeline items, check for one here
}

View File

@@ -11,11 +11,12 @@ test = false
[dependencies]
anyhow = "1"
futures = "0.3"
futures-signals = { version = "0.3.30", default-features = false }
tokio = { version = "1.20.1", features = ["macros", "rt-multi-thread"] }
tracing-subscriber = "0.3.15"
url = "2.2.2"
[dependencies.matrix-sdk]
path = "../../crates/matrix-sdk"
features = ["sled"]
features = ["experimental-timeline", "sled"]
version = "0.6.0"

View File

@@ -1,5 +1,7 @@
use std::{env, process::exit, sync::Mutex, time::Duration};
use futures::StreamExt;
use futures_signals::signal_vec::SignalVecExt;
use matrix_sdk::{
self,
config::SyncSettings,
@@ -7,6 +9,7 @@ use matrix_sdk::{
ruma::{
api::client::filter::{FilterDefinition, LazyLoadOptions},
events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent, SyncMessageLikeEvent},
uint,
},
Client, LoopCtrl,
};
@@ -44,8 +47,29 @@ fn _event_content(event: AnySyncTimelineEvent) -> Option<String> {
}
}
async fn print_timeline(_room: Room) {
// TODO
async fn print_timeline(room: Room) {
let timeline = room.timeline();
let mut timeline_stream = timeline.signal().to_stream();
tokio::spawn(async move {
while let Some(_diff) = timeline_stream.next().await {
// Is a straight-forward CLI example of dynamic timeline items
// possible?? let event = event.unwrap();
//if let Some(content) =
// event_content(event.event.deserialize().unwrap()) {
// println!("{content}");
//}
}
});
loop {
match timeline.paginate_backwards(uint!(10)).await {
Ok(outcome) if !outcome.more_messages => break,
Ok(_) => {}
Err(e) => {
eprintln!("error paginating: {e}");
}
}
}
}
#[tokio::main]