feat: Make latest_event faster regarding member profile

feat: Make `latest_event` faster regarding member profile
This commit is contained in:
Ivan Enderlin
2023-10-09 11:41:32 +02:00
committed by GitHub
18 changed files with 433 additions and 178 deletions

View File

@@ -315,10 +315,10 @@ impl Room {
// Otherwise, fallback to the classical path.
let latest_event = match self.inner.latest_event() {
Some(ev) => matrix_sdk_ui::timeline::EventTimelineItem::from_latest_event(
Some(latest_event) => matrix_sdk_ui::timeline::EventTimelineItem::from_latest_event(
self.inner.client(),
self.inner.room_id(),
ev,
latest_event,
)
.await
.map(EventTimelineItem)

View File

@@ -54,7 +54,7 @@ use tokio::sync::RwLockReadGuard;
use tracing::{debug, info, instrument, trace, warn};
#[cfg(all(feature = "e2e-encryption", feature = "experimental-sliding-sync"))]
use crate::latest_event::{is_suitable_for_latest_event, PossibleLatestEvent};
use crate::latest_event::{is_suitable_for_latest_event, LatestEvent, PossibleLatestEvent};
use crate::{
deserialized_responses::{AmbiguityChanges, MembersResponse, SyncTimelineEvent},
error::Result,
@@ -620,10 +620,7 @@ impl BaseClient {
/// decrypted event if we found one, along with its index in the
/// latest_encrypted_events list, or None if we didn't find one.
#[cfg(all(feature = "e2e-encryption", feature = "experimental-sliding-sync"))]
async fn decrypt_latest_suitable_event(
&self,
room: &Room,
) -> Option<(SyncTimelineEvent, usize)> {
async fn decrypt_latest_suitable_event(&self, room: &Room) -> Option<(LatestEvent, usize)> {
let enc_events = room.latest_encrypted_events();
// Walk backwards through the encrypted events, looking for one we can decrypt
@@ -636,7 +633,7 @@ impl BaseClient {
is_suitable_for_latest_event(&any_sync_event)
{
// The event is the right type for us to use as latest_event
return Some((decrypted, i));
return Some((LatestEvent::new(decrypted), i));
}
}
}

View File

@@ -1,17 +1,24 @@
//! Utilities for working with events to decide whether they are suitable for
//! use as a [crate::Room::latest_event].
#![cfg(all(feature = "e2e-encryption", feature = "experimental-sliding-sync"))]
#![cfg(feature = "experimental-sliding-sync")]
use matrix_sdk_common::deserialized_responses::SyncTimelineEvent;
#[cfg(feature = "e2e-encryption")]
use ruma::events::{
poll::unstable_start::SyncUnstablePollStartEvent, room::message::SyncRoomMessageEvent,
AnySyncMessageLikeEvent, AnySyncTimelineEvent,
};
use ruma::{MxcUri, OwnedEventId};
use serde::{Deserialize, Serialize};
use crate::MinimalRoomMemberEvent;
/// Represents a decision about whether an event could be stored as the latest
/// event in a room. Variants starting with Yes indicate that this message could
/// be stored, and provide the inner event information, and those starting with
/// a No indicate that it could not, and give a reason.
#[cfg(feature = "e2e-encryption")]
#[derive(Debug)]
pub enum PossibleLatestEvent<'a> {
/// This message is suitable - it is an m.room.message
@@ -30,6 +37,7 @@ pub enum PossibleLatestEvent<'a> {
/// Decide whether an event could be stored as the latest event in a room.
/// Returns a LatestEvent representing our decision.
#[cfg(feature = "e2e-encryption")]
pub fn is_suitable_for_latest_event(event: &AnySyncTimelineEvent) -> PossibleLatestEvent<'_> {
match event {
// Suitable - we have an m.room.message that was not redacted
@@ -58,6 +66,86 @@ pub fn is_suitable_for_latest_event(event: &AnySyncTimelineEvent) -> PossibleLat
}
}
/// Represent all information required to represent a latest event in an
/// efficient way.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct LatestEvent {
/// The actual event.
event: SyncTimelineEvent,
/// The member profile of the event' sender.
#[serde(skip_serializing_if = "Option::is_none")]
sender_profile: Option<MinimalRoomMemberEvent>,
/// The name of the event' sender is ambiguous.
#[serde(skip_serializing_if = "Option::is_none")]
sender_name_is_ambiguous: Option<bool>,
}
impl LatestEvent {
/// Create a new [`LatestEvent`] without the sender's profile.
pub fn new(event: SyncTimelineEvent) -> Self {
Self { event, sender_profile: None, sender_name_is_ambiguous: None }
}
/// Create a new [`LatestEvent`] with maybe the sender's profile.
pub fn new_with_sender_details(
event: SyncTimelineEvent,
sender_profile: Option<MinimalRoomMemberEvent>,
sender_name_is_ambiguous: Option<bool>,
) -> Self {
Self { event, sender_profile, sender_name_is_ambiguous }
}
/// Transform [`Self`] into an event.
pub fn into_event(self) -> SyncTimelineEvent {
self.event
}
/// Get a reference to the event.
pub fn event(&self) -> &SyncTimelineEvent {
&self.event
}
/// Get a mutable reference to the event.
pub fn event_mut(&mut self) -> &mut SyncTimelineEvent {
&mut self.event
}
/// Get the event ID.
pub fn event_id(&self) -> Option<OwnedEventId> {
self.event.event_id()
}
/// Check whether [`Self`] has a sender profile.
pub fn has_sender_profile(&self) -> bool {
self.sender_profile.is_some()
}
/// Return the sender's display name if it was known at the time [`Self`]
/// was built.
pub fn sender_display_name(&self) -> Option<&str> {
self.sender_profile.as_ref().and_then(|profile| {
profile.as_original().and_then(|event| event.content.displayname.as_deref())
})
}
/// Return `Some(true)` if the sender's name is ambiguous, `Some(false)` if
/// it isn't, `None` if ambiguity detection wasn't possible at the time
/// [`Self`] was built.
pub fn sender_name_ambiguous(&self) -> Option<bool> {
self.sender_name_is_ambiguous
}
/// Return the sender's avatar URL if it was known at the time [`Self`] was
/// built.
pub fn sender_avatar_url(&self) -> Option<&MxcUri> {
self.sender_profile.as_ref().and_then(|profile| {
profile.as_original().and_then(|event| event.content.avatar_url.as_deref())
})
}
}
#[cfg(test)]
mod tests {
use std::collections::BTreeMap;

View File

@@ -23,8 +23,6 @@ use std::{
use bitflags::bitflags;
use eyeball::{SharedObservable, Subscriber};
use futures_util::stream::{self, StreamExt};
#[cfg(feature = "experimental-sliding-sync")]
use matrix_sdk_common::deserialized_responses::SyncTimelineEvent;
#[cfg(all(feature = "e2e-encryption", feature = "experimental-sliding-sync"))]
use matrix_sdk_common::ring_buffer::RingBuffer;
#[cfg(feature = "experimental-sliding-sync")]
@@ -60,6 +58,8 @@ use super::{
members::{MemberInfo, MemberRoomInfo},
BaseRoomInfo, DisplayName, RoomCreateWithCreatorEventContent, RoomMember,
};
#[cfg(feature = "experimental-sliding-sync")]
use crate::latest_event::LatestEvent;
use crate::{
deserialized_responses::MemberEvent,
store::{DynStateStore, Result as StoreResult, StateStoreExt},
@@ -383,13 +383,13 @@ impl Room {
/// Return the last event in this room, if one has been cached during
/// sliding sync.
#[cfg(feature = "experimental-sliding-sync")]
pub fn latest_event(&self) -> Option<SyncTimelineEvent> {
pub fn latest_event(&self) -> Option<LatestEvent> {
self.inner.read().latest_event.clone()
}
/// Update the last event in the room
#[cfg(all(feature = "e2e-encryption", feature = "experimental-sliding-sync"))]
pub(crate) fn set_latest_event(&self, latest_event: Option<SyncTimelineEvent>) {
pub(crate) fn set_latest_event(&self, latest_event: Option<LatestEvent>) {
self.inner.update(|info| info.latest_event = latest_event);
}
@@ -410,8 +410,8 @@ impl Room {
/// Panics if index is not a valid index in the latest_encrypted_events
/// list.
#[cfg(all(feature = "e2e-encryption", feature = "experimental-sliding-sync"))]
pub(crate) fn on_latest_event_decrypted(&self, event: SyncTimelineEvent, index: usize) {
self.set_latest_event(Some(event));
pub(crate) fn on_latest_event_decrypted(&self, latest_event: LatestEvent, index: usize) {
self.set_latest_event(Some(latest_event));
self.latest_encrypted_events.write().unwrap().drain(0..=index);
}
@@ -713,7 +713,7 @@ pub struct RoomInfo {
pub(crate) encryption_state_synced: bool,
/// The last event send by sliding sync
#[cfg(feature = "experimental-sliding-sync")]
pub(crate) latest_event: Option<SyncTimelineEvent>,
pub(crate) latest_event: Option<LatestEvent>,
/// Base room info which holds some basic event contents important for the
/// room state.
pub(crate) base_info: BaseRoomInfo,
@@ -869,9 +869,9 @@ impl RoomInfo {
if let Some(latest_event) = &mut self.latest_event {
trace!("Checking if redaction applies to latest event");
if latest_event.event_id().as_deref() == Some(redacts) {
match apply_redaction(&latest_event.event, _raw, room_version) {
match apply_redaction(&latest_event.event().event, _raw, room_version) {
Some(redacted) => {
latest_event.event = redacted;
latest_event.event_mut().event = redacted;
debug!("Redacted latest event");
}
None => {
@@ -1133,6 +1133,8 @@ mod tests {
#[cfg(feature = "experimental-sliding-sync")]
use super::SyncInfo;
use super::{Room, RoomInfo, RoomState};
#[cfg(any(feature = "experimental-sliding-sync", feature = "e2e-encryption"))]
use crate::latest_event::LatestEvent;
use crate::{
store::{MemoryStore, StateChanges, StateStore},
DisplayName, MinimalStateEvent, OriginalMinimalStateEvent,
@@ -1163,9 +1165,9 @@ mod tests {
last_prev_batch: Some("pb".to_owned()),
sync_info: SyncInfo::FullySynced,
encryption_state_synced: true,
latest_event: Some(
latest_event: Some(LatestEvent::new(
Raw::from_json_string(json!({"sender": "@u:i.uk"}).to_string()).unwrap().into(),
),
)),
base_info: BaseRoomInfo::new(),
};
@@ -1185,7 +1187,14 @@ mod tests {
"last_prev_batch": "pb",
"sync_info": "FullySynced",
"encryption_state_synced": true,
"latest_event": {"encryption_info": null, "event": {"sender": "@u:i.uk"}},
"latest_event": {
"event": {
"encryption_info": null,
"event": {
"sender": "@u:i.uk",
},
},
},
"base_info": {
"avatar": null,
"canonical_alias": null,
@@ -1546,7 +1555,7 @@ mod tests {
assert!(room.latest_event().is_none());
// When I provide a decrypted event to replace the encrypted one
let event = make_event("$A");
let event = make_latest_event("$A");
room.on_latest_event_decrypted(event.clone(), 0);
// Then is it stored
@@ -1558,14 +1567,14 @@ mod tests {
fn when_a_newly_decrypted_event_appears_we_delete_all_older_encrypted_events() {
// Given a room with some encrypted events and a latest event
let (_store, room) = make_room(RoomState::Joined);
room.inner.update(|info| info.latest_event = Some(make_event("$A")));
room.inner.update(|info| info.latest_event = Some(make_latest_event("$A")));
add_encrypted_event(&room, "$0");
add_encrypted_event(&room, "$1");
add_encrypted_event(&room, "$2");
add_encrypted_event(&room, "$3");
// When I provide a latest event
let new_event = make_event("$1");
let new_event = make_latest_event("$1");
let new_event_index = 1;
room.on_latest_event_decrypted(new_event.clone(), new_event_index);
@@ -1590,7 +1599,7 @@ mod tests {
add_encrypted_event(&room, "$3");
// When I provide a latest event and say it was the very latest
let new_event = make_event("$3");
let new_event = make_latest_event("$3");
let new_event_index = 3;
room.on_latest_event_decrypted(new_event, new_event_index);
@@ -1608,9 +1617,9 @@ mod tests {
}
#[cfg(feature = "experimental-sliding-sync")]
fn make_event(event_id: &str) -> SyncTimelineEvent {
SyncTimelineEvent::new(
fn make_latest_event(event_id: &str) -> LatestEvent {
LatestEvent::new(SyncTimelineEvent::new(
Raw::from_json_string(json!({ "event_id": event_id }).to_string()).unwrap(),
)
))
}
}

View File

@@ -32,7 +32,7 @@ use tracing::{instrument, trace, warn};
use super::BaseClient;
#[cfg(feature = "e2e-encryption")]
use crate::latest_event::{is_suitable_for_latest_event, PossibleLatestEvent};
use crate::latest_event::{is_suitable_for_latest_event, LatestEvent, PossibleLatestEvent};
#[cfg(feature = "e2e-encryption")]
use crate::RoomMemberships;
use crate::{
@@ -295,7 +295,8 @@ impl BaseClient {
// Cache the latest decrypted event in room_info, and also keep any later
// encrypted events, so we can slot them in when we get the keys.
#[cfg(feature = "e2e-encryption")]
cache_latest_events(&room, &mut room_info, &timeline.events);
cache_latest_events(&room, &mut room_info, &timeline.events, Some(changes), Some(store))
.await;
#[cfg(feature = "e2e-encryption")]
if room_info.is_encrypted() {
@@ -458,19 +459,81 @@ impl BaseClient {
/// If any encrypted events are found after that one, store them in the RoomInfo
/// too so we can use them when we get the relevant keys.
#[cfg(feature = "e2e-encryption")]
fn cache_latest_events(room: &Room, room_info: &mut RoomInfo, events: &[SyncTimelineEvent]) {
async fn cache_latest_events(
room: &Room,
room_info: &mut RoomInfo,
events: &[SyncTimelineEvent],
changes: Option<&StateChanges>,
store: Option<&Store>,
) {
let mut encrypted_events =
Vec::with_capacity(room.latest_encrypted_events.read().unwrap().capacity());
for e in events.iter().rev() {
if let Ok(timeline_event) = e.event.deserialize() {
for event in events.iter().rev() {
if let Ok(timeline_event) = event.event.deserialize() {
match is_suitable_for_latest_event(&timeline_event) {
PossibleLatestEvent::YesRoomMessage(_) | PossibleLatestEvent::YesPoll(_) => {
// m.room.message or m.poll.start - we found one! Store it.
// In order to make the latest event fast to read, we want to keep the
// associated sender in cache. This is a best-effort to gather enough
// information for creating a user profile as fast as possible. If information
// are missing, let's go back on the “slow” path.
let mut sender_profile = None;
let mut sender_name_is_ambiguous = None;
// First off, look up the sender's profile from the `StateChanges`, they are
// likely to be the most recent information.
if let Some(changes) = changes {
sender_profile = changes
.profiles
.get(room.room_id())
.and_then(|profiles_by_user| {
profiles_by_user.get(timeline_event.sender())
})
.cloned();
if let Some(sender_profile) = sender_profile.as_ref() {
sender_name_is_ambiguous = sender_profile
.as_original()
.and_then(|profile| profile.content.displayname.as_ref())
.and_then(|display_name| {
changes.ambiguity_maps.get(room.room_id()).and_then(
|map_for_room| {
map_for_room
.get(display_name)
.map(|user_ids| user_ids.len() > 1)
},
)
});
}
}
// Otherwise, look up the sender's profile from the `Store`.
if sender_profile.is_none() {
if let Some(store) = store {
sender_profile = store
.get_profile(room.room_id(), timeline_event.sender())
.await
.ok()
.flatten();
// TODO: need to update `sender_name_is_ambiguous`,
// but how?
}
}
let latest_event = LatestEvent::new_with_sender_details(
event.clone(),
sender_profile,
sender_name_is_ambiguous,
);
// Store it in the return RoomInfo, and in the Room, to make sure they are
// consistent
room_info.latest_event = Some(e.clone());
room.set_latest_event(Some(e.clone()));
room_info.latest_event = Some(latest_event.clone());
room.set_latest_event(Some(latest_event));
// We don't need any of the older encrypted events because we have a new
// decrypted one.
room.latest_encrypted_events.write().unwrap().clear();
@@ -485,7 +548,7 @@ fn cache_latest_events(room: &Room, room_info: &mut RoomInfo, events: &[SyncTime
// Check how many encrypted events we have seen. Only store another if we
// haven't already stored the maximum number.
if encrypted_events.len() < encrypted_events.capacity() {
encrypted_events.push(e.event.clone());
encrypted_events.push(event.event.clone());
}
}
_ => {
@@ -495,7 +558,7 @@ fn cache_latest_events(room: &Room, room_info: &mut RoomInfo, events: &[SyncTime
} else {
warn!(
"Failed to deserialize event as AnySyncTimelineEvent. ID={}",
e.event_id().expect("Event has no ID!")
event.event_id().expect("Event has no ID!")
);
}
}
@@ -964,7 +1027,10 @@ mod tests {
// Then the room holds the latest event
let client_room = client.get_room(room_id).expect("No room found");
assert_eq!(ev_id(client_room.latest_event()), "$idb");
assert_eq!(
ev_id(client_room.latest_event().map(|latest_event| latest_event.event().clone())),
"$idb"
);
}
#[async_test]
@@ -987,7 +1053,10 @@ mod tests {
// Then the room holds the latest event
let client_room = client.get_room(room_id).expect("No room found");
assert_eq!(ev_id(client_room.latest_event()), "$ida");
assert_eq!(
ev_id(client_room.latest_event().map(|latest_event| latest_event.event().clone())),
"$ida"
);
let redaction = json!({
"sender": "@alice:example.com",
@@ -1010,58 +1079,58 @@ mod tests {
// But it's now redacted
assert_matches!(
latest_event.event.deserialize().unwrap(),
latest_event.event().event.deserialize().unwrap(),
AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(
SyncRoomMessageEvent::Redacted(_)
))
);
}
#[test]
fn when_no_events_we_dont_cache_any() {
#[async_test]
async fn when_no_events_we_dont_cache_any() {
let events = &[];
let chosen = choose_event_to_cache(events);
let chosen = choose_event_to_cache(events).await;
assert!(chosen.is_none());
}
#[test]
fn when_only_one_event_we_cache_it() {
#[async_test]
async fn when_only_one_event_we_cache_it() {
let event1 = make_event("m.room.message", "$1");
let events = &[event1.clone()];
let chosen = choose_event_to_cache(events);
let chosen = choose_event_to_cache(events).await;
assert_eq!(ev_id(chosen), rawev_id(event1));
}
#[test]
fn with_multiple_events_we_cache_the_last_one() {
#[async_test]
async fn with_multiple_events_we_cache_the_last_one() {
let event1 = make_event("m.room.message", "$1");
let event2 = make_event("m.room.message", "$2");
let events = &[event1, event2.clone()];
let chosen = choose_event_to_cache(events);
let chosen = choose_event_to_cache(events).await;
assert_eq!(ev_id(chosen), rawev_id(event2));
}
#[test]
fn cache_the_latest_relevant_event_and_ignore_irrelevant_ones_even_if_later() {
#[async_test]
async fn cache_the_latest_relevant_event_and_ignore_irrelevant_ones_even_if_later() {
let event1 = make_event("m.room.message", "$1");
let event2 = make_event("m.room.message", "$2");
let event3 = make_event("m.room.powerlevels", "$3");
let event4 = make_event("m.room.powerlevels", "$5");
let events = &[event1, event2.clone(), event3, event4];
let chosen = choose_event_to_cache(events);
let chosen = choose_event_to_cache(events).await;
assert_eq!(ev_id(chosen), rawev_id(event2));
}
#[test]
fn prefer_to_cache_nothing_rather_than_irrelevant_events() {
#[async_test]
async fn prefer_to_cache_nothing_rather_than_irrelevant_events() {
let event1 = make_event("m.room.power_levels", "$1");
let events = &[event1];
let chosen = choose_event_to_cache(events);
let chosen = choose_event_to_cache(events).await;
assert!(chosen.is_none());
}
#[test]
fn cache_encrypted_events_that_are_after_latest_message() {
#[async_test]
async fn cache_encrypted_events_that_are_after_latest_message() {
// Given two message events followed by two encrypted
let event1 = make_event("m.room.message", "$1");
let event2 = make_event("m.room.message", "$2");
@@ -1072,18 +1141,24 @@ mod tests {
// When I ask to cache events
let room = make_room();
let mut room_info = room.clone_info();
cache_latest_events(&room, &mut room_info, events);
cache_latest_events(&room, &mut room_info, events, None, None).await;
// The latest message is stored
assert_eq!(ev_id(room_info.latest_event), rawev_id(event2.clone()));
assert_eq!(ev_id(room.latest_event()), rawev_id(event2));
assert_eq!(
ev_id(room_info.latest_event.map(|latest_event| latest_event.event().clone())),
rawev_id(event2.clone())
);
assert_eq!(
ev_id(room.latest_event().map(|latest_event| latest_event.event().clone())),
rawev_id(event2)
);
// And also the two encrypted ones
assert_eq!(rawevs_ids(&room.latest_encrypted_events), evs_ids(&[event3, event4]));
}
#[test]
fn dont_cache_encrypted_events_that_are_before_latest_message() {
#[async_test]
async fn dont_cache_encrypted_events_that_are_before_latest_message() {
// Given an encrypted event before and after the message
let event1 = make_encrypted_event("$1");
let event2 = make_event("m.room.message", "$2");
@@ -1093,17 +1168,20 @@ mod tests {
// When I ask to cache events
let room = make_room();
let mut room_info = room.clone_info();
cache_latest_events(&room, &mut room_info, events);
cache_latest_events(&room, &mut room_info, events, None, None).await;
// The latest message is stored
assert_eq!(ev_id(room.latest_event()), rawev_id(event2));
assert_eq!(
ev_id(room.latest_event().map(|latest_event| latest_event.event().clone())),
rawev_id(event2)
);
// And also the encrypted one that was after it, but not the one before
assert_eq!(rawevs_ids(&room.latest_encrypted_events), evs_ids(&[event3]));
}
#[test]
fn skip_irrelevant_events_eg_receipts_even_if_after_message() {
#[async_test]
async fn skip_irrelevant_events_eg_receipts_even_if_after_message() {
// Given two message events followed by two encrypted, with a receipt in the
// middle
let event1 = make_event("m.room.message", "$1");
@@ -1116,17 +1194,20 @@ mod tests {
// When I ask to cache events
let room = make_room();
let mut room_info = room.clone_info();
cache_latest_events(&room, &mut room_info, events);
cache_latest_events(&room, &mut room_info, events, None, None).await;
// The latest message is stored, ignoring the receipt
assert_eq!(ev_id(room.latest_event()), rawev_id(event2));
assert_eq!(
ev_id(room.latest_event().map(|latest_event| latest_event.event().clone())),
rawev_id(event2)
);
// The two encrypted ones are stored, but not the receipt
assert_eq!(rawevs_ids(&room.latest_encrypted_events), evs_ids(&[event3, event5]));
}
#[test]
fn only_store_the_max_number_of_encrypted_events() {
#[async_test]
async fn only_store_the_max_number_of_encrypted_events() {
// Given two message events followed by lots of encrypted and other irrelevant
// events
let evente = make_event("m.room.message", "$e");
@@ -1165,10 +1246,13 @@ mod tests {
// When I ask to cache events
let room = make_room();
let mut room_info = room.clone_info();
cache_latest_events(&room, &mut room_info, events);
cache_latest_events(&room, &mut room_info, events, None, None).await;
// The latest message is stored, ignoring encrypted and receipts
assert_eq!(ev_id(room.latest_event()), rawev_id(eventd));
assert_eq!(
ev_id(room.latest_event().map(|latest_event| latest_event.event().clone())),
rawev_id(eventd)
);
// Only 10 encrypted are stored, even though there were more
assert_eq!(
@@ -1179,8 +1263,8 @@ mod tests {
);
}
#[test]
fn dont_overflow_capacity_if_previous_encrypted_events_exist() {
#[async_test]
async fn dont_overflow_capacity_if_previous_encrypted_events_exist() {
// Given a RoomInfo with lots of encrypted events already inside it
let room = make_room();
let mut room_info = room.clone_info();
@@ -1199,14 +1283,17 @@ mod tests {
make_encrypted_event("$8"),
make_encrypted_event("$9"),
],
);
None,
None,
)
.await;
// Sanity: room_info has 10 encrypted events inside it
assert_eq!(room.latest_encrypted_events.read().unwrap().len(), 10);
// When I ask to cache more encrypted events
let eventa = make_encrypted_event("$a");
let mut room_info = room.clone_info();
cache_latest_events(&room, &mut room_info, &[eventa]);
cache_latest_events(&room, &mut room_info, &[eventa], None, None).await;
// The oldest event is gone
assert!(!rawevs_ids(&room.latest_encrypted_events).contains(&"$0".to_owned()));
@@ -1215,8 +1302,8 @@ mod tests {
assert_eq!(rawevs_ids(&room.latest_encrypted_events)[9], "$a");
}
#[test]
fn existing_encrypted_events_are_deleted_if_we_receive_unencrypted() {
#[async_test]
async fn existing_encrypted_events_are_deleted_if_we_receive_unencrypted() {
// Given a RoomInfo with some encrypted events already inside it
let room = make_room();
let mut room_info = room.clone_info();
@@ -1224,25 +1311,28 @@ mod tests {
&room,
&mut room_info,
&[make_encrypted_event("$0"), make_encrypted_event("$1"), make_encrypted_event("$2")],
);
None,
None,
)
.await;
// When I ask to cache an unecnrypted event, and some more encrypted events
let eventa = make_event("m.room.message", "$a");
let eventb = make_encrypted_event("$b");
cache_latest_events(&room, &mut room_info, &[eventa, eventb]);
cache_latest_events(&room, &mut room_info, &[eventa, eventb], None, None).await;
// The only encrypted events stored are the ones after the decrypted one
assert_eq!(rawevs_ids(&room.latest_encrypted_events), &["$b"]);
// The decrypted one is stored as the latest
assert_eq!(rawev_id(room.latest_event().unwrap()), "$a");
assert_eq!(rawev_id(room.latest_event().unwrap().event().clone()), "$a");
}
fn choose_event_to_cache(events: &[SyncTimelineEvent]) -> Option<SyncTimelineEvent> {
async fn choose_event_to_cache(events: &[SyncTimelineEvent]) -> Option<SyncTimelineEvent> {
let room = make_room();
let mut room_info = room.clone_info();
cache_latest_events(&room, &mut room_info, events);
room.latest_event()
cache_latest_events(&room, &mut room_info, events, None, None).await;
room.latest_event().map(|latest_event| latest_event.event().clone())
}
fn rawev_id(event: SyncTimelineEvent) -> String {

View File

@@ -38,6 +38,8 @@ use ruma::{
};
use serde::{Deserialize, Serialize};
#[cfg(feature = "experimental-sliding-sync")]
use crate::latest_event::LatestEvent;
use crate::{
deserialized_responses::SyncOrStrippedState,
rooms::{
@@ -115,7 +117,7 @@ impl RoomInfoV1 {
sync_info,
encryption_state_synced,
#[cfg(feature = "experimental-sliding-sync")]
latest_event,
latest_event: latest_event.map(LatestEvent::new),
base_info: base_info.migrate(create),
}
}

View File

@@ -177,6 +177,7 @@ impl RoomListService {
.required_state(vec![
(StateEventType::RoomAvatar, "".to_owned()),
(StateEventType::RoomEncryption, "".to_owned()),
(StateEventType::RoomMember, "$LAZY".to_owned()),
(StateEventType::RoomPowerLevels, "".to_owned()),
]),
))

View File

@@ -268,8 +268,9 @@ impl RepliedToEvent {
let content =
TimelineItemContent::Message(Message::from_event(c, event.relations(), &vector![]));
let sender = event.sender().to_owned();
let sender_profile =
TimelineDetails::from_initial_value(room_data_provider.profile(&sender).await);
let sender_profile = TimelineDetails::from_initial_value(
room_data_provider.profile_from_user_id(&sender).await,
);
Ok(Self { content, sender, sender_profile })
}

View File

@@ -109,9 +109,9 @@ pub enum TimelineItemContent {
}
impl TimelineItemContent {
/// If the supplied event is suitable to be used as a latest_event in a
/// If the supplied event is suitable to be used as a `latest_event` in a
/// message preview, extract its contents and wrap it as a
/// TimelineItemContent.
/// `TimelineItemContent`.
pub(crate) fn from_latest_event_content(
event: AnySyncTimelineEvent,
) -> Option<TimelineItemContent> {
@@ -146,23 +146,23 @@ impl TimelineItemContent {
/// Given some message content that is from an event that we have already
/// determined is suitable for use as a latest event in a message preview,
/// extract its contents and wrap it as a TimelineItemContent.
/// extract its contents and wrap it as a `TimelineItemContent`.
fn from_suitable_latest_event_content(event: &SyncRoomMessageEvent) -> TimelineItemContent {
match event {
SyncRoomMessageEvent::Original(event) => {
// Grab the content of this event
let event_content = event.content.clone();
// We don't have access to any relations via the AnySyncTimelineEvent (I think -
// andyb) so we pretend there are none. This might be OK for the message preview
// use case.
// We don't have access to any relations via the `AnySyncTimelineEvent` (I think
// - andyb) so we pretend there are none. This might be OK for
// the message preview use case.
let relations = BundledMessageLikeRelations::new();
// If this message is a reply, we would look up in this list the message it was
// replying to. Since we probably won't show this in the message preview,
// it's probably OK to supply an empty list here.
// Message::from_event marks the original event as Unavailable if it can't be
// found inside the timeline_items.
// `Message::from_event` marks the original event as `Unavailable` if it can't
// be found inside the timeline_items.
let timeline_items = Vector::new();
TimelineItemContent::Message(Message::from_event(
event_content,
@@ -174,7 +174,7 @@ impl TimelineItemContent {
}
}
/// extracts a TimelineItemContent from a poll start event for use as a
/// Extracts a `TimelineItemContent` from a poll start event for use as a
/// latest event in a message preview.
fn from_suitable_latest_poll_event_content(
event: &SyncUnstablePollStartEvent,

View File

@@ -16,7 +16,7 @@ use std::sync::Arc;
use indexmap::IndexMap;
use matrix_sdk::{deserialized_responses::EncryptionInfo, Client, Error};
use matrix_sdk_base::deserialized_responses::SyncTimelineEvent;
use matrix_sdk_base::{deserialized_responses::SyncTimelineEvent, latest_event::LatestEvent};
use once_cell::sync::Lazy;
use ruma::{
events::{receipt::Receipt, room::message::MessageType, AnySyncTimelineEvent},
@@ -90,17 +90,18 @@ impl EventTimelineItem {
Self { sender, sender_profile, timestamp, content, kind }
}
/// If the supplied low-level SyncTimelineEventy is suitable for use as the
/// latest_event in a message preview, wrap it as an EventTimelineItem,
/// If the supplied low-level `SyncTimelineEventy` is suitable for use as
/// the `latest_event` in a message preview, wrap it as an
/// `EventTimelineItem`.
pub async fn from_latest_event(
client: Client,
room_id: &RoomId,
sync_event: SyncTimelineEvent,
latest_event: LatestEvent,
) -> Option<EventTimelineItem> {
use super::traits::RoomDataProvider;
let raw_sync_event = sync_event.event;
let encryption_info = sync_event.encryption_info;
let SyncTimelineEvent { event: raw_sync_event, encryption_info, .. } =
latest_event.event().clone();
let Ok(event) = raw_sync_event.deserialize_as::<AnySyncTimelineEvent>() else {
warn!("Unable to deserialize latest_event as an AnySyncTimelineEvent!");
@@ -112,8 +113,8 @@ impl EventTimelineItem {
let event_id = event.event_id().to_owned();
let is_own = client.user_id().map(|uid| uid == sender).unwrap_or(false);
// If we don't (yet) know how to handle this type of message, return None here.
// If we do, convert it into a TimelineItemContent.
// If we don't (yet) know how to handle this type of message, return `None`
// here. If we do, convert it into a `TimelineItemContent`.
let item_content = TimelineItemContent::from_latest_event_content(event)?;
// We don't currently bundle any reactions with the main event. This could
@@ -148,15 +149,19 @@ impl EventTimelineItem {
let room = client.get_room(room_id);
let sender_profile = if let Some(room) = room {
room.profile(&sender)
.await
.map(TimelineDetails::Ready)
.unwrap_or(TimelineDetails::Unavailable)
let mut profile = room.profile_from_latest_event(&latest_event).await;
// Fallback to the slow path.
if profile.is_none() {
profile = room.profile_from_user_id(&sender).await;
}
profile.map(TimelineDetails::Ready).unwrap_or(TimelineDetails::Unavailable)
} else {
TimelineDetails::Unavailable
};
Some(EventTimelineItem::new(sender, sender_profile, timestamp, item_content, event_kind))
Some(Self::new(sender, sender_profile, timestamp, item_content, event_kind))
}
/// Check whether this item is a local echo.
@@ -433,7 +438,7 @@ impl From<RemoteEventTimelineItem> for EventTimelineItemKind {
}
/// The display name and avatar URL of a room member.
#[derive(Clone, Debug, PartialEq, Eq)]
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct Profile {
/// The display name, if set.
pub display_name: Option<String>,
@@ -498,20 +503,25 @@ pub enum EventItemOrigin {
mod tests {
use assert_matches::assert_matches;
use matrix_sdk::{config::RequestConfig, Client, ClientBuilder};
use matrix_sdk_base::{deserialized_responses::SyncTimelineEvent, BaseClient, SessionMeta};
use matrix_sdk_test::async_test;
use matrix_sdk_base::{
deserialized_responses::SyncTimelineEvent, latest_event::LatestEvent, BaseClient,
MinimalStateEvent, OriginalMinimalStateEvent, SessionMeta,
};
use matrix_sdk_test::{async_test, sync_timeline_event};
use ruma::{
api::{client::sync::sync_events::v4, MatrixVersion},
device_id,
events::{
room::message::{MessageFormat, MessageType},
room::{
member::RoomMemberEventContent,
message::{MessageFormat, MessageType},
},
AnySyncTimelineEvent,
},
room_id,
serde::Raw,
user_id, RoomId, UInt, UserId,
};
use serde_json::json;
use super::{EventTimelineItem, Profile};
use crate::timeline::TimelineDetails;
@@ -527,7 +537,9 @@ mod tests {
// When we construct a timeline event from it
let timeline_item =
EventTimelineItem::from_latest_event(client, room_id, event).await.unwrap();
EventTimelineItem::from_latest_event(client, room_id, LatestEvent::new(event))
.await
.unwrap();
// Then its properties correctly translate
assert_eq!(timeline_item.sender, user_id);
@@ -544,7 +556,7 @@ mod tests {
}
#[async_test]
async fn latest_message_event_can_be_wrapped_as_a_timeline_item_with_sender() {
async fn latest_message_event_can_be_wrapped_as_a_timeline_item_with_sender_from_the_storage() {
// Given a sync event that is suitable to be used as a latest_event, and a room
// with a member event for the sender
@@ -562,7 +574,55 @@ mod tests {
// When we construct a timeline event from it
let timeline_item =
EventTimelineItem::from_latest_event(client, room_id, event).await.unwrap();
EventTimelineItem::from_latest_event(client, room_id, LatestEvent::new(event))
.await
.unwrap();
// Then its sender is properly populated
let profile = assert_matches!(timeline_item.sender_profile, TimelineDetails::Ready(p) => p);
assert_eq!(
profile,
Profile {
display_name: Some("Alice Margatroid".to_owned()),
display_name_ambiguous: false,
avatar_url: Some(owned_mxc_uri!("mxc://e.org/SEs"))
}
);
}
#[async_test]
async fn latest_message_event_can_be_wrapped_as_a_timeline_item_with_sender_from_the_cache() {
// Given a sync event that is suitable to be used as a latest_event, a room, and
// a member event for the sender (which isn't part of the room yet).
use ruma::owned_mxc_uri;
let room_id = room_id!("!q:x.uk");
let user_id = user_id!("@t:o.uk");
let event = message_event(room_id, user_id, "**My M**", "<b>My M</b>", 122344);
let client = logged_in_client(None).await;
let member_event = MinimalStateEvent::Original(
member_event(room_id, user_id, "Alice Margatroid", "mxc://e.org/SEs")
.deserialize_as::<OriginalMinimalStateEvent<RoomMemberEventContent>>()
.unwrap(),
);
let room = v4::SlidingSyncRoom::new();
// Do not push the `member_event` inside the room. Let's say it's flying in the
// `StateChanges`.
// And the room is stored in the client so it can be extracted when needed
let response = response_with_room(room_id, room).await;
client.process_sliding_sync(&response).await.unwrap();
// When we construct a timeline event from it
let timeline_item = EventTimelineItem::from_latest_event(
client,
room_id,
LatestEvent::new_with_sender_details(event, Some(member_event), None),
)
.await
.unwrap();
// Then its sender is properly populated
let profile = assert_matches!(timeline_item.sender_profile, TimelineDetails::Ready(p) => p);
@@ -582,28 +642,24 @@ mod tests {
display_name: &str,
avatar_url: &str,
) -> Raw<AnySyncTimelineEvent> {
Raw::from_json_string(
json!({
"type": "m.room.member",
"content": {
"avatar_url": avatar_url,
"displayname": display_name,
"membership": "join",
"reason": ""
},
"event_id": "$143273582443PhrSn:example.org",
"origin_server_ts": 143273583,
"room_id": room_id,
"sender": "@example:example.org",
"state_key": user_id,
"type": "m.room.member",
"unsigned": {
"age": 1234
}
})
.to_string(),
)
.unwrap()
sync_timeline_event!({
"type": "m.room.member",
"content": {
"avatar_url": avatar_url,
"displayname": display_name,
"membership": "join",
"reason": ""
},
"event_id": "$143273582443PhrSn:example.org",
"origin_server_ts": 143273583,
"room_id": room_id,
"sender": "@example:example.org",
"state_key": user_id,
"type": "m.room.member",
"unsigned": {
"age": 1234
}
})
}
async fn response_with_room(room_id: &RoomId, room: v4::SlidingSyncRoom) -> v4::Response {
@@ -619,25 +675,20 @@ mod tests {
formatted_body: &str,
ts: u64,
) -> SyncTimelineEvent {
SyncTimelineEvent::new(
Raw::from_json_string(
json!({
"event_id": "$eventid6",
"sender": user_id,
"origin_server_ts": ts,
"type": "m.room.message",
"room_id": room_id.to_string(),
"content": {
"body": body,
"format": "org.matrix.custom.html",
"formatted_body": formatted_body,
"msgtype": "m.text"
},
})
.to_string(),
)
.unwrap(),
)
sync_timeline_event!({
"event_id": "$eventid6",
"sender": user_id,
"origin_server_ts": ts,
"type": "m.room.message",
"room_id": room_id.to_string(),
"content": {
"body": body,
"format": "org.matrix.custom.html",
"formatted_body": formatted_body,
"msgtype": "m.text"
},
})
.into()
}
/// Copied from matrix_sdk_base::sliding_sync::test

View File

@@ -206,7 +206,7 @@ impl<P: RoomDataProvider> TimelineInner<P> {
};
let sender = self.room_data_provider.own_user_id().to_owned();
let sender_profile = self.room_data_provider.profile(&sender).await;
let sender_profile = self.room_data_provider.profile_from_user_id(&sender).await;
let reaction_state = match (to_redact_local, to_redact_remote) {
(None, None) => {
// No record of the reaction, create a local echo
@@ -355,7 +355,7 @@ impl<P: RoomDataProvider> TimelineInner<P> {
content: AnyMessageLikeEventContent,
) {
let sender = self.room_data_provider.own_user_id().to_owned();
let profile = self.room_data_provider.profile(&sender).await;
let profile = self.room_data_provider.profile_from_user_id(&sender).await;
let mut state = self.state.write().await;
state.handle_local_event(sender, profile, txn_id, content, &self.settings);
@@ -370,7 +370,7 @@ impl<P: RoomDataProvider> TimelineInner<P> {
content: RoomRedactionEventContent,
) {
let sender = self.room_data_provider.own_user_id().to_owned();
let profile = self.room_data_provider.profile(&sender).await;
let profile = self.room_data_provider.profile_from_user_id(&sender).await;
let mut state = self.state.write().await;
state.handle_local_redaction(sender, profile, txn_id, to_redact, content, &self.settings);
@@ -781,7 +781,7 @@ impl<P: RoomDataProvider> TimelineInner<P> {
continue;
}
match self.room_data_provider.profile(event_item.sender()).await {
match self.room_data_provider.profile_from_user_id(event_item.sender()).await {
Some(profile) => {
trace!(event_id, transaction_id, "Adding profile");
let updated_item =

View File

@@ -567,7 +567,7 @@ impl TimelineInnerStateTransaction<'_> {
}
let is_own_event = sender == room_data_provider.own_user_id();
let sender_profile = room_data_provider.profile(&sender).await;
let sender_profile = room_data_provider.profile_from_user_id(&sender).await;
let ctx = TimelineEventContext {
sender,
sender_profile,

View File

@@ -23,6 +23,7 @@ use futures_core::Stream;
use futures_util::{FutureExt, StreamExt};
use indexmap::IndexMap;
use matrix_sdk::deserialized_responses::{SyncTimelineEvent, TimelineEvent};
use matrix_sdk_base::latest_event::LatestEvent;
use matrix_sdk_test::{EventBuilder, ALICE};
use ruma::{
events::{
@@ -241,7 +242,11 @@ impl RoomDataProvider for TestRoomDataProvider {
RoomVersionId::V10
}
async fn profile(&self, _user_id: &UserId) -> Option<Profile> {
async fn profile_from_user_id(&self, _user_id: &UserId) -> Option<Profile> {
None
}
async fn profile_from_latest_event(&self, _latest_event: &LatestEvent) -> Option<Profile> {
None
}

View File

@@ -17,6 +17,7 @@ use indexmap::IndexMap;
use matrix_sdk::Room;
#[cfg(feature = "e2e-encryption")]
use matrix_sdk::{deserialized_responses::TimelineEvent, Result};
use matrix_sdk_base::latest_event::LatestEvent;
use ruma::{
events::receipt::{Receipt, ReceiptThread, ReceiptType},
push::{PushConditionRoomCtx, Ruleset},
@@ -66,7 +67,8 @@ impl RoomExt for Room {
pub(super) trait RoomDataProvider: Clone + Send + Sync + 'static {
fn own_user_id(&self) -> &UserId;
fn room_version(&self) -> RoomVersionId;
async fn profile(&self, user_id: &UserId) -> Option<Profile>;
async fn profile_from_user_id(&self, user_id: &UserId) -> Option<Profile>;
async fn profile_from_latest_event(&self, latest_event: &LatestEvent) -> Option<Profile>;
async fn read_receipts_for_event(&self, event_id: &EventId) -> IndexMap<OwnedUserId, Receipt>;
async fn push_rules_and_context(&self) -> Option<(Ruleset, PushConditionRoomCtx)>;
}
@@ -84,18 +86,14 @@ impl RoomDataProvider for Room {
})
}
async fn profile(&self, user_id: &UserId) -> Option<Profile> {
async fn profile_from_user_id(&self, user_id: &UserId) -> Option<Profile> {
match self.get_member_no_sync(user_id).await {
Ok(Some(member)) => Some(Profile {
display_name: member.display_name().map(ToOwned::to_owned),
display_name_ambiguous: member.name_ambiguous(),
avatar_url: member.avatar_url().map(ToOwned::to_owned),
}),
Ok(None) if self.are_members_synced() => Some(Profile {
display_name: None,
display_name_ambiguous: false,
avatar_url: None,
}),
Ok(None) if self.are_members_synced() => Some(Profile::default()),
Ok(None) => None,
Err(e) => {
error!(%user_id, "Failed to fetch room member information: {e}");
@@ -104,6 +102,18 @@ impl RoomDataProvider for Room {
}
}
async fn profile_from_latest_event(&self, latest_event: &LatestEvent) -> Option<Profile> {
if !latest_event.has_sender_profile() {
return None;
}
Some(Profile {
display_name: latest_event.sender_display_name().map(ToOwned::to_owned),
display_name_ambiguous: latest_event.sender_name_ambiguous().unwrap_or(false),
avatar_url: latest_event.sender_avatar_url().map(ToOwned::to_owned),
})
}
async fn read_receipts_for_event(&self, event_id: &EventId) -> IndexMap<OwnedUserId, Receipt> {
match self.event_receipts(ReceiptType::Read, ReceiptThread::Unthreaded, event_id).await {
Ok(receipts) => receipts.into_iter().collect(),

View File

@@ -274,6 +274,7 @@ async fn test_sync_all_states() -> Result<(), Error> {
"required_state": [
["m.room.avatar", ""],
["m.room.encryption", ""],
["m.room.member", "$LAZY"],
["m.room.power_levels", ""],
],
"filters": {

View File

@@ -15,10 +15,9 @@ use super::{
FrozenSlidingSync, FrozenSlidingSyncList, SlidingSync, SlidingSyncList,
SlidingSyncPositionMarkers,
};
use crate::{
sliding_sync::{FrozenSlidingSyncPos, SlidingSyncListCachePolicy},
Client, Result,
};
#[cfg(feature = "e2e-encryption")]
use crate::sliding_sync::FrozenSlidingSyncPos;
use crate::{sliding_sync::SlidingSyncListCachePolicy, Client, Result};
/// Be careful: as this is used as a storage key; changing it requires migrating
/// data!

View File

@@ -50,12 +50,13 @@ use tokio::{
use tracing::{debug, error, info, instrument, trace, warn, Instrument, Span};
use url::Url;
#[cfg(feature = "e2e-encryption")]
use self::utils::JoinHandleExt as _;
pub use self::{builder::*, error::*, list::*, room::*};
use self::{
cache::restore_sliding_sync_state,
client::SlidingSyncResponseProcessor,
sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager, StickyData},
utils::JoinHandleExt as _,
};
use crate::{config::RequestConfig, Client, Result};

View File

@@ -5,7 +5,7 @@ use std::{
};
use eyeball_im::Vector;
use matrix_sdk_base::deserialized_responses::SyncTimelineEvent;
use matrix_sdk_base::{deserialized_responses::SyncTimelineEvent, latest_event::LatestEvent};
use ruma::{
api::client::sync::sync_events::{v4, UnreadNotificationsCount},
events::AnySyncStateEvent,
@@ -133,7 +133,7 @@ impl SlidingSyncRoom {
}
/// Find the latest event in this room
pub fn latest_event(&self) -> Option<SyncTimelineEvent> {
pub fn latest_event(&self) -> Option<LatestEvent> {
self.inner.client.get_room(&self.inner.room_id).and_then(|room| room.latest_event())
}