Merge pull request #2142 from matrix-org/andybalaam/cache-last-event-in-roominfo

Store the last timeline event in RoomInfo
This commit is contained in:
Damir Jelić
2023-07-10 11:30:13 +02:00
committed by GitHub
20 changed files with 1488 additions and 100 deletions

View File

@@ -20,7 +20,7 @@ async function addMachineToMachine(machineToAdd, machine) {
await machineToAdd.receiveSyncChanges(toDeviceEvents, changedDevices, oneTimeKeyCounts, unusedFallbackKeys),
);
expect(receiveSyncChanges).toEqual([]);
expect(receiveSyncChanges).toEqual([[], []]);
const outgoingRequests = await machineToAdd.outgoingRequests();

View File

@@ -217,7 +217,7 @@ describe(OlmMachine.name, () => {
await m.receiveSyncChanges(toDeviceEvents, changedDevices, oneTimeKeyCounts, unusedFallbackKeys),
);
expect(receiveSyncChanges).toEqual([]);
expect(receiveSyncChanges).toEqual([[], []]);
});
test("can receive sync changes with unusedFallbackKeys as undefined", async () => {
@@ -230,7 +230,7 @@ describe(OlmMachine.name, () => {
await m.receiveSyncChanges(toDeviceEvents, changedDevices, oneTimeKeyCounts, undefined),
);
expect(receiveSyncChanges).toEqual([]);
expect(receiveSyncChanges).toEqual([[], []]);
});
test("can get the outgoing requests that need to be send out", async () => {
@@ -244,7 +244,7 @@ describe(OlmMachine.name, () => {
await m.receiveSyncChanges(toDeviceEvents, changedDevices, oneTimeKeyCounts, unusedFallbackKeys),
);
expect(receiveSyncChanges).toEqual([]);
expect(receiveSyncChanges).toEqual([[], []]);
const outgoingRequests = await m.outgoingRequests();

View File

@@ -139,7 +139,7 @@ describe(OlmMachine.name, () => {
await m.receiveSyncChanges(toDeviceEvents, changedDevices, oneTimeKeyCounts, unusedFallbackKeys),
);
expect(receiveSyncChanges).toEqual([]);
expect(receiveSyncChanges).toEqual([[], []]);
});
test("can get the outgoing requests that need to be send out", async () => {
@@ -153,7 +153,7 @@ describe(OlmMachine.name, () => {
await m.receiveSyncChanges(toDeviceEvents, changedDevices, oneTimeKeyCounts, unusedFallbackKeys),
);
expect(receiveSyncChanges).toEqual([]);
expect(receiveSyncChanges).toEqual([[], []]);
const outgoingRequests = await m.outgoingRequests();

View File

@@ -373,10 +373,8 @@ impl RoomListItem {
self.inner.unsubscribe();
}
fn latest_event(&self) -> Option<Arc<EventTimelineItem>> {
RUNTIME.block_on(async {
self.inner.latest_event().await.map(EventTimelineItem).map(Arc::new)
})
async fn latest_event(&self) -> Option<Arc<EventTimelineItem>> {
self.inner.latest_event().await.map(EventTimelineItem).map(Arc::new)
}
fn has_unread_notifications(&self) -> bool {

View File

@@ -56,6 +56,8 @@ use tokio::sync::RwLock;
use tokio::sync::RwLockReadGuard;
use tracing::{debug, info, instrument, trace, warn};
#[cfg(all(feature = "e2e-encryption", feature = "experimental-sliding-sync"))]
use crate::latest_event::{is_suitable_for_latest_event, PossibleLatestEvent};
use crate::{
deserialized_responses::{AmbiguityChanges, MembersResponse, SyncTimelineEvent},
error::Result,
@@ -561,24 +563,87 @@ impl BaseClient {
changed_devices: &api::sync::sync_events::DeviceLists,
one_time_keys_counts: &BTreeMap<ruma::DeviceKeyAlgorithm, UInt>,
unused_fallback_keys: Option<&[ruma::DeviceKeyAlgorithm]>,
#[cfg(feature = "experimental-sliding-sync")] changes: &mut StateChanges,
#[cfg(not(feature = "experimental-sliding-sync"))] _changes: &mut StateChanges,
) -> Result<Vec<Raw<ruma::events::AnyToDeviceEvent>>> {
if let Some(o) = self.olm_machine().await.as_ref() {
// Let the crypto machine handle the sync response, this
// decrypts to-device events, but leaves room events alone.
// This makes sure that we have the decryption keys for the room
// events at hand.
Ok(o.receive_sync_changes(
to_device_events,
changed_devices,
one_time_keys_counts,
unused_fallback_keys,
)
.await?)
let (events, room_key_updates) = o
.receive_sync_changes(
to_device_events,
changed_devices,
one_time_keys_counts,
unused_fallback_keys,
)
.await?;
#[cfg(feature = "experimental-sliding-sync")]
for room_key_update in room_key_updates {
if let Some(mut room) = self.get_room(&room_key_update.room_id) {
self.decrypt_latest_events(&mut room, changes).await;
}
}
#[cfg(not(feature = "experimental-sliding-sync"))]
drop(room_key_updates); // Silence unused variable warning
Ok(events)
} else {
// If we have no OlmMachine, just return the events that were passed in.
// This should not happen unless we forget to set things up by calling
// set_session_meta().
Ok(to_device_events)
}
}
/// Decrypt any of this room's latest_encrypted_events
/// that we can and if we can, change latest_event to reflect what we
/// found, and remove any older encrypted events from
/// latest_encrypted_events.
#[cfg(all(feature = "e2e-encryption", feature = "experimental-sliding-sync"))]
async fn decrypt_latest_events(&self, room: &mut Room, changes: &mut StateChanges) {
// Try to find a message we can decrypt and is suitable for using as the latest
// event. If we found one, set it as the latest and delete any older
// encrypted events
if let Some((found, found_index)) = self.decrypt_latest_suitable_event(room).await {
room.on_latest_event_decrypted(found, found_index);
changes.room_infos.insert(room.room_id().to_owned(), room.clone_info());
}
}
/// Attempt to decrypt a latest event, trying the latest stored encrypted
/// one first, and walking backwards, stopping when we find an event
/// that we can decrypt, and that is suitable to be the latest event
/// (i.e. we can usefully display it as a message preview). Returns the
/// decrypted event if we found one, along with its index in the
/// latest_encrypted_events list, or None if we didn't find one.
#[cfg(all(feature = "e2e-encryption", feature = "experimental-sliding-sync"))]
async fn decrypt_latest_suitable_event(
&self,
room: &Room,
) -> Option<(SyncTimelineEvent, usize)> {
let enc_events = room.latest_encrypted_events();
// Walk backwards through the encrypted events, looking for one we can decrypt
for (i, event) in enc_events.iter().enumerate().rev() {
if let Ok(Some(decrypted)) = self.decrypt_sync_room_event(event, room.room_id()).await {
// We found an event we can decrypt
if let Ok(any_sync_event) = decrypted.event.deserialize() {
// We can deserialize it to find its type
if let PossibleLatestEvent::YesMessageLike(_) =
is_suitable_for_latest_event(&any_sync_event)
{
// The event is the right type for us to use as latest_event
return Some((decrypted, i));
}
}
}
}
None
}
/// User has joined a room.
///
/// Update the internal and cached state accordingly. Return the final Room.
@@ -644,6 +709,7 @@ impl BaseClient {
}
let now = Instant::now();
let mut changes = Box::new(StateChanges::new(response.next_batch.clone()));
#[cfg(feature = "e2e-encryption")]
let to_device = self
@@ -652,13 +718,13 @@ impl BaseClient {
&response.device_lists,
&response.device_one_time_keys_count,
response.device_unused_fallback_key_types.as_deref(),
&mut changes,
)
.await?;
#[cfg(not(feature = "e2e-encryption"))]
let to_device = response.to_device.events;
let mut changes = Box::new(StateChanges::new(response.next_batch.clone()));
let mut ambiguity_cache = AmbiguityCache::new(self.store.inner.clone());
self.handle_account_data(&response.account_data.events, &mut changes).await;
@@ -1225,31 +1291,24 @@ impl Default for BaseClient {
#[cfg(test)]
mod tests {
use matrix_sdk_test::{
async_test, response_from_file, EventBuilder, InvitedRoomBuilder, LeftRoomBuilder,
StrippedStateTestEvent, TimelineTestEvent,
async_test, response_from_file, EventBuilder, InvitedRoomBuilder, JoinedRoomBuilder,
LeftRoomBuilder, StrippedStateTestEvent, TimelineTestEvent,
};
use ruma::{
api::{client as api, IncomingResponse},
room_id, user_id,
room_id, user_id, RoomId, UserId,
};
use serde_json::json;
use super::BaseClient;
use crate::{DisplayName, RoomState, SessionMeta};
use crate::{DisplayName, Room, RoomState, SessionMeta, StateChanges};
#[async_test]
async fn invite_after_leaving() {
let user_id = user_id!("@alice:example.org");
let room_id = room_id!("!test:example.org");
let client = BaseClient::new();
client
.set_session_meta(SessionMeta {
user_id: user_id.to_owned(),
device_id: "FOOBAR".into(),
})
.await
.unwrap();
let client = logged_in_client(user_id).await;
let mut ev_builder = EventBuilder::new();
@@ -1295,14 +1354,7 @@ mod tests {
let user_id = user_id!("@alice:example.org");
let room_id = room_id!("!ithpyNKDtmhneaTQja:example.org");
let client = BaseClient::new();
client
.set_session_meta(SessionMeta {
user_id: user_id.to_owned(),
device_id: "FOOBAR".into(),
})
.await
.unwrap();
let client = logged_in_client(user_id).await;
let response = api::sync::sync_events::v3::Response::try_from_http_response(response_from_file(&json!({
"next_batch": "asdkl;fjasdkl;fj;asdkl;f",
@@ -1384,4 +1436,72 @@ mod tests {
DisplayName::Calculated("Kyra".to_owned())
);
}
#[cfg(all(feature = "e2e-encryption", feature = "experimental-sliding-sync"))]
#[async_test]
async fn when_there_are_no_latest_encrypted_events_decrypting_them_does_nothing() {
// Given a room
let user_id = user_id!("@u:u.to");
let room_id = room_id!("!r:u.to");
let client = logged_in_client(user_id).await;
let mut room = process_room_join(&client, room_id, "$1", user_id).await;
// Sanity: it has no latest_encrypted_events or latest_event
assert!(room.latest_encrypted_events().is_empty());
assert!(room.latest_event().is_none());
// When I tell it to do some decryption
let mut changes = StateChanges::default();
client.decrypt_latest_events(&mut room, &mut changes).await;
// Then nothing changed
assert!(room.latest_encrypted_events().is_empty());
assert!(room.latest_event().is_none());
assert!(changes.room_infos.is_empty());
}
// TODO: I wanted to write more tests here for decrypt_latest_events but I got
// lost trying to set up my OlmMachine to be able to encrypt and decrypt
// events. In the meantime, there are tests for the most difficult logic
// inside Room. --andyb
async fn logged_in_client(user_id: &UserId) -> BaseClient {
let client = BaseClient::new();
client
.set_session_meta(SessionMeta {
user_id: user_id.to_owned(),
device_id: "FOOBAR".into(),
})
.await
.expect("set_session_meta failed!");
client
}
#[cfg(feature = "e2e-encryption")]
async fn process_room_join(
client: &BaseClient,
room_id: &RoomId,
event_id: &str,
user_id: &UserId,
) -> Room {
let mut ev_builder = EventBuilder::new();
let response = ev_builder
.add_joined_room(JoinedRoomBuilder::new(room_id).add_timeline_event(
TimelineTestEvent::Custom(json!({
"content": {
"displayname": "Alice",
"membership": "join",
},
"event_id": event_id,
"origin_server_ts": 1432135524678u64,
"sender": user_id,
"state_key": user_id,
"type": "m.room.member",
})),
))
.build_sync_response();
client.receive_sync_response(response).await.unwrap();
client.get_room(room_id).expect("Just-created room not found!")
}
}

View File

@@ -0,0 +1,202 @@
//! Utilities for working with events to decide whether they are suitable for
//! use as a [crate::RoomInfo::latest_event].
#![cfg(all(feature = "e2e-encryption", feature = "experimental-sliding-sync"))]
use ruma::events::{
room::message::RoomMessageEventContent, AnySyncMessageLikeEvent, AnySyncTimelineEvent,
OriginalSyncMessageLikeEvent, SyncMessageLikeEvent,
};
/// Represents a decision about whether an event could be stored as the latest
/// event in a room. Variants starting with Yes indicate that this message could
/// be stored, and provide the inner event information, and those starting with
/// a No indicate that it could not, and give a reason.
#[derive(Debug)]
pub enum PossibleLatestEvent<'a> {
/// This message is suitable - it is an m.room.message
YesMessageLike(&'a OriginalSyncMessageLikeEvent<RoomMessageEventContent>),
// Later: YesState(),
// Later: YesReaction(),
/// Not suitable - it's a state event
NoUnsupportedEventType,
/// Not suitable - it's not an m.room.message
NoUnsupportedMessageLikeType,
/// Not suitable - it's encrypted
NoEncrypted,
/// Not suitable - it's redacted (might we want to include these?)
NoRedacted,
}
/// Decide whether an event could be stored as the latest event in a room.
/// Returns a LatestEvent representing our decision.
pub fn is_suitable_for_latest_event(event: &AnySyncTimelineEvent) -> PossibleLatestEvent<'_> {
match event {
// Suitable - we have an m.room.message that was not redacted
AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(
SyncMessageLikeEvent::Original(message),
)) => PossibleLatestEvent::YesMessageLike(message),
// Encrypted events are not suitable
AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(_)) => {
PossibleLatestEvent::NoEncrypted
}
// Later, if we support reactions:
// AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::Reaction(_))
// Redacted events are not suitable
AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(
SyncMessageLikeEvent::Redacted(_),
)) => PossibleLatestEvent::NoRedacted,
// MessageLike, but not one of the types we want to show in message previews, so not
// suitable
AnySyncTimelineEvent::MessageLike(_) => PossibleLatestEvent::NoUnsupportedMessageLikeType,
// We don't currently support state events
AnySyncTimelineEvent::State(_) => PossibleLatestEvent::NoUnsupportedEventType,
}
}
#[cfg(test)]
mod test {
use std::collections::BTreeMap;
use assert_matches::assert_matches;
use ruma::{
events::{
room::{
encrypted::{
EncryptedEventScheme, OlmV1Curve25519AesSha2Content, RoomEncryptedEventContent,
SyncRoomEncryptedEvent,
},
message::{
ImageMessageEventContent, MessageType, RedactedRoomMessageEventContent,
RoomMessageEventContent, SyncRoomMessageEvent,
},
topic::{RoomTopicEventContent, SyncRoomTopicEvent},
ImageInfo, MediaSource,
},
sticker::{StickerEventContent, SyncStickerEvent},
AnySyncMessageLikeEvent, AnySyncStateEvent, AnySyncTimelineEvent, EmptyStateKey,
MessageLikeUnsigned, OriginalSyncMessageLikeEvent, OriginalSyncStateEvent,
RedactedSyncMessageLikeEvent, RedactedUnsigned, StateUnsigned,
UnsignedRoomRedactionEvent,
},
owned_event_id, owned_mxc_uri, owned_user_id, MilliSecondsSinceUnixEpoch, UInt,
};
use serde_json::json;
use crate::latest_event::{is_suitable_for_latest_event, PossibleLatestEvent};
#[test]
fn room_messages_are_suitable() {
let event = AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(
SyncRoomMessageEvent::Original(OriginalSyncMessageLikeEvent {
content: RoomMessageEventContent::new(MessageType::Image(
ImageMessageEventContent::new(
"".to_owned(),
MediaSource::Plain(owned_mxc_uri!("mxc://example.com/1")),
),
)),
event_id: owned_event_id!("$1"),
sender: owned_user_id!("@a:b.c"),
origin_server_ts: MilliSecondsSinceUnixEpoch(UInt::new(2123).unwrap()),
unsigned: MessageLikeUnsigned::new(),
}),
));
let m = assert_matches::assert_matches!(
is_suitable_for_latest_event(&event),
PossibleLatestEvent::YesMessageLike(m) => m
);
assert_eq!(m.content.msgtype.msgtype(), "m.image");
}
#[test]
fn different_types_of_messagelike_are_unsuitable() {
let event = AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::Sticker(
SyncStickerEvent::Original(OriginalSyncMessageLikeEvent {
content: StickerEventContent::new(
"sticker!".to_owned(),
ImageInfo::new(),
owned_mxc_uri!("mxc://example.com/1"),
),
event_id: owned_event_id!("$1"),
sender: owned_user_id!("@a:b.c"),
origin_server_ts: MilliSecondsSinceUnixEpoch(UInt::new(2123).unwrap()),
unsigned: MessageLikeUnsigned::new(),
}),
));
assert_matches!(
is_suitable_for_latest_event(&event),
PossibleLatestEvent::NoUnsupportedMessageLikeType
);
}
#[test]
fn redacted_messages_are_unsuitable() {
// Ruma does not allow constructing UnsignedRoomRedactionEvent instances.
let room_redaction_event: UnsignedRoomRedactionEvent = serde_json::from_value(json!({
"content": {},
"event_id": "$redaction",
"sender": "@x:y.za",
"origin_server_ts": 223543,
"unsigned": { "reason": "foo" }
}))
.unwrap();
let event = AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(
SyncRoomMessageEvent::Redacted(RedactedSyncMessageLikeEvent {
content: RedactedRoomMessageEventContent::new(),
event_id: owned_event_id!("$1"),
sender: owned_user_id!("@a:b.c"),
origin_server_ts: MilliSecondsSinceUnixEpoch(UInt::new(2123).unwrap()),
unsigned: RedactedUnsigned::new(room_redaction_event),
}),
));
assert_matches!(is_suitable_for_latest_event(&event), PossibleLatestEvent::NoRedacted);
}
#[test]
fn encrypted_messages_are_unsuitable() {
let event = AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomEncrypted(
SyncRoomEncryptedEvent::Original(OriginalSyncMessageLikeEvent {
content: RoomEncryptedEventContent::new(
EncryptedEventScheme::OlmV1Curve25519AesSha2(
OlmV1Curve25519AesSha2Content::new(BTreeMap::new(), "".to_owned()),
),
None,
),
event_id: owned_event_id!("$1"),
sender: owned_user_id!("@a:b.c"),
origin_server_ts: MilliSecondsSinceUnixEpoch(UInt::new(2123).unwrap()),
unsigned: MessageLikeUnsigned::new(),
}),
));
assert_matches!(is_suitable_for_latest_event(&event), PossibleLatestEvent::NoEncrypted);
}
#[test]
fn state_events_are_unsuitable() {
let event = AnySyncTimelineEvent::State(AnySyncStateEvent::RoomTopic(
SyncRoomTopicEvent::Original(OriginalSyncStateEvent {
content: RoomTopicEventContent::new("".to_string()),
event_id: owned_event_id!("$1"),
sender: owned_user_id!("@a:b.c"),
origin_server_ts: MilliSecondsSinceUnixEpoch(UInt::new(2123).unwrap()),
unsigned: StateUnsigned::new(),
state_key: EmptyStateKey,
}),
));
assert_matches!(
is_suitable_for_latest_event(&event),
PossibleLatestEvent::NoUnsupportedEventType
);
}
}

View File

@@ -27,6 +27,7 @@ mod client;
pub mod debug;
pub mod deserialized_responses;
mod error;
pub mod latest_event;
pub mod media;
mod rooms;
#[cfg(feature = "experimental-sliding-sync")]

View File

@@ -19,6 +19,10 @@ use std::{
use bitflags::bitflags;
use futures_util::stream::{self, StreamExt};
#[cfg(feature = "experimental-sliding-sync")]
use matrix_sdk_common::deserialized_responses::SyncTimelineEvent;
#[cfg(all(feature = "e2e-encryption", feature = "experimental-sliding-sync"))]
use matrix_sdk_common::ring_buffer::RingBuffer;
use ruma::{
api::client::sync::sync_events::v3::RoomSummary as RumaSummary,
events::{
@@ -43,6 +47,8 @@ use ruma::{
EventId, OwnedEventId, OwnedMxcUri, OwnedRoomAliasId, OwnedRoomId, OwnedUserId, RoomAliasId,
RoomId, RoomVersionId, UserId,
};
#[cfg(all(feature = "e2e-encryption", feature = "experimental-sliding-sync"))]
use ruma::{events::AnySyncTimelineEvent, serde::Raw};
use serde::{Deserialize, Serialize};
use tracing::{debug, info, instrument, warn};
@@ -65,6 +71,18 @@ pub struct Room {
own_user_id: OwnedUserId,
inner: Arc<SyncRwLock<RoomInfo>>,
store: Arc<DynStateStore>,
/// The most recent few encrypted events. When the keys come through to
/// decrypt these, the most recent relevant one will replace
/// `latest_event`. (We can't tell which one is relevant until
/// they are decrypted.)
///
/// Currently, these are held in Room rather than RoomInfo, because we were
/// not sure whether holding too many of them might make the cache too
/// slow to load on startup. Keeping them here means they are not cached
/// to disk but held in memory.
#[cfg(all(feature = "e2e-encryption", feature = "experimental-sliding-sync"))]
pub latest_encrypted_events: Arc<SyncRwLock<RingBuffer<Raw<AnySyncTimelineEvent>>>>,
}
/// The room summary containing member counts and members that should be used to
@@ -108,6 +126,10 @@ impl From<&MembershipState> for RoomState {
}
impl Room {
/// The size of the latest_encrypted_events RingBuffer
#[cfg(all(feature = "e2e-encryption", feature = "experimental-sliding-sync"))]
const MAX_ENCRYPTED_EVENTS: usize = 10;
pub(crate) fn new(
own_user_id: &UserId,
store: Arc<DynStateStore>,
@@ -128,6 +150,10 @@ impl Room {
room_id: room_info.room_id.clone(),
store,
inner: Arc::new(SyncRwLock::new(room_info)),
#[cfg(all(feature = "e2e-encryption", feature = "experimental-sliding-sync"))]
latest_encrypted_events: Arc::new(SyncRwLock::new(RingBuffer::new(
Self::MAX_ENCRYPTED_EVENTS,
))),
}
}
@@ -348,6 +374,41 @@ impl Room {
self.calculate_name().await
}
/// Return the last event in this room, if one has been cached during
/// sliding sync.
#[cfg(feature = "experimental-sliding-sync")]
pub fn latest_event(&self) -> Option<SyncTimelineEvent> {
self.inner.read().unwrap().latest_event.clone()
}
/// Update the last event in the room
#[cfg(feature = "experimental-sliding-sync")]
pub(crate) fn set_latest_event(&self, latest_event: Option<SyncTimelineEvent>) {
self.inner.write().unwrap().latest_event = latest_event;
}
/// Return the most recent few encrypted events. When the keys come through
/// to decrypt these, the most recent relevant one will replace
/// latest_event. (We can't tell which one is relevant until
/// they are decrypted.)
#[cfg(all(feature = "e2e-encryption", feature = "experimental-sliding-sync"))]
pub(crate) fn latest_encrypted_events(&self) -> Vec<Raw<AnySyncTimelineEvent>> {
self.latest_encrypted_events.read().unwrap().iter().cloned().collect()
}
/// Replace our latest_event with the supplied event, and delete it and all
/// older encrypted events from latest_encrypted_events, given that the
/// new event was at the supplied index in the latest_encrypted_events
/// list.
///
/// Panics if index is not a valid index in the latest_encrypted_events
/// list.
#[cfg(all(feature = "e2e-encryption", feature = "experimental-sliding-sync"))]
pub(crate) fn on_latest_event_decrypted(&mut self, event: SyncTimelineEvent, index: usize) {
self.set_latest_event(Some(event));
self.latest_encrypted_events.write().unwrap().drain(0..=index);
}
/// Get the list of users ids that are considered to be joined members of
/// this room.
pub async fn joined_user_ids(&self) -> StoreResult<Vec<OwnedUserId>> {
@@ -632,6 +693,9 @@ pub struct RoomInfo {
/// Whether or not the encryption info was been synced.
#[serde(default = "encryption_state_default")] // see fn docs for why we use this default
encryption_state_synced: bool,
/// The last event send by sliding sync
#[cfg(feature = "experimental-sliding-sync")]
pub latest_event: Option<SyncTimelineEvent>,
/// Base room info which holds some basic event contents important for the
/// room state.
pub(crate) base_info: BaseRoomInfo,
@@ -684,6 +748,8 @@ impl RoomInfo {
last_prev_batch: None,
sync_info: SyncInfo::NoState,
encryption_state_synced: false,
#[cfg(feature = "experimental-sliding-sync")]
latest_event: None,
base_info: BaseRoomInfo::new(),
}
}
@@ -999,6 +1065,7 @@ mod test {
};
#[test]
#[cfg(feature = "experimental-sliding-sync")]
fn room_info_serialization() {
// This test exists to make sure we don't accidentally change the
// serialized format for `RoomInfo`.
@@ -1019,9 +1086,56 @@ mod test {
last_prev_batch: Some("pb".to_owned()),
sync_info: SyncInfo::FullySynced,
encryption_state_synced: true,
latest_event: Some(
Raw::from_json_string(json!({"sender": "@u:i.uk"}).to_string()).unwrap().into(),
),
base_info: BaseRoomInfo::new(),
};
let info_json = json!({
"room_id": "!gda78o:server.tld",
"room_type": "Invited",
"notification_counts": {
"highlight_count": 1,
"notification_count": 2,
},
"summary": {
"heroes": ["Somebody"],
"joined_member_count": 5,
"invited_member_count": 0,
},
"members_synced": true,
"last_prev_batch": "pb",
"sync_info": "FullySynced",
"encryption_state_synced": true,
"latest_event": {"encryption_info": null, "event": {"sender": "@u:i.uk"}},
"base_info": {
"avatar": null,
"canonical_alias": null,
"create": null,
"dm_targets": [],
"encryption": null,
"guest_access": null,
"history_visibility": null,
"join_rules": null,
"max_power_level": 100,
"name": null,
"tombstone": null,
"topic": null,
}
});
assert_eq!(serde_json::to_value(info).unwrap(), info_json);
}
#[test]
#[cfg(feature = "experimental-sliding-sync")]
fn room_info_deserialization_without_optional_items() {
// Ensure we can still deserialize RoomInfos before we added things to its
// schema
// The following JSON should never change if we want to be able to read in old
// cached state
let info_json = json!({
"room_id": "!gda78o:server.tld",
"room_type": "Invited",
@@ -1054,7 +1168,32 @@ mod test {
}
});
assert_eq!(serde_json::to_value(info).unwrap(), info_json);
let info: RoomInfo = serde_json::from_value(info_json).unwrap();
assert_eq!(info.room_id, room_id!("!gda78o:server.tld"));
assert_eq!(info.room_state, RoomState::Invited);
assert_eq!(info.notification_counts.highlight_count, 1);
assert_eq!(info.notification_counts.notification_count, 2);
assert_eq!(info.summary.heroes, vec!["Somebody".to_owned()]);
assert_eq!(info.summary.joined_member_count, 5);
assert_eq!(info.summary.invited_member_count, 0);
assert!(info.members_synced);
assert_eq!(info.last_prev_batch, Some("pb".to_owned()));
assert_eq!(info.sync_info, SyncInfo::FullySynced);
assert!(info.encryption_state_synced);
assert!(info.latest_event.is_none());
assert!(info.base_info.avatar.is_none());
assert!(info.base_info.canonical_alias.is_none());
assert!(info.base_info.create.is_none());
assert_eq!(info.base_info.dm_targets.len(), 0);
assert!(info.base_info.encryption.is_none());
assert!(info.base_info.guest_access.is_none());
assert!(info.base_info.history_visibility.is_none());
assert!(info.base_info.join_rules.is_none());
assert_eq!(info.base_info.max_power_level, 100);
assert!(info.base_info.name.is_none());
assert!(info.base_info.tombstone.is_none());
assert!(info.base_info.topic.is_none());
}
fn make_room(room_type: RoomState) -> (Arc<MemoryStore>, Room) {
@@ -1312,4 +1451,82 @@ mod test {
"new name"
);
}
#[test]
#[cfg(feature = "experimental-sliding-sync")]
fn when_we_provide_a_newly_decrypted_event_it_replaces_latest_event() {
// Given a room with an encrypted event
let (_store, mut room) = make_room(RoomState::Joined);
add_encrypted_event(&mut room, "$A");
// Sanity: it has no latest_event
assert!(room.latest_event().is_none());
// When I provide a decrypted event to replace the encrypted one
let event = make_event("$A");
room.on_latest_event_decrypted(event.clone(), 0);
// Then is it stored
assert_eq!(room.latest_event().unwrap().event_id(), event.event_id());
}
#[test]
#[cfg(feature = "experimental-sliding-sync")]
fn when_a_newly_decrypted_event_appears_we_delete_all_older_encrypted_events() {
// Given a room with some encrypted events and a latest event
let (_store, mut room) = make_room(RoomState::Joined);
room.inner.write().unwrap().latest_event = Some(make_event("$A"));
add_encrypted_event(&mut room, "$0");
add_encrypted_event(&mut room, "$1");
add_encrypted_event(&mut room, "$2");
add_encrypted_event(&mut room, "$3");
// When I provide a latest event
let new_event = make_event("$1");
let new_event_index = 1;
room.on_latest_event_decrypted(new_event.clone(), new_event_index);
// Then the encrypted events list is shortened to only newer events
let enc_evs = room.latest_encrypted_events();
assert_eq!(enc_evs.len(), 2);
assert_eq!(enc_evs.get(0).unwrap().get_field::<&str>("event_id").unwrap().unwrap(), "$2");
assert_eq!(enc_evs.get(1).unwrap().get_field::<&str>("event_id").unwrap().unwrap(), "$3");
// And the event is stored
assert_eq!(room.latest_event().unwrap().event_id(), new_event.event_id());
}
#[test]
#[cfg(feature = "experimental-sliding-sync")]
fn replacing_the_newest_event_leaves_none_left() {
// Given a room with some encrypted events
let (_store, mut room) = make_room(RoomState::Joined);
add_encrypted_event(&mut room, "$0");
add_encrypted_event(&mut room, "$1");
add_encrypted_event(&mut room, "$2");
add_encrypted_event(&mut room, "$3");
// When I provide a latest event and say it was the very latest
let new_event = make_event("$3");
let new_event_index = 3;
room.on_latest_event_decrypted(new_event.clone(), new_event_index);
// Then the encrypted events list ie empty
let enc_evs = room.latest_encrypted_events();
assert_eq!(enc_evs.len(), 0);
}
#[cfg(feature = "experimental-sliding-sync")]
fn add_encrypted_event(room: &mut Room, event_id: &str) {
room.latest_encrypted_events
.write()
.unwrap()
.push(Raw::from_json_string(json!({ "event_id": event_id }).to_string()).unwrap());
}
#[cfg(feature = "experimental-sliding-sync")]
fn make_event(event_id: &str) -> SyncTimelineEvent {
SyncTimelineEvent::new(
Raw::from_json_string(json!({ "event_id": event_id }).to_string()).unwrap(),
)
}
}

View File

@@ -15,6 +15,7 @@
#[cfg(feature = "e2e-encryption")]
use std::ops::Deref;
use matrix_sdk_common::deserialized_responses::SyncTimelineEvent;
use ruma::{
api::client::sync::sync_events::{
v3::{self, InvitedRoom, RoomSummary},
@@ -23,9 +24,11 @@ use ruma::{
events::AnySyncStateEvent,
RoomId,
};
use tracing::{debug, info, instrument};
use tracing::{debug, info, instrument, warn};
use super::BaseClient;
#[cfg(all(feature = "e2e-encryption", feature = "experimental-sliding-sync"))]
use crate::latest_event::{is_suitable_for_latest_event, PossibleLatestEvent};
#[cfg(feature = "e2e-encryption")]
use crate::RoomMemberships;
use crate::{
@@ -78,6 +81,8 @@ impl BaseClient {
e2ee.device_unused_fallback_key_types.as_ref().map(|v| v.len())
);
let mut changes = StateChanges::default();
// Process the to-device events and other related e2ee data. This returns a list
// of all the to-device events that were passed in but encrypted ones
// were replaced with their decrypted version.
@@ -91,11 +96,11 @@ impl BaseClient {
&e2ee.device_lists,
&e2ee.device_one_time_keys_count,
e2ee.device_unused_fallback_key_types.as_deref(),
&mut changes,
)
.await?;
let store = self.store.clone();
let mut changes = StateChanges::default();
let mut ambiguity_cache = AmbiguityCache::new(store.inner.clone());
if !account_data.is_empty() {
@@ -195,7 +200,8 @@ impl BaseClient {
let required_state = Self::deserialize_events(&room_data.required_state);
// Find or create the room in the store
let (room, mut room_info, invited_room) = self.process_sliding_sync_room_membership(
#[allow(unused_mut)] // Required for some feature flag combinations
let (mut room, mut room_info, invited_room) = self.process_sliding_sync_room_membership(
room_data,
&required_state,
store,
@@ -243,6 +249,11 @@ impl BaseClient {
)
.await?;
// Cache the latest decrypted event in room_info, and also keep any later
// encrypted events, so we can slot them in when we get the keys.
#[cfg(all(feature = "e2e-encryption", feature = "experimental-sliding-sync"))]
cache_latest_events(&mut room, &mut room_info, &timeline.events);
#[cfg(feature = "e2e-encryption")]
if room_info.is_encrypted() {
if let Some(o) = self.olm_machine().await.as_ref() {
@@ -361,6 +372,57 @@ impl BaseClient {
}
}
/// Find the most recent decrypted event and cache it in the supplied RoomInfo.
/// If any encrypted events are found after that one, store them in the RoomInfo
/// too so we can use them when we get the relevant keys.
#[cfg(all(feature = "e2e-encryption", feature = "experimental-sliding-sync"))]
fn cache_latest_events(room: &mut Room, room_info: &mut RoomInfo, events: &[SyncTimelineEvent]) {
let mut encrypted_events =
Vec::with_capacity(room.latest_encrypted_events.read().unwrap().capacity());
for e in events.iter().rev() {
if let Ok(timeline_event) = e.event.deserialize() {
match is_suitable_for_latest_event(&timeline_event) {
PossibleLatestEvent::YesMessageLike(_) => {
// m.room.message - we found one! Store it.
// Store it in the return RoomInfo, and in the Room, to make sure they are
// consistent
room_info.latest_event = Some(e.clone());
room.set_latest_event(Some(e.clone()));
// We don't need any of the older encrypted events because we have a new
// decrypted one.
room.latest_encrypted_events.write().unwrap().clear();
// We can stop looking through the timeline now because everything else is
// older.
break;
}
PossibleLatestEvent::NoEncrypted => {
// m.room.encrypted - this might be the latest event later - we can't tell until
// we are able to decrypt it, so store it for now
//
// Check how many encrypted events we have seen. Only store another if we
// haven't already stored the maximum number.
if encrypted_events.len() < encrypted_events.capacity() {
encrypted_events.push(e.event.clone());
}
}
_ => {
// Ignore unsuitable events
}
}
} else {
warn!(
"Failed to deserialise event as AnySyncTimelineEvent. ID={}",
e.event_id().expect("Event has no ID!")
);
}
}
// Push the encrypted events we found into the Room, in reverse order, so
// the latest is last
room.latest_encrypted_events.write().unwrap().extend(encrypted_events.into_iter().rev());
}
fn process_room_properties(room_data: &v4::SlidingSyncRoom, room_info: &mut RoomInfo) {
if let Some(name) = &room_data.name {
room_info.update_name(name.to_owned());
@@ -385,8 +447,12 @@ fn process_room_properties(room_data: &v4::SlidingSyncRoom, room_info: &mut Room
#[cfg(test)]
mod test {
use std::collections::{BTreeMap, HashSet};
use std::{
collections::{BTreeMap, HashSet},
sync::{Arc, RwLock as SyncRwLock},
};
use matrix_sdk_common::ring_buffer::RingBuffer;
use matrix_sdk_test::async_test;
use ruma::{
device_id, event_id,
@@ -397,7 +463,8 @@ mod test {
canonical_alias::RoomCanonicalAliasEventContent,
member::{MembershipState, RoomMemberEventContent},
},
AnySyncStateEvent, GlobalAccountDataEventContent, StateEventContent,
AnySyncStateEvent, AnySyncTimelineEvent, GlobalAccountDataEventContent,
StateEventContent,
},
mxc_uri, room_alias_id, room_id,
serde::Raw,
@@ -406,7 +473,7 @@ mod test {
use serde_json::json;
use super::*;
use crate::SessionMeta;
use crate::{store::MemoryStore, SessionMeta};
#[async_test]
async fn can_process_empty_sliding_sync_response() {
@@ -667,6 +734,328 @@ mod test {
);
}
#[async_test]
async fn last_event_from_sliding_sync_is_cached() {
// Given a logged-in client
let client = logged_in_client().await;
let room_id = room_id!("!r:e.uk");
let event_a = json!({
"sender":"@alice:example.com",
"type":"m.room.message",
"event_id": "$ida",
"origin_server_ts": 12344446,
"content":{"body":"A", "msgtype": "m.text"}
});
let event_b = json!({
"sender":"@alice:example.com",
"type":"m.room.message",
"event_id": "$idb",
"origin_server_ts": 12344447,
"content":{"body":"B", "msgtype": "m.text"}
});
// When the sliding sync response contains a timeline
let events = &[event_a, event_b.clone()];
let room = room_with_timeline(events);
let response = response_with_room(room_id, room).await;
client.process_sliding_sync(&response).await.expect("Failed to process sync");
// Then the room holds the latest event
let client_room = client.get_room(room_id).expect("No room found");
assert_eq!(ev_id(client_room.latest_event()), "$idb");
}
#[test]
fn when_no_events_we_dont_cache_any() {
let events = &[];
let chosen = choose_event_to_cache(events);
assert!(chosen.is_none());
}
#[test]
fn when_only_one_event_we_cache_it() {
let event1 = make_event("m.room.message", "$1");
let events = &[event1.clone()];
let chosen = choose_event_to_cache(events);
assert_eq!(ev_id(chosen), rawev_id(event1));
}
#[test]
fn with_multiple_events_we_cache_the_last_one() {
let event1 = make_event("m.room.message", "$1");
let event2 = make_event("m.room.message", "$2");
let events = &[event1, event2.clone()];
let chosen = choose_event_to_cache(events);
assert_eq!(ev_id(chosen), rawev_id(event2));
}
#[test]
fn cache_the_latest_relevant_event_and_ignore_irrelevant_ones_even_if_later() {
let event1 = make_event("m.room.message", "$1");
let event2 = make_event("m.room.message", "$2");
let event3 = make_event("m.room.powerlevels", "$3");
let event4 = make_event("m.room.powerlevels", "$5");
let events = &[event1, event2.clone(), event3, event4];
let chosen = choose_event_to_cache(events);
assert_eq!(ev_id(chosen), rawev_id(event2));
}
#[test]
fn prefer_to_cache_nothing_rather_than_irrelevant_events() {
let event1 = make_event("m.room.power_levels", "$1");
let events = &[event1];
let chosen = choose_event_to_cache(events);
assert!(chosen.is_none());
}
#[test]
fn cache_encrypted_events_that_are_after_latest_message() {
// Given two message events followed by two encrypted
let event1 = make_event("m.room.message", "$1");
let event2 = make_event("m.room.message", "$2");
let event3 = make_encrypted_event("$3");
let event4 = make_encrypted_event("$4");
let events = &[event1.clone(), event2.clone(), event3.clone(), event4.clone()];
// When I ask to cache events
let mut room = make_room();
let mut room_info = room.clone_info();
cache_latest_events(&mut room, &mut room_info, events);
// The latest message is stored
assert_eq!(ev_id(room_info.latest_event), rawev_id(event2.clone()));
assert_eq!(ev_id(room.latest_event()), rawev_id(event2));
// And also the two encrypted ones
assert_eq!(rawevs_ids(&room.latest_encrypted_events), evs_ids(&[event3, event4]));
}
#[test]
fn dont_cache_encrypted_events_that_are_before_latest_message() {
// Given an encrypted event before and after the message
let event1 = make_encrypted_event("$1");
let event2 = make_event("m.room.message", "$2");
let event3 = make_encrypted_event("$3");
let events = &[event1.clone(), event2.clone(), event3.clone()];
// When I ask to cache events
let mut room = make_room();
let mut room_info = room.clone_info();
cache_latest_events(&mut room, &mut room_info, events);
// The latest message is stored
assert_eq!(ev_id(room.latest_event()), rawev_id(event2));
// And also the encrypted one that was after it, but not the one before
assert_eq!(rawevs_ids(&room.latest_encrypted_events), evs_ids(&[event3]));
}
#[test]
fn skip_irrelevant_events_eg_receipts_even_if_after_message() {
// Given two message events followed by two encrypted, with a receipt in the
// middle
let event1 = make_event("m.room.message", "$1");
let event2 = make_event("m.room.message", "$2");
let event3 = make_encrypted_event("$3");
let event4 = make_event("m.read", "$4");
let event5 = make_encrypted_event("$5");
let events = &[event1.clone(), event2.clone(), event3.clone(), event4, event5.clone()];
// When I ask to cache events
let mut room = make_room();
let mut room_info = room.clone_info();
cache_latest_events(&mut room, &mut room_info, events);
// The latest message is stored, ignoring the receipt
assert_eq!(ev_id(room.latest_event()), rawev_id(event2));
// The two encrypted ones are stored, but not the receipt
assert_eq!(rawevs_ids(&room.latest_encrypted_events), evs_ids(&[event3, event5]));
}
#[test]
fn only_store_the_max_number_of_encrypted_events() {
// Given two message events followed by lots of encrypted and other irrelevant
// events
let evente = make_event("m.room.message", "$e");
let eventd = make_event("m.room.message", "$d");
let eventc = make_encrypted_event("$c");
let event9 = make_encrypted_event("$9");
let event8 = make_encrypted_event("$8");
let event7 = make_encrypted_event("$7");
let eventb = make_event("m.read", "$b");
let event6 = make_encrypted_event("$6");
let event5 = make_encrypted_event("$5");
let event4 = make_encrypted_event("$4");
let event3 = make_encrypted_event("$3");
let event2 = make_encrypted_event("$2");
let eventa = make_event("m.read", "$a");
let event1 = make_encrypted_event("$1");
let event0 = make_encrypted_event("$0");
let events = &[
evente.clone(),
eventd.clone(),
eventc.clone(),
event9.clone(),
event8.clone(),
event7.clone(),
eventb.clone(),
event6.clone(),
event5.clone(),
event4.clone(),
event3.clone(),
event2.clone(),
eventa.clone(),
event1.clone(),
event0.clone(),
];
// When I ask to cache events
let mut room = make_room();
let mut room_info = room.clone_info();
cache_latest_events(&mut room, &mut room_info, events);
// The latest message is stored, ignoring encrypted and receipts
assert_eq!(ev_id(room.latest_event()), rawev_id(eventd));
// Only 10 encrypted are stored, even though there were more
assert_eq!(
rawevs_ids(&room.latest_encrypted_events),
evs_ids(&[
event9, event8, event7, event6, event5, event4, event3, event2, event1, event0
])
);
}
#[test]
fn dont_overflow_capacity_if_previous_encrypted_events_exist() {
// Given a RoomInfo with lots of encrypted events already inside it
let mut room = make_room();
let mut room_info = room.clone_info();
cache_latest_events(
&mut room,
&mut room_info,
&[
make_encrypted_event("$0"),
make_encrypted_event("$1"),
make_encrypted_event("$2"),
make_encrypted_event("$3"),
make_encrypted_event("$4"),
make_encrypted_event("$5"),
make_encrypted_event("$6"),
make_encrypted_event("$7"),
make_encrypted_event("$8"),
make_encrypted_event("$9"),
],
);
// Sanity: room_info has 10 encrypted events inside it
assert_eq!(room.latest_encrypted_events.read().unwrap().len(), 10);
// When I ask to cache more encrypted events
let eventa = make_encrypted_event("$a");
let mut room_info = room.clone_info();
cache_latest_events(&mut room, &mut room_info, &[eventa]);
// The oldest event is gone
assert!(!rawevs_ids(&room.latest_encrypted_events).contains(&"$0".to_owned()));
// The newest event is last in the list
assert_eq!(rawevs_ids(&room.latest_encrypted_events)[9], "$a");
}
#[test]
fn existing_encrypted_events_are_deleted_if_we_receive_unencrypted() {
// Given a RoomInfo with some encrypted events already inside it
let mut room = make_room();
let mut room_info = room.clone_info();
cache_latest_events(
&mut room,
&mut room_info,
&[make_encrypted_event("$0"), make_encrypted_event("$1"), make_encrypted_event("$2")],
);
// When I ask to cache an unecnrypted event, and some more encrypted events
let eventa = make_event("m.room.message", "$a");
let eventb = make_encrypted_event("$b");
cache_latest_events(&mut room, &mut room_info, &[eventa, eventb]);
// The only encrypted events stored are the ones after the decrypted one
assert_eq!(rawevs_ids(&room.latest_encrypted_events), &["$b"]);
// The decrypted one is stored as the latest
assert_eq!(rawev_id(room.latest_event().unwrap()), "$a");
}
fn choose_event_to_cache(events: &[SyncTimelineEvent]) -> Option<SyncTimelineEvent> {
let mut room = make_room();
let mut room_info = room.clone_info();
cache_latest_events(&mut room, &mut room_info, events);
room.latest_event()
}
fn rawev_id(event: SyncTimelineEvent) -> String {
event.event_id().unwrap().to_string()
}
fn ev_id(event: Option<SyncTimelineEvent>) -> String {
event.unwrap().event_id().unwrap().to_string()
}
fn rawevs_ids(events: &Arc<SyncRwLock<RingBuffer<Raw<AnySyncTimelineEvent>>>>) -> Vec<String> {
events.read().unwrap().iter().map(|e| e.get_field("event_id").unwrap().unwrap()).collect()
}
fn evs_ids(events: &[SyncTimelineEvent]) -> Vec<String> {
events.iter().map(|e| e.event_id().unwrap().to_string()).collect()
}
fn make_room() -> Room {
Room::new(
user_id!("@u:e.co"),
Arc::new(MemoryStore::new()),
room_id!("!r:e.co"),
RoomState::Joined,
)
}
fn make_event(typ: &str, id: &str) -> SyncTimelineEvent {
SyncTimelineEvent::new(
Raw::from_json_string(
json!({
"type": typ,
"event_id": id,
"content": { "msgtype": "m.text", "body": "my msg" },
"sender": "@u:h.uk",
"origin_server_ts": 12344445,
})
.to_string(),
)
.unwrap(),
)
}
fn make_encrypted_event(id: &str) -> SyncTimelineEvent {
SyncTimelineEvent::new(
Raw::from_json_string(
json!({
"type": "m.room.encrypted",
"event_id": id,
"content": {
"algorithm": "m.megolm.v1.aes-sha2",
"ciphertext": "",
"sender_key": "",
"device_id": "",
"session_id": "",
},
"sender": "@u:h.uk",
"origin_server_ts": 12344445,
})
.to_string(),
)
.unwrap(),
)
}
async fn membership(
client: &BaseClient,
room_id: &RoomId,
@@ -774,6 +1163,17 @@ mod test {
room
}
fn room_with_timeline(events: &[serde_json::Value]) -> v4::SlidingSyncRoom {
let mut room = v4::SlidingSyncRoom::new();
room.timeline.extend(
events
.iter()
.map(|e| Raw::from_json_string(e.to_string()).unwrap())
.collect::<Vec<_>>(),
);
room
}
fn set_room_invited(room: &mut v4::SlidingSyncRoom, user_id: &UserId) {
// MSC3575 shows an almost-empty event to indicate that we are invited to a
// room. Just the type is supplied.

View File

@@ -81,7 +81,7 @@ pub use identities::{
pub use machine::OlmMachine;
#[cfg(feature = "qrcode")]
pub use matrix_sdk_qrcode;
pub use olm::{CrossSigningStatus, EncryptionSettings, ReadOnlyAccount};
pub use olm::{CrossSigningStatus, EncryptionSettings, ReadOnlyAccount, Session};
pub use requests::{
IncomingResponse, KeysBackupRequest, KeysQueryRequest, OutgoingRequest, OutgoingRequests,
OutgoingVerificationRequest, RoomMessageRequest, ToDeviceRequest, UploadSigningKeysRequest,

View File

@@ -68,7 +68,7 @@ use crate::{
session_manager::{GroupSessionManager, SessionManager},
store::{
locks::LockStoreError, Changes, DeviceChanges, DynCryptoStore, IdentityChanges,
IntoCryptoStore, MemoryStore, Result as StoreResult, SecretImportError, Store,
IntoCryptoStore, MemoryStore, Result as StoreResult, RoomKeyInfo, SecretImportError, Store,
},
types::{
events::{
@@ -1104,6 +1104,8 @@ impl OlmMachine {
/// response returned.
///
/// [`decrypt_room_event`]: #method.decrypt_room_event
///
/// Returns a tuple of (pending verification events, updated room keys)
#[instrument(skip_all)]
pub async fn receive_sync_changes(
&self,
@@ -1111,7 +1113,7 @@ impl OlmMachine {
changed_devices: &DeviceLists,
one_time_keys_counts: &BTreeMap<DeviceKeyAlgorithm, UInt>,
unused_fallback_keys: Option<&[DeviceKeyAlgorithm]>,
) -> OlmResult<Vec<Raw<AnyToDeviceEvent>>> {
) -> OlmResult<(Vec<Raw<AnyToDeviceEvent>>, Vec<RoomKeyInfo>)> {
// Remove verification objects that have expired or are done.
let mut events = self.inner.verification_machine.garbage_collect();
@@ -1136,6 +1138,11 @@ impl OlmMachine {
events.push(raw_event);
}
// Technically save_changes also does the same work, so if it's slow we could
// refactor this to do it only once.
let room_key_updates: Vec<_> =
changes.inbound_group_sessions.iter().map(RoomKeyInfo::from).collect();
let changed_sessions =
self.inner.key_request_machine.collect_incoming_key_requests().await?;
@@ -1143,7 +1150,7 @@ impl OlmMachine {
self.store().save_changes(changes).await?;
Ok(events)
Ok((events, room_key_updates))
}
/// Request a room key from our devices.
@@ -2419,7 +2426,7 @@ pub(crate) mod tests {
let alice_session =
alice.inner.group_session_manager.get_outbound_group_session(room_id).unwrap();
let decrypted = bob
let (decrypted, room_key_updates) = bob
.receive_sync_changes(vec![event], &Default::default(), &Default::default(), None)
.await
.unwrap();
@@ -2437,6 +2444,10 @@ pub(crate) mod tests {
bob.store().get_inbound_group_session(room_id, alice_session.session_id()).await;
assert!(session.unwrap().is_some());
assert_eq!(room_key_updates.len(), 1);
assert_eq!(room_key_updates[0].room_id, room_id);
assert_eq!(room_key_updates[0].session_id, alice_session.session_id());
}
#[async_test]

View File

@@ -73,7 +73,7 @@ impl fmt::Debug for Session {
impl Session {
/// Decrypt the given Olm message.
///
/// Returns the decrypted plaintext or an `DecrypitonError` if decryption
/// Returns the decrypted plaintext or an `DecryptionError` if decryption
/// failed.
///
/// # Arguments

View File

@@ -24,7 +24,10 @@ use ruma::{
};
use super::Error;
use crate::{timeline::EventTimelineItem, Timeline};
use crate::{
timeline::{EventTimelineItem, SlidingSyncRoomExt},
Timeline,
};
/// A room in the room list.
///
@@ -47,10 +50,6 @@ struct RoomInner {
/// The timeline of the room.
timeline: AsyncOnceCell<Arc<Timeline>>,
/// The “sneaky” timeline of the room, i.e. this timeline doesn't track the
/// read marker nor the receipts.
sneaky_timeline: AsyncOnceCell<Arc<Timeline>>,
}
impl Room {
@@ -70,7 +69,6 @@ impl Room {
sliding_sync_room,
room,
timeline: AsyncOnceCell::new(),
sneaky_timeline: AsyncOnceCell::new(),
}),
})
}
@@ -131,28 +129,14 @@ impl Room {
.clone()
}
/// Get the latest event of the timeline.
/// Get the latest event in the timeline, if we already have it cached. Does
/// not fetch any events or calculate anything - if it's not already
/// available, we return None.
///
/// It's different from `Self::timeline().latest_event()` as it won't track
/// the read marker and receipts.
pub async fn latest_event(&self) -> Option<EventTimelineItem> {
self.inner
.sneaky_timeline
.get_or_init(async {
Arc::new(
Timeline::builder(&self.inner.room)
.events(
self.inner.sliding_sync_room.prev_batch(),
self.inner.sliding_sync_room.timeline_queue(),
)
.read_only()
.build()
.await,
)
})
.await
.latest_event()
.await
self.inner.sliding_sync_room.latest_timeline_item().await
}
/// Is there any unread notifications?

View File

@@ -75,6 +75,7 @@ impl TimelineBuilder {
///
/// This improves efficiency a little bit since no background task will be
/// spawned for sending messages.
#[allow(dead_code)]
pub(crate) fn read_only(mut self) -> Self {
self.read_only = true;
self

View File

@@ -18,6 +18,7 @@ use imbl::{vector, Vector};
use indexmap::IndexMap;
use itertools::Itertools;
use matrix_sdk::{deserialized_responses::TimelineEvent, Result};
use matrix_sdk_base::latest_event::{is_suitable_for_latest_event, PossibleLatestEvent};
use ruma::{
assign,
events::{
@@ -52,12 +53,12 @@ use ruma::{
space::{child::SpaceChildEventContent, parent::SpaceParentEventContent},
sticker::StickerEventContent,
AnyFullStateEventContent, AnyMessageLikeEventContent, AnySyncMessageLikeEvent,
AnyTimelineEvent, BundledMessageLikeRelations, FullStateEventContent, MessageLikeEventType,
StateEventType,
AnySyncTimelineEvent, AnyTimelineEvent, BundledMessageLikeRelations, FullStateEventContent,
MessageLikeEventType, OriginalSyncMessageLikeEvent, StateEventType,
},
OwnedDeviceId, OwnedEventId, OwnedMxcUri, OwnedTransactionId, OwnedUserId, UserId,
};
use tracing::{debug, error};
use tracing::{debug, error, warn};
use super::{EventTimelineItem, Profile, TimelineDetails};
use crate::timeline::{
@@ -111,6 +112,68 @@ pub enum TimelineItemContent {
}
impl TimelineItemContent {
/// If the supplied event is suitable to be used as a latest_event in a
/// message preview, extract its contents and wrap it as a
/// TimelineItemContent.
#[cfg(feature = "experimental-sliding-sync")]
pub(crate) fn from_latest_event_content(
event: AnySyncTimelineEvent,
) -> Option<TimelineItemContent> {
match is_suitable_for_latest_event(&event) {
PossibleLatestEvent::YesMessageLike(m) => Self::from_suitable_latest_event_content(m),
PossibleLatestEvent::NoUnsupportedEventType => {
// TODO: when we support state events in message previews, this will need change
warn!("Found a state event cached as latest_event! ID={}", event.event_id());
None
}
PossibleLatestEvent::NoUnsupportedMessageLikeType => {
// TODO: When we support reactions in message previews, this will need to change
warn!(
"Found an event cached as latest_event, but I don't know how \
to wrap it in a TimelineItemContent. type={}, ID={}",
event.event_type().to_string(),
event.event_id()
);
None
}
PossibleLatestEvent::NoEncrypted => {
warn!("Found an encrypted event cached as latest_event! ID={}", event.event_id());
None
}
PossibleLatestEvent::NoRedacted => {
warn!("Found a redacted event cached as latest_event! ID={}", event.event_id());
None
}
}
}
/// Given some message content that is from an event that we have already
/// determined is suitable for use as a latest event in a message preview,
/// extract its contents and wrap it as a TimelineItemContent.
fn from_suitable_latest_event_content(
message: &OriginalSyncMessageLikeEvent<RoomMessageEventContent>,
) -> Option<TimelineItemContent> {
// Grab the content of this event
let event_content = message.content.clone();
// We don't have access to any relations via the AnySyncTimelineEvent (I think -
// andyb) so we pretend there are none. This might be OK for the message preview
// use case.
let relations = BundledMessageLikeRelations::new();
// If this message is a reply, we would look up in this list the message it was
// replying to. Since we probably won't show this in the message preview,
// it's probably OK to supply an empty list here.
// Message::from_event marks the original event as Unavailable if it can't be
// found inside the timeline_items.
let timeline_items = Vector::new();
Some(TimelineItemContent::Message(Message::from_event(
event_content,
relations,
&timeline_items,
)))
}
/// If `self` is of the [`Message`][Self::Message] variant, return the inner
/// [`Message`].
pub fn as_message(&self) -> Option<&Message> {

View File

@@ -15,13 +15,15 @@
use std::sync::Arc;
use indexmap::IndexMap;
use matrix_sdk::{deserialized_responses::EncryptionInfo, Error};
use matrix_sdk::{deserialized_responses::EncryptionInfo, Error, SlidingSyncRoom};
use matrix_sdk_base::deserialized_responses::SyncTimelineEvent;
use once_cell::sync::Lazy;
use ruma::{
events::{receipt::Receipt, room::message::MessageType, AnySyncTimelineEvent},
serde::Raw,
EventId, MilliSecondsSinceUnixEpoch, OwnedMxcUri, OwnedUserId, TransactionId, UserId,
};
use tracing::warn;
mod content;
mod local;
@@ -78,6 +80,76 @@ impl EventTimelineItem {
Self { sender, sender_profile, timestamp, content, kind }
}
/// If the supplied low-level SyncTimelineEventy is suitable for use as the
/// latest_event in a message preview, wrap it as an EventTimelineItem,
#[cfg(feature = "experimental-sliding-sync")]
pub(crate) async fn from_latest_event(
room: &SlidingSyncRoom,
sync_event: SyncTimelineEvent,
) -> Option<EventTimelineItem> {
use super::traits::RoomDataProvider;
let raw_sync_event = sync_event.event;
let encryption_info = sync_event.encryption_info;
let Ok(event) = raw_sync_event.deserialize_as::<AnySyncTimelineEvent>() else {
warn!("Unable to deserialize latest_event as an AnySyncTimelineEvent!");
return None;
};
let timestamp = event.origin_server_ts();
let sender = event.sender().to_owned();
let event_id = event.event_id().to_owned();
let is_own = room.client().user_id().map(|uid| uid == sender).unwrap_or(false);
// If we don't (yet) know how to handle this type of message, return None here.
// If we do, convert it into a TimelineItemContent.
let item_content = TimelineItemContent::from_latest_event_content(event)?;
// We don't currently bundle any reactions with the main event. This could
// conceivably be wanted in the message preview in future.
let reactions = IndexMap::new();
// The message preview probably never needs read receipts.
let read_receipts = IndexMap::new();
// Being highlighted is _probably_ not relevant to the message preview.
let is_highlighted = false;
// We may need this, depending on how we are going to display edited messages in
// previews.
let latest_edit_json = None;
// Probably the origin of the event doesn't matter for the preview.
let origin = RemoteEventOrigin::Sync;
let event_kind = RemoteEventTimelineItem {
event_id,
reactions,
read_receipts,
is_own,
is_highlighted,
encryption_info,
original_json: raw_sync_event,
latest_edit_json,
origin,
}
.into();
let room = room.client().get_room(room.room_id());
let sender_profile = if let Some(room) = room {
room.profile(&sender)
.await
.map(TimelineDetails::Ready)
.unwrap_or(TimelineDetails::Unavailable)
} else {
TimelineDetails::Unavailable
};
Some(EventTimelineItem::new(sender, sender_profile, timestamp, item_content, event_kind))
}
/// Check whether this item is a local echo.
///
/// This returns `true` for events created locally, until the server echoes
@@ -382,3 +454,175 @@ pub enum EventItemOrigin {
/// The event came from pagination.
Pagination,
}
#[cfg(test)]
mod test {
use assert_matches::assert_matches;
use matrix_sdk::{config::RequestConfig, Client, ClientBuilder};
use matrix_sdk_base::{BaseClient, SessionMeta};
use matrix_sdk_test::async_test;
use ruma::{
api::{client::sync::sync_events::v4, MatrixVersion},
device_id,
events::room::message::MessageFormat,
room_id, user_id, RoomId, UInt,
};
use serde_json::json;
use super::*;
#[async_test]
#[cfg(feature = "experimental-sliding-sync")]
async fn latest_message_event_can_be_wrapped_as_a_timeline_item() {
// Given a sync event that is suitable to be used as a latest_event
let room_id = room_id!("!q:x.uk");
let user_id = user_id!("@t:o.uk");
let event = message_event(room_id, user_id, "**My M**", "<b>My M</b>", 122344);
let room = SlidingSyncRoom::new(
logged_in_client(None).await,
room_id.to_owned(),
v4::SlidingSyncRoom::new(),
Vec::new(),
);
// When we construct a timeline event from it
let timeline_item = EventTimelineItem::from_latest_event(&room, event).await.unwrap();
// Then its properties correctly translate
assert_eq!(timeline_item.sender, user_id);
assert_matches!(timeline_item.sender_profile, TimelineDetails::Unavailable);
assert_eq!(timeline_item.timestamp.0, UInt::new(122344).unwrap());
if let MessageType::Text(txt) = timeline_item.content.as_message().unwrap().msgtype() {
assert_eq!(txt.body, "**My M**");
let formatted = txt.formatted.as_ref().unwrap();
assert_eq!(formatted.format, MessageFormat::Html);
assert_eq!(formatted.body, "<b>My M</b>");
} else {
panic!("Unexpected message type");
}
}
#[async_test]
#[cfg(feature = "experimental-sliding-sync")]
async fn latest_message_event_can_be_wrapped_as_a_timeline_item_with_sender() {
// Given a sync event that is suitable to be used as a latest_event, and a room
// with a member event for the sender
use ruma::owned_mxc_uri;
let room_id = room_id!("!q:x.uk");
let user_id = user_id!("@t:o.uk");
let event = message_event(room_id, user_id, "**My M**", "<b>My M</b>", 122344);
let client = logged_in_client(None).await;
let mut room = v4::SlidingSyncRoom::new();
room.timeline.push(member_event(room_id, user_id, "Alice Margatroid", "mxc://e.org/SEs"));
let ss_room =
SlidingSyncRoom::new(client.clone(), room_id.to_owned(), room.clone(), Vec::new());
// And the room is stored in the client so it can be extracted when needed
let response = response_with_room(room_id, room).await;
client.process_sliding_sync(&response).await.unwrap();
// When we construct a timeline event from it
let timeline_item = EventTimelineItem::from_latest_event(&ss_room, event).await.unwrap();
// Then its sender is properly populated
let profile = assert_matches!(timeline_item.sender_profile, TimelineDetails::Ready(p) => p);
assert_eq!(
profile,
Profile {
display_name: Some("Alice Margatroid".to_owned()),
display_name_ambiguous: false,
avatar_url: Some(owned_mxc_uri!("mxc://e.org/SEs"))
}
);
}
fn member_event(
room_id: &RoomId,
user_id: &UserId,
display_name: &str,
avatar_url: &str,
) -> Raw<AnySyncTimelineEvent> {
Raw::from_json_string(
json!({
"type": "m.room.member",
"content": {
"avatar_url": avatar_url,
"displayname": display_name,
"membership": "join",
"reason": ""
},
"event_id": "$143273582443PhrSn:example.org",
"origin_server_ts": 143273583,
"room_id": room_id,
"sender": "@example:example.org",
"state_key": user_id,
"type": "m.room.member",
"unsigned": {
"age": 1234
}
})
.to_string(),
)
.unwrap()
}
async fn response_with_room(room_id: &RoomId, room: v4::SlidingSyncRoom) -> v4::Response {
let mut response = v4::Response::new("6".to_owned());
response.rooms.insert(room_id.to_owned(), room);
response
}
fn message_event(
room_id: &RoomId,
user_id: &UserId,
body: &str,
formatted_body: &str,
ts: u64,
) -> SyncTimelineEvent {
SyncTimelineEvent::new(
Raw::from_json_string(
json!({
"event_id": "$eventid6",
"sender": user_id,
"origin_server_ts": ts,
"type": "m.room.message",
"room_id": room_id.to_string(),
"content": {
"body": body,
"format": "org.matrix.custom.html",
"formatted_body": formatted_body,
"msgtype": "m.text"
},
})
.to_string(),
)
.unwrap(),
)
}
/// Copied from matrix_sdk_base::sliding_sync::test
async fn logged_in_client(homeserver_url: Option<String>) -> Client {
let base_client = BaseClient::new();
base_client
.set_session_meta(SessionMeta {
user_id: user_id!("@u:e.uk").to_owned(),
device_id: device_id!("XYZ").to_owned(),
})
.await
.expect("Failed to set session meta");
test_client_builder(homeserver_url)
.request_config(RequestConfig::new().disable_retry())
.base_client(base_client)
.build()
.await
.unwrap()
}
fn test_client_builder(homeserver_url: Option<String>) -> ClientBuilder {
let homeserver = homeserver_url.as_deref().unwrap_or("http://localhost:1234");
Client::builder().homeserver_url(homeserver).server_versions([MatrixVersion::V1_0])
}
}

View File

@@ -23,11 +23,11 @@ pub trait SlidingSyncRoomExt {
/// Get a `Timeline` for this room.
async fn timeline(&self) -> Option<Timeline>;
/// Get the latest timeline item of this room.
/// Get the latest timeline item of this room, if it is already cached.
///
/// Use `Timeline::latest_event` instead if you already have a timeline for
/// this `SlidingSyncRoom`.
async fn latest_event(&self) -> Option<EventTimelineItem>;
async fn latest_timeline_item(&self) -> Option<EventTimelineItem>;
}
#[async_trait]
@@ -36,9 +36,13 @@ impl SlidingSyncRoomExt for SlidingSyncRoom {
Some(sliding_sync_timeline_builder(self)?.track_read_marker_and_receipts().build().await)
}
/// Get a timeline item representing the latest event in this room.
/// This method wraps latest_event, converting the event into an
/// EventTimelineItem.
#[instrument(skip_all)]
async fn latest_event(&self) -> Option<EventTimelineItem> {
sliding_sync_timeline_builder(self)?.read_only().build().await.latest_event().await
async fn latest_timeline_item(&self) -> Option<EventTimelineItem> {
let latest_event = self.latest_event()?;
EventTimelineItem::from_latest_event(self, latest_event).await
}
}
@@ -52,3 +56,130 @@ fn sliding_sync_timeline_builder(room: &SlidingSyncRoom) -> Option<TimelineBuild
}
}
}
#[cfg(test)]
mod tests {
use assert_matches::assert_matches;
use matrix_sdk::{config::RequestConfig, Client, ClientBuilder, SlidingSyncRoom};
use matrix_sdk_base::{deserialized_responses::SyncTimelineEvent, BaseClient, SessionMeta};
use matrix_sdk_test::async_test;
use ruma::{
api::{client::sync::sync_events::v4, MatrixVersion},
device_id,
events::room::message::{MessageFormat, MessageType},
room_id,
serde::Raw,
user_id, RoomId, UInt, UserId,
};
use serde_json::json;
use crate::timeline::{SlidingSyncRoomExt, TimelineDetails};
#[async_test]
async fn initially_latest_message_event_is_none() {
// Given a room with no latest event
let room_id = room_id!("!r:x.uk").to_owned();
let client = logged_in_client(None).await;
let room = SlidingSyncRoom::new(client, room_id, v4::SlidingSyncRoom::new(), Vec::new());
// When we ask for the latest event, it is None
assert!(room.latest_timeline_item().await.is_none());
}
#[async_test]
async fn latest_message_event_is_wrapped_as_a_timeline_item() {
// Given a room exists, and an event came in through a sync
let room_id = room_id!("!r:x.uk");
let user_id = user_id!("@s:o.uk");
let client = logged_in_client(None).await;
let event = message_event(room_id, user_id, "**My msg**", "<b>My msg</b>", 122343);
process_event_via_sync(room_id, event, &client).await;
// When we ask for the latest event in the room
let room = SlidingSyncRoom::new(
client.clone(),
room_id.to_owned(),
v4::SlidingSyncRoom::new(),
Vec::new(),
);
let actual = room.latest_timeline_item().await.unwrap();
// Then it is wrapped as an EventTimelineItem
assert_eq!(actual.sender, user_id);
assert_matches!(actual.sender_profile, TimelineDetails::Unavailable);
assert_eq!(actual.timestamp.0, UInt::new(122343).unwrap());
if let MessageType::Text(txt) = actual.content.as_message().unwrap().msgtype() {
assert_eq!(txt.body, "**My msg**");
let formatted = txt.formatted.as_ref().unwrap();
assert_eq!(formatted.format, MessageFormat::Html);
assert_eq!(formatted.body, "<b>My msg</b>");
} else {
panic!("Unexpected message type");
}
}
async fn process_event_via_sync(room_id: &RoomId, event: SyncTimelineEvent, client: &Client) {
let mut room = v4::SlidingSyncRoom::new();
room.timeline.push(event.event);
let response = response_with_room(room_id, room).await;
client.process_sliding_sync(&response).await.unwrap();
}
fn message_event(
room_id: &RoomId,
user_id: &UserId,
body: &str,
formatted_body: &str,
ts: u64,
) -> SyncTimelineEvent {
SyncTimelineEvent::new(
Raw::from_json_string(
json!({
"event_id": "$eventid6",
"sender": user_id,
"origin_server_ts": ts,
"type": "m.room.message",
"room_id": room_id.to_string(),
"content": {
"body": body,
"format": "org.matrix.custom.html",
"formatted_body": formatted_body,
"msgtype": "m.text"
},
})
.to_string(),
)
.unwrap(),
)
}
async fn response_with_room(room_id: &RoomId, room: v4::SlidingSyncRoom) -> v4::Response {
let mut response = v4::Response::new("6".to_owned());
response.rooms.insert(room_id.to_owned(), room);
response
}
/// Copied from matrix_sdk_base::sliding_sync::test
async fn logged_in_client(homeserver_url: Option<String>) -> Client {
let base_client = BaseClient::new();
base_client
.set_session_meta(SessionMeta {
user_id: user_id!("@u:e.uk").to_owned(),
device_id: device_id!("XYZ").to_owned(),
})
.await
.expect("Failed to set session meta");
test_client_builder(homeserver_url)
.request_config(RequestConfig::new().disable_retry())
.base_client(base_client)
.build()
.await
.unwrap()
}
fn test_client_builder(homeserver_url: Option<String>) -> ClientBuilder {
let homeserver = homeserver_url.as_deref().unwrap_or("http://localhost:1234");
Client::builder().homeserver_url(homeserver).server_versions([MatrixVersion::V1_0])
}
}

View File

@@ -81,6 +81,7 @@ pub struct ClientBuilder {
appservice_mode: bool,
server_versions: Option<Box<[MatrixVersion]>>,
handle_refresh_tokens: bool,
base_client: Option<BaseClient>,
}
impl ClientBuilder {
@@ -94,6 +95,7 @@ impl ClientBuilder {
appservice_mode: false,
server_versions: None,
handle_refresh_tokens: false,
base_client: None,
}
}
@@ -301,6 +303,13 @@ impl ClientBuilder {
self
}
/// Public for test only
#[doc(hidden)]
pub fn base_client(mut self, base_client: BaseClient) -> Self {
self.base_client = Some(base_client);
self
}
/// Create a [`Client`] with the options set on this builder.
///
/// # Errors
@@ -330,20 +339,24 @@ impl ClientBuilder {
HttpConfig::Custom(c) => c,
};
#[allow(clippy::infallible_destructuring_match)]
let store_config = match self.store_config {
#[cfg(feature = "sqlite")]
BuilderStoreConfig::Sqlite { path, passphrase } => {
matrix_sdk_sqlite::make_store_config(&path, passphrase.as_deref()).await?
}
#[cfg(feature = "indexeddb")]
BuilderStoreConfig::IndexedDb { name, passphrase } => {
matrix_sdk_indexeddb::make_store_config(&name, passphrase.as_deref()).await?
}
BuilderStoreConfig::Custom(config) => config,
let base_client = if let Some(base_client) = self.base_client {
base_client
} else {
#[allow(clippy::infallible_destructuring_match)]
let store_config = match self.store_config {
#[cfg(feature = "sqlite")]
BuilderStoreConfig::Sqlite { path, passphrase } => {
matrix_sdk_sqlite::make_store_config(&path, passphrase.as_deref()).await?
}
#[cfg(feature = "indexeddb")]
BuilderStoreConfig::IndexedDb { name, passphrase } => {
matrix_sdk_indexeddb::make_store_config(&name, passphrase.as_deref()).await?
}
BuilderStoreConfig::Custom(config) => config,
};
BaseClient::with_store_config(store_config)
};
let base_client = BaseClient::with_store_config(store_config);
let http_client = HttpClient::new(inner_http_client.clone(), self.request_config);
let mut authentication_server_info = None;

View File

@@ -14,11 +14,9 @@ impl Client {
Ok(SlidingSync::builder(id.into(), self.clone())?)
}
/// Handle all the information provided in a sliding sync response
#[instrument(skip(self, response))]
pub(crate) async fn process_sliding_sync(
&self,
response: &v4::Response,
) -> Result<SyncResponse> {
pub async fn process_sliding_sync(&self, response: &v4::Response) -> Result<SyncResponse> {
let response = self.base_client().process_sliding_sync(response).await?;
debug!("done processing on base_client");
self.handle_sync_response(&response).await?;

View File

@@ -45,7 +45,7 @@ pub struct SlidingSyncRoom {
impl SlidingSyncRoom {
/// Create a new `SlidingSyncRoom`.
pub(super) fn new(
pub fn new(
client: Client,
room_id: OwnedRoomId,
inner: v4::SlidingSyncRoom,
@@ -125,6 +125,11 @@ impl SlidingSyncRoom {
self.inner.client.clone()
}
/// Find the latest event in this room
pub fn latest_event(&self) -> Option<SyncTimelineEvent> {
self.inner.client.get_room(&self.inner.room_id).and_then(|room| room.latest_event())
}
pub(super) fn update(
&mut self,
room_data: v4::SlidingSyncRoom,