Merge branch 'main' into rav/history_sharing/upload_bundle

Signed-off-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
This commit is contained in:
Richard van der Hoff
2025-04-15 14:32:42 +01:00
committed by GitHub
40 changed files with 1710 additions and 1257 deletions

View File

@@ -25,7 +25,7 @@ jobs:
- uses: actions/checkout@v4
- name: Check for changed files
id: changed-files
uses: tj-actions/changed-files@6f67ee9ac810f0192ea7b3d2086406f97847bcf9 # v45
uses: tj-actions/changed-files@9b4bb2bedb217d3ede225b6b07ebde713177cd8f # v45
- name: Detect long path
env:
ALL_CHANGED_FILES: ${{ steps.changed-files.outputs.all_changed_files }} # ignore the deleted files

View File

@@ -6,6 +6,10 @@ All notable changes to this project will be documented in this file.
## [Unreleased] - ReleaseDate
Additions:
- Add room topic string to `StateEventContent`
## [0.11.0] - 2025-04-11
Breaking changes:
@@ -20,7 +24,7 @@ Breaking changes:
programs can set it to `true`.
- Matrix client API errors coming from API responses will now be mapped to `ClientError::MatrixApi`, containing both the
original message and the associated error code and kind.
original message and the associated error code and kind.
- `EventSendState` now has two additional variants: `CrossSigningNotSetup` and
`SendingFromUnverifiedDevice`. These indicate that your own device is not

View File

@@ -83,7 +83,7 @@ pub enum StateEventContent {
RoomServerAcl,
RoomThirdPartyInvite,
RoomTombstone,
RoomTopic,
RoomTopic { topic: String },
SpaceChild,
SpaceParent,
}
@@ -118,7 +118,11 @@ impl TryFrom<AnySyncStateEvent> for StateEventContent {
AnySyncStateEvent::RoomServerAcl(_) => StateEventContent::RoomServerAcl,
AnySyncStateEvent::RoomThirdPartyInvite(_) => StateEventContent::RoomThirdPartyInvite,
AnySyncStateEvent::RoomTombstone(_) => StateEventContent::RoomTombstone,
AnySyncStateEvent::RoomTopic(_) => StateEventContent::RoomTopic,
AnySyncStateEvent::RoomTopic(content) => {
let content = get_state_event_original_content(content)?;
StateEventContent::RoomTopic { topic: content.topic }
}
AnySyncStateEvent::SpaceChild(_) => StateEventContent::SpaceChild,
AnySyncStateEvent::SpaceParent(_) => StateEventContent::SpaceParent,
_ => bail!("Unsupported state event"),

View File

@@ -17,7 +17,7 @@
use std::sync::Arc;
use std::{
collections::{BTreeMap, BTreeSet, HashMap},
fmt, iter,
fmt,
ops::Deref,
};
@@ -38,35 +38,34 @@ use ruma::{
events::{
push_rules::{PushRulesEvent, PushRulesEventContent},
room::member::SyncRoomMemberEvent,
AnyStrippedStateEvent, AnySyncEphemeralRoomEvent, StateEvent, StateEventType,
StateEvent, StateEventType,
},
push::{Action, Ruleset},
serde::Raw,
push::Ruleset,
time::Instant,
OwnedRoomId, OwnedUserId, RoomId,
};
use tokio::sync::{broadcast, Mutex};
#[cfg(feature = "e2e-encryption")]
use tokio::sync::{RwLock, RwLockReadGuard};
use tracing::{debug, info, instrument};
use tracing::{debug, enabled, info, instrument, Level};
#[cfg(feature = "e2e-encryption")]
use crate::RoomMemberships;
use crate::{
deserialized_responses::{DisplayName, RawAnySyncOrStrippedTimelineEvent},
deserialized_responses::DisplayName,
error::{Error, Result},
event_cache::store::EventCacheStoreLock,
response_processors::{self as processors, Context},
rooms::{
normal::{RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, RoomMembersUpdate},
Room, RoomInfo, RoomState,
Room, RoomState,
},
store::{
ambiguity_map::AmbiguityCache, BaseStateStore, DynStateStore, MemoryStore,
Result as StoreResult, RoomLoadSettings, StateChanges, StateStoreDataKey,
StateStoreDataValue, StateStoreExt, StoreConfig,
},
sync::{JoinedRoomUpdate, LeftRoomUpdate, Notification, RoomUpdates, SyncResponse},
sync::{RoomUpdates, SyncResponse},
RoomStateFilter, SessionMeta,
};
@@ -367,65 +366,6 @@ impl BaseClient {
self.state_store.sync_token.read().await.clone()
}
/// Handles the stripped state events in `invite_state`, modifying the
/// room's info and posting notifications as needed.
///
/// * `room` - The [`Room`] to modify.
/// * `events` - The contents of `invite_state` in the form of list of pairs
/// of raw stripped state events with their deserialized counterpart.
/// * `push_rules` - The push rules for this room.
/// * `room_info` - The current room's info.
/// * `changes` - The accumulated list of changes to apply once the
/// processing is finished.
/// * `notifications` - Notifications to post for the current room.
#[instrument(skip_all, fields(room_id = ?room_info.room_id))]
pub(crate) async fn handle_invited_state(
&self,
context: &mut Context,
room: &Room,
events: (Vec<Raw<AnyStrippedStateEvent>>, Vec<AnyStrippedStateEvent>),
push_rules: &Ruleset,
room_info: &mut RoomInfo,
notifications: &mut BTreeMap<OwnedRoomId, Vec<Notification>>,
) -> Result<()> {
let mut state_events = BTreeMap::new();
for (raw_event, event) in iter::zip(events.0, events.1) {
room_info.handle_stripped_state_event(&event);
state_events
.entry(event.event_type())
.or_insert_with(BTreeMap::new)
.insert(event.state_key().to_owned(), raw_event);
}
context
.state_changes
.stripped_state
.insert(room_info.room_id().to_owned(), state_events.clone());
// We need to check for notifications after we have handled all state
// events, to make sure we have the full push context.
if let Some(push_context) =
processors::timeline::get_push_room_context(context, room, room_info, &self.state_store)
.await?
{
// Check every event again for notification.
for event in state_events.values().flat_map(|map| map.values()) {
let actions = push_rules.get_actions(event, &push_context);
if actions.iter().any(Action::should_notify) {
notifications.entry(room.room_id().to_owned()).or_default().push(
Notification {
actions: actions.to_owned(),
event: RawAnySyncOrStrippedTimelineEvent::Stripped(event.clone()),
},
);
}
}
}
Ok(())
}
/// User has knocked on a room.
///
/// Update the internal and cached state accordingly. Return the final Room.
@@ -546,13 +486,12 @@ impl BaseClient {
return Ok(SyncResponse::default());
}
let now = Instant::now();
let now = if enabled!(Level::INFO) { Some(Instant::now()) } else { None };
#[cfg(feature = "e2e-encryption")]
let olm_machine = self.olm_machine().await;
let mut context =
Context::new(StateChanges::new(response.next_batch.clone()), Default::default());
let mut context = Context::new(StateChanges::new(response.next_batch.clone()));
#[cfg(feature = "e2e-encryption")]
let to_device = {
@@ -573,9 +512,11 @@ impl BaseClient {
.flatten()
.filter_map(|room_key_info| self.get_room(&room_key_info.room_id))
.collect(),
olm_machine.as_ref(),
self.decryption_trust_requirement,
self.handle_verification_events,
processors::e2ee::E2EE::new(
olm_machine.as_ref(),
self.decryption_trust_requirement,
self.handle_verification_events,
),
)
.await?;
@@ -598,81 +539,24 @@ impl BaseClient {
let mut updated_members_in_room: BTreeMap<OwnedRoomId, BTreeSet<OwnedUserId>> =
BTreeMap::new();
for (room_id, new_info) in response.rooms.join {
let room = self.state_store.get_or_create_room(
&room_id,
RoomState::Joined,
self.room_info_notable_update_sender.clone(),
);
let mut room_info = room.clone_info();
room_info.mark_as_joined();
room_info.update_from_ruma_summary(&new_info.summary);
room_info.set_prev_batch(new_info.timeline.prev_batch.as_deref());
room_info.mark_state_fully_synced();
room_info.handle_encryption_state(requested_required_states.for_room(&room_id));
let (raw_state_events, state_events) =
processors::state_events::sync::collect(&mut context, &new_info.state.events);
let mut new_user_ids = processors::state_events::dispatch_and_get_new_users(
for (room_id, joined_room) in response.rooms.join {
let joined_room_update = processors::room::sync_v2::update_joined_room(
&mut context,
(&raw_state_events, &state_events),
&mut room_info,
&mut ambiguity_cache,
)
.await?;
for raw in &new_info.ephemeral.events {
match raw.deserialize() {
Ok(AnySyncEphemeralRoomEvent::Receipt(event)) => {
context.state_changes.add_receipts(&room_id, event.content);
}
Ok(_) => {}
Err(e) => {
let event_id: Option<String> = raw.get_field("event_id").ok().flatten();
#[rustfmt::skip]
info!(
?room_id, event_id,
"Failed to deserialize ephemeral room event: {e}"
);
}
}
}
if new_info.timeline.limited {
room_info.mark_members_missing();
}
let (raw_state_events_from_timeline, state_events_from_timeline) =
processors::state_events::sync::collect_from_timeline(
&mut context,
&new_info.timeline.events,
);
let mut other_new_user_ids = processors::state_events::dispatch_and_get_new_users(
&mut context,
(&raw_state_events_from_timeline, &state_events_from_timeline),
&mut room_info,
&mut ambiguity_cache,
)
.await?;
new_user_ids.append(&mut other_new_user_ids);
updated_members_in_room.insert(room_id.to_owned(), new_user_ids.clone());
let timeline = processors::timeline::build(
&mut context,
&room,
&mut room_info,
processors::timeline::builder::Timeline::from(new_info.timeline),
processors::timeline::builder::Notification::new(
processors::room::RoomCreationData::new(
&room_id,
self.room_info_notable_update_sender.clone(),
requested_required_states,
&mut ambiguity_cache,
),
joined_room,
&mut updated_members_in_room,
processors::notification::Notification::new(
&push_rules,
&mut notifications,
&self.state_store,
),
#[cfg(feature = "e2e-encryption")]
processors::timeline::builder::E2EE::new(
processors::e2ee::E2EE::new(
olm_machine.as_ref(),
self.decryption_trust_requirement,
self.handle_verification_events,
@@ -680,106 +564,26 @@ impl BaseClient {
)
.await?;
// Save the new `RoomInfo`.
context.state_changes.add_room(room_info);
processors::account_data::for_room(
&mut context,
&room_id,
&new_info.account_data.events,
&self.state_store,
)
.await;
// `Self::handle_room_account_data` might have updated the `RoomInfo`. Let's
// fetch it again.
//
// SAFETY: `unwrap` is safe because the `RoomInfo` has been inserted 2 lines
// above.
let mut room_info = context.state_changes.room_infos.get(&room_id).unwrap().clone();
#[cfg(feature = "e2e-encryption")]
processors::e2ee::tracked_users::update_or_set_if_room_is_newly_encrypted(
&mut context,
olm_machine.as_ref(),
&new_user_ids,
room_info.encryption_state(),
room.encryption_state(),
&room_id,
&self.state_store,
)
.await?;
let notification_count = new_info.unread_notifications.into();
room_info.update_notification_count(notification_count);
let ambiguity_changes = ambiguity_cache.changes.remove(&room_id).unwrap_or_default();
new_rooms.join.insert(
room_id,
JoinedRoomUpdate::new(
timeline,
new_info.state.events,
new_info.account_data.events,
new_info.ephemeral.events,
notification_count,
ambiguity_changes,
),
);
context.state_changes.add_room(room_info);
new_rooms.joined.insert(room_id, joined_room_update);
}
for (room_id, new_info) in response.rooms.leave {
let room = self.state_store.get_or_create_room(
&room_id,
RoomState::Left,
self.room_info_notable_update_sender.clone(),
);
let mut room_info = room.clone_info();
room_info.mark_as_left();
room_info.mark_state_partially_synced();
room_info.handle_encryption_state(requested_required_states.for_room(&room_id));
let (raw_state_events, state_events) =
processors::state_events::sync::collect(&mut context, &new_info.state.events);
let mut new_user_ids = processors::state_events::dispatch_and_get_new_users(
for (room_id, left_room) in response.rooms.leave {
let left_room_update = processors::room::sync_v2::update_left_room(
&mut context,
(&raw_state_events, &state_events),
&mut room_info,
&mut ambiguity_cache,
)
.await?;
let (raw_state_events_from_timeline, state_events_from_timeline) =
processors::state_events::sync::collect_from_timeline(
&mut context,
&new_info.timeline.events,
);
let mut other_new_user_ids = processors::state_events::dispatch_and_get_new_users(
&mut context,
(&raw_state_events_from_timeline, &state_events_from_timeline),
&mut room_info,
&mut ambiguity_cache,
)
.await?;
new_user_ids.append(&mut other_new_user_ids);
let timeline = processors::timeline::build(
&mut context,
&room,
&mut room_info,
processors::timeline::builder::Timeline::from(new_info.timeline),
processors::timeline::builder::Notification::new(
processors::room::RoomCreationData::new(
&room_id,
self.room_info_notable_update_sender.clone(),
requested_required_states,
&mut ambiguity_cache,
),
left_room,
processors::notification::Notification::new(
&push_rules,
&mut notifications,
&self.state_store,
),
#[cfg(feature = "e2e-encryption")]
processors::timeline::builder::E2EE::new(
processors::e2ee::E2EE::new(
olm_machine.as_ref(),
self.decryption_trust_requirement,
self.handle_verification_events,
@@ -787,90 +591,41 @@ impl BaseClient {
)
.await?;
// Save the new `RoomInfo`.
context.state_changes.add_room(room_info);
new_rooms.left.insert(room_id, left_room_update);
}
processors::account_data::for_room(
for (room_id, invited_room) in response.rooms.invite {
let invited_room_update = processors::room::sync_v2::update_invited_room(
&mut context,
&room_id,
&new_info.account_data.events,
&self.state_store,
)
.await;
let ambiguity_changes = ambiguity_cache.changes.remove(&room_id).unwrap_or_default();
new_rooms.leave.insert(
room_id,
LeftRoomUpdate::new(
timeline,
new_info.state.events,
new_info.account_data.events,
ambiguity_changes,
invited_room,
self.room_info_notable_update_sender.clone(),
processors::notification::Notification::new(
&push_rules,
&mut notifications,
&self.state_store,
),
);
}
for (room_id, new_info) in response.rooms.invite {
let room = self.state_store.get_or_create_room(
&room_id,
RoomState::Invited,
self.room_info_notable_update_sender.clone(),
);
let invite_state = processors::state_events::stripped::collect(
&mut context,
&new_info.invite_state.events,
);
let mut room_info = room.clone_info();
room_info.mark_as_invited();
room_info.mark_state_fully_synced();
self.handle_invited_state(
&mut context,
&room,
invite_state,
&push_rules,
&mut room_info,
&mut notifications,
)
.await?;
context.state_changes.add_room(room_info);
new_rooms.invite.insert(room_id, new_info);
new_rooms.invited.insert(room_id, invited_room_update);
}
for (room_id, new_info) in response.rooms.knock {
let room = self.state_store.get_or_create_room(
for (room_id, knocked_room) in response.rooms.knock {
let knocked_room_update = processors::room::sync_v2::update_knocked_room(
&mut context,
&room_id,
RoomState::Knocked,
knocked_room,
self.room_info_notable_update_sender.clone(),
);
let knock_state = processors::state_events::stripped::collect(
&mut context,
&new_info.knock_state.events,
);
let mut room_info = room.clone_info();
room_info.mark_as_knocked();
room_info.mark_state_fully_synced();
self.handle_invited_state(
&mut context,
&room,
knock_state,
&push_rules,
&mut room_info,
&mut notifications,
processors::notification::Notification::new(
&push_rules,
&mut notifications,
&self.state_store,
),
)
.await?;
context.state_changes.add_room(room_info);
new_rooms.knocked.insert(room_id, new_info);
new_rooms.knocked.insert(room_id, knocked_room_update);
}
global_account_data_processor.apply(&mut context, &self.state_store).await;
@@ -913,7 +668,9 @@ impl BaseClient {
}
}
info!("Processed a sync response in {:?}", now.elapsed());
if enabled!(Level::INFO) {
info!("Processed a sync response in {:?}", now.map(|now| now.elapsed()));
}
let response = SyncResponse {
rooms: new_rooms,
@@ -958,7 +715,7 @@ impl BaseClient {
};
let mut chunk = Vec::with_capacity(response.chunk.len());
let mut context = Context::new(StateChanges::default(), Default::default());
let mut context = Context::default();
#[cfg(feature = "e2e-encryption")]
let mut user_ids = BTreeSet::new();

View File

@@ -263,6 +263,18 @@ pub enum RawAnySyncOrStrippedTimelineEvent {
Stripped(Raw<AnyStrippedStateEvent>),
}
impl From<Raw<AnySyncTimelineEvent>> for RawAnySyncOrStrippedTimelineEvent {
fn from(event: Raw<AnySyncTimelineEvent>) -> Self {
Self::Sync(event)
}
}
impl From<Raw<AnyStrippedStateEvent>> for RawAnySyncOrStrippedTimelineEvent {
fn from(event: Raw<AnyStrippedStateEvent>) -> Self {
Self::Stripped(event)
}
}
/// Wrapper around both versions of any raw state event.
#[derive(Clone, Debug, Serialize)]
#[serde(untagged)]

View File

@@ -13,12 +13,13 @@
// limitations under the License.
use matrix_sdk_common::deserialized_responses::TimelineEvent;
use matrix_sdk_crypto::{
DecryptionSettings, OlmMachine, RoomEventDecryptionResult, TrustRequirement,
};
use matrix_sdk_crypto::{DecryptionSettings, RoomEventDecryptionResult};
use ruma::{events::AnySyncTimelineEvent, serde::Raw, RoomId};
use super::super::{verification, Context};
use super::{
super::{verification, Context},
E2EE,
};
use crate::Result;
/// Attempt to decrypt the given raw event into a [`TimelineEvent`].
@@ -30,16 +31,14 @@ use crate::Result;
/// Returns `Ok(None)` if encryption is not configured.
pub async fn sync_timeline_event(
context: &mut Context,
olm_machine: Option<&OlmMachine>,
e2ee: E2EE<'_>,
event: &Raw<AnySyncTimelineEvent>,
room_id: &RoomId,
decryption_trust_requirement: TrustRequirement,
verification_is_allowed: bool,
) -> Result<Option<TimelineEvent>> {
let Some(olm) = olm_machine else { return Ok(None) };
let Some(olm) = e2ee.olm_machine else { return Ok(None) };
let decryption_settings =
DecryptionSettings { sender_device_trust_requirement: decryption_trust_requirement };
DecryptionSettings { sender_device_trust_requirement: e2ee.decryption_trust_requirement };
Ok(Some(
match olm.try_decrypt_room_event(event.cast_ref(), room_id, &decryption_settings).await? {
@@ -47,14 +46,8 @@ pub async fn sync_timeline_event(
let timeline_event = TimelineEvent::from(decrypted);
if let Ok(sync_timeline_event) = timeline_event.raw().deserialize() {
verification::process_if_relevant(
context,
&sync_timeline_event,
verification_is_allowed,
olm_machine,
room_id,
)
.await?;
verification::process_if_relevant(context, &sync_timeline_event, e2ee, room_id)
.await?;
}
timeline_event

View File

@@ -12,6 +12,26 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use matrix_sdk_crypto::{OlmMachine, TrustRequirement};
pub mod decrypt;
pub mod to_device;
pub mod tracked_users;
/// A classical set of data used by some processors in this module.
#[derive(Clone)]
pub struct E2EE<'a> {
pub olm_machine: Option<&'a OlmMachine>,
pub decryption_trust_requirement: TrustRequirement,
pub verification_is_allowed: bool,
}
impl<'a> E2EE<'a> {
pub fn new(
olm_machine: Option<&'a OlmMachine>,
decryption_trust_requirement: TrustRequirement,
verification_is_allowed: bool,
) -> Self {
Self { olm_machine, decryption_trust_requirement, verification_is_allowed }
}
}

View File

@@ -0,0 +1,50 @@
// Copyright 2025 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use ruma::{events::AnySyncEphemeralRoomEvent, serde::Raw, RoomId};
use tracing::info;
use super::Context;
/// Dispatch [`AnySyncEphemeralRoomEvent`]s on the [`Context`].
pub fn dispatch(
context: &mut Context,
raw_events: &[Raw<AnySyncEphemeralRoomEvent>],
room_id: &RoomId,
) {
for raw_event in raw_events {
dispatch_receipt(context, raw_event, room_id);
}
}
/// Dispatch the [`AnySyncEphemeralRoomEvent::Receipt`] on the [`Context`].
pub(super) fn dispatch_receipt(
context: &mut Context,
raw_event: &Raw<AnySyncEphemeralRoomEvent>,
room_id: &RoomId,
) {
match raw_event.deserialize() {
Ok(AnySyncEphemeralRoomEvent::Receipt(event)) => {
context.state_changes.add_receipts(room_id, event.content);
}
Ok(_) => {}
Err(e) => {
let event_id = raw_event.get_field::<String>("event_id").ok().flatten();
info!(?room_id, event_id, "Failed to deserialize ephemeral room event: {e}");
}
}
}

View File

@@ -13,12 +13,10 @@
// limitations under the License.
use matrix_sdk_common::deserialized_responses::TimelineEvent;
use matrix_sdk_crypto::{
DecryptionSettings, OlmMachine, RoomEventDecryptionResult, TrustRequirement,
};
use matrix_sdk_crypto::{DecryptionSettings, RoomEventDecryptionResult};
use ruma::{events::AnySyncTimelineEvent, serde::Raw, RoomId};
use super::{verification, Context};
use super::{e2ee::E2EE, verification, Context};
use crate::{
latest_event::{is_suitable_for_latest_event, LatestEvent, PossibleLatestEvent},
Result, Room,
@@ -33,27 +31,19 @@ use crate::{
pub async fn decrypt_from_rooms(
context: &mut Context,
rooms: Vec<Room>,
olm_machine: Option<&OlmMachine>,
decryption_trust_requirement: TrustRequirement,
verification_is_allowed: bool,
e2ee: E2EE<'_>,
) -> Result<()> {
let Some(olm_machine) = olm_machine else {
// All functions used by this one expect an `OlmMachine`. Return if there is
// none.
if e2ee.olm_machine.is_none() {
return Ok(());
};
}
for room in rooms {
// Try to find a message we can decrypt and is suitable for using as the latest
// event. If we found one, set it as the latest and delete any older
// encrypted events
if let Some((found, found_index)) = find_suitable_and_decrypt(
context,
olm_machine,
&room,
&decryption_trust_requirement,
verification_is_allowed,
)
.await
{
if let Some((found, found_index)) = find_suitable_and_decrypt(context, &room, &e2ee).await {
room.on_latest_event_decrypted(
found,
found_index,
@@ -68,10 +58,8 @@ pub async fn decrypt_from_rooms(
async fn find_suitable_and_decrypt(
context: &mut Context,
olm_machine: &OlmMachine,
room: &Room,
decryption_trust_requirement: &TrustRequirement,
verification_is_allowed: bool,
e2ee: &E2EE<'_>,
) -> Option<(Box<LatestEvent>, usize)> {
let enc_events = room.latest_encrypted_events();
let power_levels = room.power_levels().await.ok();
@@ -82,14 +70,8 @@ async fn find_suitable_and_decrypt(
// Size of the `decrypt_sync_room_event` future should not impact this
// async fn since it is likely that there aren't even any encrypted
// events when calling it.
let decrypt_sync_room_event = Box::pin(decrypt_sync_room_event(
context,
olm_machine,
event,
room.room_id(),
decryption_trust_requirement,
verification_is_allowed,
));
let decrypt_sync_room_event =
Box::pin(decrypt_sync_room_event(context, event, e2ee, room.room_id()));
if let Ok(decrypted) = decrypt_sync_room_event.await {
// We found an event we can decrypt
@@ -119,19 +101,21 @@ async fn find_suitable_and_decrypt(
/// representing the decryption error; in the case of problems with our
/// application, returns `Err`.
///
/// Returns `Ok(None)` if encryption is not configured.
/// # Panics
///
/// Panics if there is no [`OlmMachine`] in [`E2EE`].
async fn decrypt_sync_room_event(
context: &mut Context,
olm_machine: &OlmMachine,
event: &Raw<AnySyncTimelineEvent>,
e2ee: &E2EE<'_>,
room_id: &RoomId,
decryption_trust_requirement: &TrustRequirement,
verification_is_allowed: bool,
) -> Result<TimelineEvent> {
let decryption_settings =
DecryptionSettings { sender_device_trust_requirement: *decryption_trust_requirement };
DecryptionSettings { sender_device_trust_requirement: e2ee.decryption_trust_requirement };
let event = match olm_machine
let event = match e2ee
.olm_machine
.expect("An `OlmMachine` is expected")
.try_decrypt_room_event(event.cast_ref(), room_id, &decryption_settings)
.await?
{
@@ -142,8 +126,7 @@ async fn decrypt_sync_room_event(
verification::process_if_relevant(
context,
&sync_timeline_event,
verification_is_allowed,
Some(olm_machine),
e2ee.clone(),
room_id,
)
.await?;
@@ -167,11 +150,8 @@ mod tests {
};
use ruma::{event_id, events::room::member::MembershipState, room_id, user_id};
use super::{decrypt_from_rooms, Context};
use crate::{
rooms::normal::RoomInfoNotableUpdateReasons, test_utils::logged_in_base_client,
StateChanges,
};
use super::{decrypt_from_rooms, Context, E2EE};
use crate::{rooms::normal::RoomInfoNotableUpdateReasons, test_utils::logged_in_base_client};
#[async_test]
async fn test_when_there_are_no_latest_encrypted_events_decrypting_them_does_nothing() {
@@ -203,14 +183,16 @@ mod tests {
assert!(room.latest_event().is_none());
// When I tell it to do some decryption
let mut context = Context::new(StateChanges::default(), Default::default());
let mut context = Context::default();
decrypt_from_rooms(
&mut context,
vec![room.clone()],
client.olm_machine().await.as_ref(),
client.decryption_trust_requirement,
client.handle_verification_events,
E2EE::new(
client.olm_machine().await.as_ref(),
client.decryption_trust_requirement,
client.handle_verification_events,
),
)
.await
.unwrap();

View File

@@ -16,9 +16,12 @@ pub mod account_data;
pub mod changes;
#[cfg(feature = "e2e-encryption")]
pub mod e2ee;
pub mod ephemeral_events;
#[cfg(feature = "e2e-encryption")]
pub mod latest_event;
pub mod notification;
pub mod profiles;
pub mod room;
pub mod state_events;
pub mod timeline;
#[cfg(feature = "e2e-encryption")]
@@ -33,17 +36,15 @@ use crate::{RoomInfoNotableUpdateReasons, StateChanges};
type RoomInfoNotableUpdates = BTreeMap<OwnedRoomId, RoomInfoNotableUpdateReasons>;
#[cfg_attr(test, derive(Clone))]
#[derive(Default)]
pub(crate) struct Context {
pub(super) state_changes: StateChanges,
pub(super) room_info_notable_updates: RoomInfoNotableUpdates,
}
impl Context {
pub fn new(
state_changes: StateChanges,
room_info_notable_updates: RoomInfoNotableUpdates,
) -> Self {
Self { state_changes, room_info_notable_updates }
pub fn new(state_changes: StateChanges) -> Self {
Self { state_changes, room_info_notable_updates: Default::default() }
}
pub fn into_parts(self) -> (StateChanges, RoomInfoNotableUpdates) {

View File

@@ -0,0 +1,81 @@
// Copyright 2025 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use ruma::{
push::{Action, PushConditionRoomCtx, Ruleset},
serde::Raw,
OwnedRoomId, RoomId,
};
use crate::{
deserialized_responses::RawAnySyncOrStrippedTimelineEvent, store::BaseStateStore, sync,
};
/// A classical set of data used by some processors dealing with notifications
/// and push rules.
pub struct Notification<'a> {
pub push_rules: &'a Ruleset,
pub notifications: &'a mut BTreeMap<OwnedRoomId, Vec<sync::Notification>>,
pub state_store: &'a BaseStateStore,
}
impl<'a> Notification<'a> {
pub fn new(
push_rules: &'a Ruleset,
notifications: &'a mut BTreeMap<OwnedRoomId, Vec<sync::Notification>>,
state_store: &'a BaseStateStore,
) -> Self {
Self { push_rules, notifications, state_store }
}
fn push_notification(
&mut self,
room_id: &RoomId,
actions: Vec<Action>,
event: RawAnySyncOrStrippedTimelineEvent,
) {
self.notifications
.entry(room_id.to_owned())
.or_default()
.push(sync::Notification { actions, event });
}
/// Push a new [`sync::Notification`] in [`Self::notifications`] from
/// `event` if and only if `predicate` returns `true` for at least one of
/// the [`Action`]s associated to this event and this `push_context`
/// (based on `Self::push_rules`).
///
/// This method returns the fetched [`Action`]s.
pub fn push_notification_from_event_if<E, P>(
&mut self,
room_id: &RoomId,
push_context: &PushConditionRoomCtx,
event: &Raw<E>,
predicate: P,
) -> &[Action]
where
Raw<E>: Into<RawAnySyncOrStrippedTimelineEvent>,
P: Fn(&Action) -> bool,
{
let actions = self.push_rules.get_actions(event, push_context);
if actions.iter().any(predicate) {
self.push_notification(room_id, actions.to_owned(), event.clone().into());
}
actions
}
}

View File

@@ -0,0 +1,45 @@
// Copyright 2025 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use ruma::RoomId;
use tokio::sync::broadcast::Sender;
use crate::{store::ambiguity_map::AmbiguityCache, RequestedRequiredStates, RoomInfoNotableUpdate};
pub mod msc4186;
pub mod sync_v2;
/// A classical set of data used by some processors in this module.
pub struct RoomCreationData<'a> {
room_id: &'a RoomId,
room_info_notable_update_sender: Sender<RoomInfoNotableUpdate>,
requested_required_states: &'a RequestedRequiredStates,
ambiguity_cache: &'a mut AmbiguityCache,
}
impl<'a> RoomCreationData<'a> {
pub fn new(
room_id: &'a RoomId,
room_info_notable_update_sender: Sender<RoomInfoNotableUpdate>,
requested_required_states: &'a RequestedRequiredStates,
ambiguity_cache: &'a mut AmbiguityCache,
) -> Self {
Self {
room_id,
room_info_notable_update_sender,
requested_required_states,
ambiguity_cache,
}
}
}

View File

@@ -0,0 +1,80 @@
// Copyright 2025 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::BTreeMap;
use ruma::{api::client::sync::sync_events::v5 as http, OwnedRoomId};
use super::super::super::{
account_data::for_room as account_data_for_room, ephemeral_events::dispatch_receipt, Context,
};
use crate::{
store::BaseStateStore,
sync::{JoinedRoomUpdate, RoomUpdates},
RoomState,
};
pub fn dispatch_ephemeral_events(
context: &mut Context,
receipts: &http::response::Receipts,
typing: &http::response::Typing,
joined_room_updates: &mut BTreeMap<OwnedRoomId, JoinedRoomUpdate>,
) {
for (room_id, raw) in &receipts.rooms {
dispatch_receipt(context, raw.cast_ref(), room_id);
joined_room_updates
.entry(room_id.to_owned())
.or_default()
.ephemeral
.push(raw.clone().cast());
}
for (room_id, raw) in &typing.rooms {
joined_room_updates
.entry(room_id.to_owned())
.or_default()
.ephemeral
.push(raw.clone().cast());
}
}
pub async fn room_account_data(
context: &mut Context,
account_data: &http::response::AccountData,
room_updates: &mut RoomUpdates,
state_store: &BaseStateStore,
) {
for (room_id, raw) in &account_data.rooms {
account_data_for_room(context, room_id, raw, state_store).await;
if let Some(room) = state_store.room(room_id) {
match room.state() {
RoomState::Joined => room_updates
.joined
.entry(room_id.to_owned())
.or_default()
.account_data
.append(&mut raw.to_vec()),
RoomState::Left | RoomState::Banned => room_updates
.left
.entry(room_id.to_owned())
.or_default()
.account_data
.append(&mut raw.to_vec()),
RoomState::Invited | RoomState::Knocked => {}
}
}
}
}

View File

@@ -0,0 +1,559 @@
// Copyright 2025 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod extensions;
use std::collections::BTreeMap;
#[cfg(feature = "e2e-encryption")]
use matrix_sdk_common::deserialized_responses::TimelineEvent;
#[cfg(feature = "e2e-encryption")]
use ruma::events::StateEventType;
use ruma::{
api::client::sync::sync_events::{
v3::{InviteState, InvitedRoom, KnockState, KnockedRoom},
v5 as http,
},
assign,
events::{
room::member::{MembershipState, RoomMemberEventContent},
AnyRoomAccountDataEvent, AnyStrippedStateEvent, AnySyncStateEvent,
},
serde::Raw,
JsOption, OwnedRoomId, RoomId, UserId,
};
use tokio::sync::broadcast::Sender;
#[cfg(feature = "e2e-encryption")]
use super::super::e2ee;
use super::{
super::{notification, state_events, timeline, Context},
RoomCreationData,
};
#[cfg(feature = "e2e-encryption")]
use crate::StateChanges;
use crate::{
store::BaseStateStore,
sync::{InvitedRoomUpdate, JoinedRoomUpdate, KnockedRoomUpdate, LeftRoomUpdate},
Result, Room, RoomHero, RoomInfo, RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons,
RoomState,
};
/// Represent any kind of room updates.
pub enum RoomUpdateKind {
Joined(JoinedRoomUpdate),
Left(LeftRoomUpdate),
Invited(InvitedRoomUpdate),
Knocked(KnockedRoomUpdate),
}
pub async fn update_any_room(
context: &mut Context,
user_id: &UserId,
room_creation_data: RoomCreationData<'_>,
room_response: &http::response::Room,
rooms_account_data: &BTreeMap<OwnedRoomId, Vec<Raw<AnyRoomAccountDataEvent>>>,
#[cfg(feature = "e2e-encryption")] e2ee: e2ee::E2EE<'_>,
notification: notification::Notification<'_>,
) -> Result<Option<(RoomInfo, RoomUpdateKind)>> {
let RoomCreationData {
room_id,
room_info_notable_update_sender,
requested_required_states,
ambiguity_cache,
} = room_creation_data;
// Read state events from the `required_state` field.
//
// Don't read state events from the `timeline` field, because they might be
// incomplete or staled already. We must only read state events from
// `required_state`.
let (raw_state_events, state_events) =
state_events::sync::collect(context, &room_response.required_state);
let state_store = notification.state_store;
// Find or create the room in the store
let is_new_room = !state_store.room_exists(room_id);
let invite_state_events = room_response
.invite_state
.as_ref()
.map(|events| state_events::stripped::collect(context, events));
#[allow(unused_mut)] // Required for some feature flag combinations
let (mut room, mut room_info, maybe_room_update_kind) = membership(
context,
&state_events,
&invite_state_events,
state_store,
user_id,
room_id,
room_info_notable_update_sender,
);
room_info.mark_state_partially_synced();
room_info.handle_encryption_state(requested_required_states.for_room(room_id));
#[cfg_attr(not(feature = "e2e-encryption"), allow(unused))]
let new_user_ids = state_events::sync::dispatch_and_get_new_users(
context,
(&raw_state_events, &state_events),
&mut room_info,
ambiguity_cache,
)
.await?;
// This will be used for both invited and knocked rooms.
if let Some((raw_events, events)) = invite_state_events {
state_events::stripped::dispatch_invite_or_knock(
context,
(&raw_events, &events),
&room,
&mut room_info,
notification::Notification::new(
notification.push_rules,
notification.notifications,
notification.state_store,
),
)
.await?;
}
properties(context, room_id, room_response, &mut room_info, is_new_room);
let timeline = timeline::build(
context,
&room,
&mut room_info,
timeline::builder::Timeline::from(room_response),
notification,
#[cfg(feature = "e2e-encryption")]
e2ee.clone(),
)
.await?;
// Cache the latest decrypted event in room_info, and also keep any later
// encrypted events, so we can slot them in when we get the keys.
#[cfg(feature = "e2e-encryption")]
cache_latest_events(
&room,
&mut room_info,
&timeline.events,
Some(&context.state_changes),
Some(state_store),
)
.await;
#[cfg(feature = "e2e-encryption")]
e2ee::tracked_users::update_or_set_if_room_is_newly_encrypted(
context,
e2ee.olm_machine,
&new_user_ids,
room_info.encryption_state(),
room.encryption_state(),
room_id,
state_store,
)
.await?;
let notification_count = room_response.unread_notifications.clone().into();
room_info.update_notification_count(notification_count);
let ambiguity_changes = ambiguity_cache.changes.remove(room_id).unwrap_or_default();
let room_account_data = rooms_account_data.get(room_id);
match (room_info.state(), maybe_room_update_kind) {
(RoomState::Joined, None) => {
// Ephemeral events are added separately, because we might not
// have a room subsection in the response, yet we may have receipts for
// that room.
let ephemeral = Vec::new();
Ok(Some((
room_info,
RoomUpdateKind::Joined(JoinedRoomUpdate::new(
timeline,
raw_state_events,
room_account_data.cloned().unwrap_or_default(),
ephemeral,
notification_count,
ambiguity_changes,
)),
)))
}
(RoomState::Left, None) | (RoomState::Banned, None) => Ok(Some((
room_info,
RoomUpdateKind::Left(LeftRoomUpdate::new(
timeline,
raw_state_events,
room_account_data.cloned().unwrap_or_default(),
ambiguity_changes,
)),
))),
(RoomState::Invited, Some(update @ RoomUpdateKind::Invited(_)))
| (RoomState::Knocked, Some(update @ RoomUpdateKind::Knocked(_))) => {
Ok(Some((room_info, update)))
}
_ => Ok(None),
}
}
/// Look through the sliding sync data for this room, find/create it in the
/// store, and process any invite information.
///
/// If there is any invite state events, the room can be considered an invited
/// or knocked room, depending of the membership event (if any).
fn membership(
context: &mut Context,
state_events: &[AnySyncStateEvent],
invite_state_events: &Option<(Vec<Raw<AnyStrippedStateEvent>>, Vec<AnyStrippedStateEvent>)>,
store: &BaseStateStore,
user_id: &UserId,
room_id: &RoomId,
room_info_notable_update_sender: Sender<RoomInfoNotableUpdate>,
) -> (Room, RoomInfo, Option<RoomUpdateKind>) {
// There are invite state events. It means the room can be:
//
// 1. either an invited room,
// 2. or a knocked room.
//
// Let's find out.
if let Some(state_events) = invite_state_events {
// We need to find the membership event since it could be for either an invited
// or knocked room.
let membership_event = state_events.1.iter().find_map(|event| {
if let AnyStrippedStateEvent::RoomMember(membership_event) = event {
if membership_event.state_key == user_id {
return Some(membership_event.content.clone());
}
}
None
});
match membership_event {
// There is a membership event indicating it's a knocked room.
Some(RoomMemberEventContent { membership: MembershipState::Knock, .. }) => {
let room = store.get_or_create_room(
room_id,
RoomState::Knocked,
room_info_notable_update_sender,
);
let mut room_info = room.clone_info();
// Override the room state if the room already exists.
room_info.mark_as_knocked();
let raw_events = state_events.0.clone();
let knock_state = assign!(KnockState::default(), { events: raw_events });
let knocked_room = assign!(KnockedRoom::default(), { knock_state: knock_state });
(room, room_info, Some(RoomUpdateKind::Knocked(knocked_room)))
}
// Otherwise, assume it's an invited room because there are invite state events.
_ => {
let room = store.get_or_create_room(
room_id,
RoomState::Invited,
room_info_notable_update_sender,
);
let mut room_info = room.clone_info();
// Override the room state if the room already exists.
room_info.mark_as_invited();
let raw_events = state_events.0.clone();
let invited_room = InvitedRoom::from(InviteState::from(raw_events));
(room, room_info, Some(RoomUpdateKind::Invited(invited_room)))
}
}
}
// No invite state events. We assume this is a joined room for the moment. See this block to
// learn more.
else {
let room =
store.get_or_create_room(room_id, RoomState::Joined, room_info_notable_update_sender);
let mut room_info = room.clone_info();
// We default to considering this room joined if it's not an invite. If it's
// actually left (and we remembered to request membership events in
// our sync request), then we can find this out from the events in
// required_state by calling handle_own_room_membership.
room_info.mark_as_joined();
// We don't need to do this in a v2 sync, because the membership of a room can
// be figured out by whether the room is in the "join", "leave" etc.
// property. In sliding sync we only have invite_state,
// required_state and timeline, so we must process required_state and timeline
// looking for relevant membership events.
own_membership(context, user_id, state_events, &mut room_info);
(room, room_info, None)
}
}
/// Find any `m.room.member` events that refer to the current user, and update
/// the state in room_info to reflect the "membership" property.
fn own_membership(
context: &mut Context,
user_id: &UserId,
state_events: &[AnySyncStateEvent],
room_info: &mut RoomInfo,
) {
// Start from the last event; the first membership event we see in that order is
// the last in the regular order, so that's the only one we need to
// consider.
for event in state_events.iter().rev() {
if let AnySyncStateEvent::RoomMember(member) = &event {
// If this event updates the current user's membership, record that in the
// room_info.
if member.state_key() == user_id.as_str() {
let new_state: RoomState = member.membership().into();
if new_state != room_info.state() {
room_info.set_state(new_state);
// Update an existing notable update entry or create a new one
context
.room_info_notable_updates
.entry(room_info.room_id.to_owned())
.or_default()
.insert(RoomInfoNotableUpdateReasons::MEMBERSHIP);
}
break;
}
}
}
}
fn properties(
context: &mut Context,
room_id: &RoomId,
room_response: &http::response::Room,
room_info: &mut RoomInfo,
is_new_room: bool,
) {
// Handle the room's avatar.
//
// It can be updated via the state events, or via the
// [`http::ResponseRoom::avatar`] field. This part of the code handles the
// latter case. The former case is handled by [`BaseClient::handle_state`].
match &room_response.avatar {
// A new avatar!
JsOption::Some(avatar_uri) => room_info.update_avatar(Some(avatar_uri.to_owned())),
// Avatar must be removed.
JsOption::Null => room_info.update_avatar(None),
// Nothing to do.
JsOption::Undefined => {}
}
// Sliding sync doesn't have a room summary, nevertheless it contains the joined
// and invited member counts, in addition to the heroes.
if let Some(count) = room_response.joined_count {
room_info.update_joined_member_count(count.into());
}
if let Some(count) = room_response.invited_count {
room_info.update_invited_member_count(count.into());
}
if let Some(heroes) = &room_response.heroes {
room_info.update_heroes(
heroes
.iter()
.map(|hero| RoomHero {
user_id: hero.user_id.clone(),
display_name: hero.name.clone(),
avatar_url: hero.avatar.clone(),
})
.collect(),
);
}
room_info.set_prev_batch(room_response.prev_batch.as_deref());
if room_response.limited {
room_info.mark_members_missing();
}
if let Some(recency_stamp) = &room_response.bump_stamp {
let recency_stamp: u64 = (*recency_stamp).into();
if room_info.recency_stamp.as_ref() != Some(&recency_stamp) {
room_info.update_recency_stamp(recency_stamp);
// If it's not a new room, let's emit a `RECENCY_STAMP` update.
// For a new room, the room will appear as new, so we don't care about this
// update.
if !is_new_room {
context
.room_info_notable_updates
.entry(room_id.to_owned())
.or_default()
.insert(RoomInfoNotableUpdateReasons::RECENCY_STAMP);
}
}
}
}
/// Find the most recent decrypted event and cache it in the supplied RoomInfo.
///
/// If any encrypted events are found after that one, store them in the RoomInfo
/// too so we can use them when we get the relevant keys.
///
/// It is the responsibility of the caller to update the `RoomInfo` instance
/// stored in the `Room`.
#[cfg(feature = "e2e-encryption")]
pub(crate) async fn cache_latest_events(
room: &Room,
room_info: &mut RoomInfo,
events: &[TimelineEvent],
changes: Option<&StateChanges>,
store: Option<&BaseStateStore>,
) {
use tracing::warn;
use crate::{
deserialized_responses::DisplayName,
latest_event::{is_suitable_for_latest_event, LatestEvent, PossibleLatestEvent},
store::ambiguity_map::is_display_name_ambiguous,
};
let mut encrypted_events =
Vec::with_capacity(room.latest_encrypted_events.read().unwrap().capacity());
// Try to get room power levels from the current changes
let power_levels_from_changes = || {
let state_changes = changes?.state.get(room_info.room_id())?;
let room_power_levels_state =
state_changes.get(&StateEventType::RoomPowerLevels)?.values().next()?;
match room_power_levels_state.deserialize().ok()? {
AnySyncStateEvent::RoomPowerLevels(ev) => Some(ev.power_levels()),
_ => None,
}
};
// If we didn't get any info, try getting it from local data
let power_levels = match power_levels_from_changes() {
Some(power_levels) => Some(power_levels),
None => room.power_levels().await.ok(),
};
let power_levels_info = Some(room.own_user_id()).zip(power_levels.as_ref());
for event in events.iter().rev() {
if let Ok(timeline_event) = event.raw().deserialize() {
match is_suitable_for_latest_event(&timeline_event, power_levels_info) {
PossibleLatestEvent::YesRoomMessage(_)
| PossibleLatestEvent::YesPoll(_)
| PossibleLatestEvent::YesCallInvite(_)
| PossibleLatestEvent::YesCallNotify(_)
| PossibleLatestEvent::YesSticker(_)
| PossibleLatestEvent::YesKnockedStateEvent(_) => {
// We found a suitable latest event. Store it.
// In order to make the latest event fast to read, we want to keep the
// associated sender in cache. This is a best-effort to gather enough
// information for creating a user profile as fast as possible. If information
// are missing, let's go back on the “slow” path.
let mut sender_profile = None;
let mut sender_name_is_ambiguous = None;
// First off, look up the sender's profile from the `StateChanges`, they are
// likely to be the most recent information.
if let Some(changes) = changes {
sender_profile = changes
.profiles
.get(room.room_id())
.and_then(|profiles_by_user| {
profiles_by_user.get(timeline_event.sender())
})
.cloned();
if let Some(sender_profile) = sender_profile.as_ref() {
sender_name_is_ambiguous = sender_profile
.as_original()
.and_then(|profile| profile.content.displayname.as_ref())
.and_then(|display_name| {
let display_name = DisplayName::new(display_name);
changes.ambiguity_maps.get(room.room_id()).and_then(
|map_for_room| {
map_for_room.get(&display_name).map(|users| {
is_display_name_ambiguous(&display_name, users)
})
},
)
});
}
}
// Otherwise, look up the sender's profile from the `Store`.
if sender_profile.is_none() {
if let Some(store) = store {
sender_profile = store
.get_profile(room.room_id(), timeline_event.sender())
.await
.ok()
.flatten();
// TODO: need to update `sender_name_is_ambiguous`,
// but how?
}
}
let latest_event = Box::new(LatestEvent::new_with_sender_details(
event.clone(),
sender_profile,
sender_name_is_ambiguous,
));
// Store it in the return RoomInfo (it will be saved for us in the room later).
room_info.latest_event = Some(latest_event);
// We don't need any of the older encrypted events because we have a new
// decrypted one.
room.latest_encrypted_events.write().unwrap().clear();
// We can stop looking through the timeline now because everything else is
// older.
break;
}
PossibleLatestEvent::NoEncrypted => {
// m.room.encrypted - this might be the latest event later - we can't tell until
// we are able to decrypt it, so store it for now
//
// Check how many encrypted events we have seen. Only store another if we
// haven't already stored the maximum number.
if encrypted_events.len() < encrypted_events.capacity() {
encrypted_events.push(event.raw().clone());
}
}
_ => {
// Ignore unsuitable events
}
}
} else {
warn!(
"Failed to deserialize event as AnySyncTimelineEvent. ID={}",
event.event_id().expect("Event has no ID!")
);
}
}
// Push the encrypted events we found into the Room, in reverse order, so
// the latest is last
room.latest_encrypted_events.write().unwrap().extend(encrypted_events.into_iter().rev());
}

View File

@@ -0,0 +1,298 @@
// Copyright 2025 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{BTreeMap, BTreeSet};
use ruma::{
api::client::sync::sync_events::v3::{InvitedRoom, JoinedRoom, KnockedRoom, LeftRoom},
OwnedRoomId, OwnedUserId, RoomId,
};
use tokio::sync::broadcast::Sender;
#[cfg(feature = "e2e-encryption")]
use super::super::e2ee;
use super::{
super::{account_data, ephemeral_events, notification, state_events, timeline, Context},
RoomCreationData,
};
use crate::{
sync::{InvitedRoomUpdate, JoinedRoomUpdate, KnockedRoomUpdate, LeftRoomUpdate},
Result, RoomInfoNotableUpdate, RoomState,
};
/// Process updates of a joined room.
#[allow(clippy::too_many_arguments)]
pub async fn update_joined_room(
context: &mut Context,
room_creation_data: RoomCreationData<'_>,
joined_room: JoinedRoom,
updated_members_in_room: &mut BTreeMap<OwnedRoomId, BTreeSet<OwnedUserId>>,
notification: notification::Notification<'_>,
#[cfg(feature = "e2e-encryption")] e2ee: e2ee::E2EE<'_>,
) -> Result<JoinedRoomUpdate> {
let RoomCreationData {
room_id,
room_info_notable_update_sender,
requested_required_states,
ambiguity_cache,
} = room_creation_data;
let state_store = notification.state_store;
let room =
state_store.get_or_create_room(room_id, RoomState::Joined, room_info_notable_update_sender);
let mut room_info = room.clone_info();
room_info.mark_as_joined();
room_info.update_from_ruma_summary(&joined_room.summary);
room_info.set_prev_batch(joined_room.timeline.prev_batch.as_deref());
room_info.mark_state_fully_synced();
room_info.handle_encryption_state(requested_required_states.for_room(room_id));
let (raw_state_events, state_events) =
state_events::sync::collect(context, &joined_room.state.events);
let mut new_user_ids = state_events::sync::dispatch_and_get_new_users(
context,
(&raw_state_events, &state_events),
&mut room_info,
ambiguity_cache,
)
.await?;
ephemeral_events::dispatch(context, &joined_room.ephemeral.events, room_id);
if joined_room.timeline.limited {
room_info.mark_members_missing();
}
let (raw_state_events_from_timeline, state_events_from_timeline) =
state_events::sync::collect_from_timeline(context, &joined_room.timeline.events);
let mut other_new_user_ids = state_events::sync::dispatch_and_get_new_users(
context,
(&raw_state_events_from_timeline, &state_events_from_timeline),
&mut room_info,
ambiguity_cache,
)
.await?;
new_user_ids.append(&mut other_new_user_ids);
updated_members_in_room.insert(room_id.to_owned(), new_user_ids.clone());
#[cfg(feature = "e2e-encryption")]
let olm_machine = e2ee.olm_machine;
let timeline = timeline::build(
context,
&room,
&mut room_info,
timeline::builder::Timeline::from(joined_room.timeline),
notification,
#[cfg(feature = "e2e-encryption")]
e2ee,
)
.await?;
// Save the new `RoomInfo`.
context.state_changes.add_room(room_info);
account_data::for_room(context, room_id, &joined_room.account_data.events, state_store).await;
// `processors::account_data::from_room` might have updated the `RoomInfo`.
// Let's fetch it again.
//
// SAFETY: `expect` is safe because the `RoomInfo` has been inserted 2 lines
// above.
let mut room_info = context
.state_changes
.room_infos
.get(room_id)
.expect("`RoomInfo` must exist in `StateChanges` at this point")
.clone();
#[cfg(feature = "e2e-encryption")]
e2ee::tracked_users::update_or_set_if_room_is_newly_encrypted(
context,
olm_machine,
&new_user_ids,
room_info.encryption_state(),
room.encryption_state(),
room_id,
state_store,
)
.await?;
let notification_count = joined_room.unread_notifications.into();
room_info.update_notification_count(notification_count);
context.state_changes.add_room(room_info);
Ok(JoinedRoomUpdate::new(
timeline,
joined_room.state.events,
joined_room.account_data.events,
joined_room.ephemeral.events,
notification_count,
ambiguity_cache.changes.remove(room_id).unwrap_or_default(),
))
}
/// Process historical updates of a left room.
#[allow(clippy::too_many_arguments)]
pub async fn update_left_room(
context: &mut Context,
room_creation_data: RoomCreationData<'_>,
left_room: LeftRoom,
notification: notification::Notification<'_>,
#[cfg(feature = "e2e-encryption")] e2ee: e2ee::E2EE<'_>,
) -> Result<LeftRoomUpdate> {
let RoomCreationData {
room_id,
room_info_notable_update_sender,
requested_required_states,
ambiguity_cache,
} = room_creation_data;
let state_store = notification.state_store;
let room =
state_store.get_or_create_room(room_id, RoomState::Left, room_info_notable_update_sender);
let mut room_info = room.clone_info();
room_info.mark_as_left();
room_info.mark_state_partially_synced();
room_info.handle_encryption_state(requested_required_states.for_room(room_id));
let (raw_state_events, state_events) =
state_events::sync::collect(context, &left_room.state.events);
let _ = state_events::sync::dispatch_and_get_new_users(
context,
(&raw_state_events, &state_events),
&mut room_info,
ambiguity_cache,
)
.await?;
let (raw_state_events_from_timeline, state_events_from_timeline) =
state_events::sync::collect_from_timeline(context, &left_room.timeline.events);
let _ = state_events::sync::dispatch_and_get_new_users(
context,
(&raw_state_events_from_timeline, &state_events_from_timeline),
&mut room_info,
ambiguity_cache,
)
.await?;
let timeline = timeline::build(
context,
&room,
&mut room_info,
timeline::builder::Timeline::from(left_room.timeline),
notification,
#[cfg(feature = "e2e-encryption")]
e2ee,
)
.await?;
// Save the new `RoomInfo`.
context.state_changes.add_room(room_info);
account_data::for_room(context, room_id, &left_room.account_data.events, state_store).await;
let ambiguity_changes = ambiguity_cache.changes.remove(room_id).unwrap_or_default();
Ok(LeftRoomUpdate::new(
timeline,
left_room.state.events,
left_room.account_data.events,
ambiguity_changes,
))
}
/// Process updates of an invited room.
pub async fn update_invited_room(
context: &mut Context,
room_id: &RoomId,
invited_room: InvitedRoom,
room_info_notable_update_sender: Sender<RoomInfoNotableUpdate>,
notification: notification::Notification<'_>,
) -> Result<InvitedRoomUpdate> {
let state_store = notification.state_store;
let room = state_store.get_or_create_room(
room_id,
RoomState::Invited,
room_info_notable_update_sender,
);
let (raw_events, events) =
state_events::stripped::collect(context, &invited_room.invite_state.events);
let mut room_info = room.clone_info();
room_info.mark_as_invited();
room_info.mark_state_fully_synced();
state_events::stripped::dispatch_invite_or_knock(
context,
(&raw_events, &events),
&room,
&mut room_info,
notification,
)
.await?;
context.state_changes.add_room(room_info);
Ok(invited_room)
}
/// Process updates of a knocked room.
pub async fn update_knocked_room(
context: &mut Context,
room_id: &RoomId,
knocked_room: KnockedRoom,
room_info_notable_update_sender: Sender<RoomInfoNotableUpdate>,
notification: notification::Notification<'_>,
) -> Result<KnockedRoomUpdate> {
let state_store = notification.state_store;
let room = state_store.get_or_create_room(
room_id,
RoomState::Knocked,
room_info_notable_update_sender,
);
let (raw_events, events) =
state_events::stripped::collect(context, &knocked_room.knock_state.events);
let mut room_info = room.clone_info();
room_info.mark_as_knocked();
room_info.mark_state_fully_synced();
state_events::stripped::dispatch_invite_or_knock(
context,
(&raw_events, &events),
&room,
&mut room_info,
notification,
)
.await?;
context.state_changes.add_room(room_info);
Ok(knocked_room)
}

View File

@@ -12,30 +12,30 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{
collections::{BTreeMap, BTreeSet},
iter,
};
use ruma::{
events::{room::member::MembershipState, AnySyncStateEvent},
serde::Raw,
OwnedUserId,
};
use ruma::{events::AnySyncStateEvent, serde::Raw};
use serde::Deserialize;
use tracing::{instrument, warn};
use tracing::warn;
use super::{profiles, Context};
use crate::{
store::{ambiguity_map::AmbiguityCache, Result as StoreResult},
RoomInfo,
};
use super::Context;
/// Collect [`AnySyncStateEvent`].
pub mod sync {
use ruma::events::AnySyncTimelineEvent;
use std::{
collections::{BTreeMap, BTreeSet},
iter,
};
use super::{AnySyncStateEvent, Context, Raw};
use ruma::{
events::{room::member::MembershipState, AnySyncTimelineEvent},
OwnedUserId,
};
use tracing::instrument;
use super::{super::profiles, AnySyncStateEvent, Context, Raw};
use crate::{
store::{ambiguity_map::AmbiguityCache, Result as StoreResult},
RoomInfo,
};
/// Collect [`AnySyncStateEvent`] to [`AnySyncStateEvent`].
pub fn collect(
@@ -61,13 +61,70 @@ pub mod sync {
}
}))
}
/// Dispatch the sync state events and return the new users for this room.
///
/// `raw_events` and `events` must be generated from [`collect`].
/// Events must be exactly the same list of events that are in
/// `raw_events`, but deserialised. We demand them here to avoid
/// deserialising multiple times.
#[instrument(skip_all, fields(room_id = ?room_info.room_id))]
pub async fn dispatch_and_get_new_users(
context: &mut Context,
(raw_events, events): (&[Raw<AnySyncStateEvent>], &[AnySyncStateEvent]),
room_info: &mut RoomInfo,
ambiguity_cache: &mut AmbiguityCache,
) -> StoreResult<BTreeSet<OwnedUserId>> {
let mut user_ids = BTreeSet::new();
if raw_events.is_empty() {
return Ok(user_ids);
}
let mut state_events = BTreeMap::new();
for (raw_event, event) in iter::zip(raw_events, events) {
room_info.handle_state_event(event);
if let AnySyncStateEvent::RoomMember(member) = event {
ambiguity_cache
.handle_event(&context.state_changes, &room_info.room_id, member)
.await?;
match member.membership() {
MembershipState::Join | MembershipState::Invite => {
user_ids.insert(member.state_key().to_owned());
}
_ => (),
}
profiles::upsert_or_delete(context, &room_info.room_id, member);
}
state_events
.entry(event.event_type())
.or_insert_with(BTreeMap::new)
.insert(event.state_key().to_owned(), raw_event.clone());
}
context.state_changes.state.insert(room_info.room_id.clone(), state_events);
Ok(user_ids)
}
}
/// Collect [`AnyStrippedStateEvent`].
pub mod stripped {
use ruma::events::AnyStrippedStateEvent;
use std::{collections::BTreeMap, iter};
use super::{Context, Raw};
use ruma::{events::AnyStrippedStateEvent, push::Action};
use tracing::instrument;
use super::{
super::{notification, timeline},
Context, Raw,
};
use crate::{Result, Room, RoomInfo};
/// Collect [`AnyStrippedStateEvent`] to [`AnyStrippedStateEvent`].
pub fn collect(
@@ -76,6 +133,71 @@ pub mod stripped {
) -> (Vec<Raw<AnyStrippedStateEvent>>, Vec<AnyStrippedStateEvent>) {
super::collect(raw_events)
}
/// Dispatch the stripped state events.
///
/// `raw_events` and `events` must be generated from [`collect`].
/// Events must be exactly the same list of events that are in
/// `raw_events`, but deserialised. We demand them here to avoid
/// deserialising multiple times.
///
/// Dispatch the stripped state events in `invite_state` or `knock_state`,
/// modifying the room's info and posting notifications as needed.
///
/// * `raw_events` and `events` - The contents of `invite_state` in the form
/// of list of pairs of raw stripped state events with their deserialized
/// counterpart.
/// * `room` - The [`Room`] to modify.
/// * `room_info` - The current room's info.
/// * `push_rules` - The push rules for this room.
/// * `changes` - The accumulated list of changes to apply once the
/// processing is finished.
/// * `notifications` - Notifications to post for the current room.
/// * `state_store` — The state store.
#[instrument(skip_all, fields(room_id = ?room_info.room_id))]
pub(crate) async fn dispatch_invite_or_knock(
context: &mut Context,
(raw_events, events): (&[Raw<AnyStrippedStateEvent>], &[AnyStrippedStateEvent]),
room: &Room,
room_info: &mut RoomInfo,
mut notification: notification::Notification<'_>,
) -> Result<()> {
let mut state_events = BTreeMap::new();
for (raw_event, event) in iter::zip(raw_events, events) {
room_info.handle_stripped_state_event(event);
state_events
.entry(event.event_type())
.or_insert_with(BTreeMap::new)
.insert(event.state_key().to_owned(), raw_event.clone());
}
context
.state_changes
.stripped_state
.insert(room_info.room_id().to_owned(), state_events.clone());
// We need to check for notifications after we have handled all state
// events, to make sure we have the full push context.
if let Some(push_context) =
timeline::get_push_room_context(context, room, room_info, notification.state_store)
.await?
{
let room_id = room.room_id();
// Check every event again for notification.
for event in state_events.values().flat_map(|map| map.values()) {
notification.push_notification_from_event_if(
room_id,
&push_context,
event,
Action::should_notify,
);
}
}
Ok(())
}
}
fn collect<'a, I, T>(raw_events: I) -> (Vec<Raw<T>>, Vec<T>)
@@ -94,52 +216,3 @@ where
})
.unzip()
}
/// Dispatch the state events and return the new users for this room.
///
/// `raw_events` and `events` must be generated from [`collect_sync`]. Events
/// must be exactly the same list of events that are in raw_events, but
/// deserialised. We demand them here to avoid deserialising multiple times.
#[instrument(skip_all, fields(room_id = ?room_info.room_id))]
pub async fn dispatch_and_get_new_users(
context: &mut Context,
(raw_events, events): (&[Raw<AnySyncStateEvent>], &[AnySyncStateEvent]),
room_info: &mut RoomInfo,
ambiguity_cache: &mut AmbiguityCache,
) -> StoreResult<BTreeSet<OwnedUserId>> {
let mut user_ids = BTreeSet::new();
if raw_events.is_empty() {
return Ok(user_ids);
}
let mut state_events = BTreeMap::new();
for (raw_event, event) in iter::zip(raw_events, events) {
room_info.handle_state_event(event);
if let AnySyncStateEvent::RoomMember(member) = event {
ambiguity_cache
.handle_event(&context.state_changes, &room_info.room_id, member)
.await?;
match member.membership() {
MembershipState::Join | MembershipState::Invite => {
user_ids.insert(member.state_key().to_owned());
}
_ => (),
}
profiles::upsert_or_delete(context, &room_info.room_id, member);
}
state_events
.entry(event.event_type())
.or_insert_with(BTreeMap::new)
.insert(event.state_key().to_owned(), raw_event.clone());
}
context.state_changes.state.insert(room_info.room_id.clone(), state_events);
Ok(user_ids)
}

View File

@@ -28,13 +28,12 @@ use ruma::{
};
use tracing::{instrument, trace, warn};
use super::Context;
#[cfg(feature = "e2e-encryption")]
use super::{e2ee, verification};
use super::{notification, Context};
use crate::{
deserialized_responses::RawAnySyncOrStrippedTimelineEvent,
store::{BaseStateStore, StateStoreExt as _},
sync::{Notification, Timeline},
sync::Timeline,
Result, Room, RoomInfo,
};
@@ -51,12 +50,12 @@ pub async fn build<'notification, 'e2ee>(
room: &Room,
room_info: &mut RoomInfo,
timeline_inputs: builder::Timeline,
notification_inputs: builder::Notification<'notification>,
#[cfg(feature = "e2e-encryption")] e2ee: builder::E2EE<'e2ee>,
mut notification: notification::Notification<'notification>,
#[cfg(feature = "e2e-encryption")] e2ee: e2ee::E2EE<'e2ee>,
) -> Result<Timeline> {
let mut timeline = Timeline::new(timeline_inputs.limited, timeline_inputs.prev_batch);
let mut push_context =
get_push_room_context(context, room, room_info, notification_inputs.state_store).await?;
get_push_room_context(context, room, room_info, notification.state_store).await?;
let room_id = room.room_id();
for raw_event in timeline_inputs.raw_events {
@@ -101,11 +100,9 @@ pub async fn build<'notification, 'e2ee>(
if let Some(decrypted_timeline_event) =
Box::pin(e2ee::decrypt::sync_timeline_event(
context,
e2ee.olm_machine,
e2ee.clone(),
timeline_event.raw(),
room_id,
e2ee.decryption_trust_requirement,
e2ee.verification_is_allowed,
))
.await?
{
@@ -117,8 +114,7 @@ pub async fn build<'notification, 'e2ee>(
Box::pin(verification::process_if_relevant(
context,
&sync_timeline_event,
e2ee.verification_is_allowed,
e2ee.olm_machine,
e2ee.clone(),
room_id,
))
.await?;
@@ -134,31 +130,18 @@ pub async fn build<'notification, 'e2ee>(
if let Some(push_context) = &mut push_context {
update_push_room_context(context, push_context, room.own_user_id(), room_info)
} else {
push_context = get_push_room_context(
context,
room,
room_info,
notification_inputs.state_store,
)
.await?;
push_context =
get_push_room_context(context, room, room_info, notification.state_store)
.await?;
}
if let Some(context) = &push_context {
let actions =
notification_inputs.push_rules.get_actions(timeline_event.raw(), context);
if actions.iter().any(Action::should_notify) {
notification_inputs
.notifications
.entry(room_id.to_owned())
.or_default()
.push(Notification {
actions: actions.to_owned(),
event: RawAnySyncOrStrippedTimelineEvent::Sync(
timeline_event.raw().clone(),
),
});
}
if let Some(push_context) = &push_context {
let actions = notification.push_notification_from_event_if(
room_id,
push_context,
timeline_event.raw(),
Action::should_notify,
);
timeline_event.push_actions = Some(actions.to_owned());
}
@@ -178,20 +161,12 @@ pub async fn build<'notification, 'e2ee>(
/// Set of types used by [`build`] to reduce the number of arguments by grouping
/// them by thematics.
pub mod builder {
use std::collections::BTreeMap;
#[cfg(feature = "e2e-encryption")]
use matrix_sdk_crypto::{OlmMachine, TrustRequirement};
use ruma::{
api::client::sync::sync_events::{v3, v5},
events::AnySyncTimelineEvent,
push::Ruleset,
serde::Raw,
OwnedRoomId,
};
use crate::{store::BaseStateStore, sync};
pub struct Timeline {
pub limited: bool,
pub raw_events: Vec<Raw<AnySyncTimelineEvent>>,
@@ -213,40 +188,6 @@ pub mod builder {
}
}
}
pub struct Notification<'a> {
pub push_rules: &'a Ruleset,
pub notifications: &'a mut BTreeMap<OwnedRoomId, Vec<sync::Notification>>,
pub state_store: &'a BaseStateStore,
}
impl<'a> Notification<'a> {
pub fn new(
push_rules: &'a Ruleset,
notifications: &'a mut BTreeMap<OwnedRoomId, Vec<sync::Notification>>,
state_store: &'a BaseStateStore,
) -> Self {
Self { push_rules, notifications, state_store }
}
}
#[cfg(feature = "e2e-encryption")]
pub struct E2EE<'a> {
pub olm_machine: Option<&'a OlmMachine>,
pub decryption_trust_requirement: TrustRequirement,
pub verification_is_allowed: bool,
}
#[cfg(feature = "e2e-encryption")]
impl<'a> E2EE<'a> {
pub fn new(
olm_machine: Option<&'a OlmMachine>,
decryption_trust_requirement: TrustRequirement,
verification_is_allowed: bool,
) -> Self {
Self { olm_machine, decryption_trust_requirement, verification_is_allowed }
}
}
}
/// Update the push context for the given room.

View File

@@ -21,7 +21,7 @@ use ruma::{
RoomId,
};
use super::Context;
use super::{e2ee::E2EE, Context};
use crate::Result;
/// Process the given event as a verification event if it is a candidate. The
@@ -29,8 +29,7 @@ use crate::Result;
pub async fn process_if_relevant(
context: &mut Context,
event: &AnySyncTimelineEvent,
verification_is_allowed: bool,
olm_machine: Option<&OlmMachine>,
e2ee: E2EE<'_>,
room_id: &RoomId,
) -> Result<()> {
if let AnySyncTimelineEvent::MessageLike(event) = event {
@@ -58,7 +57,8 @@ pub async fn process_if_relevant(
_ => false,
} {
verification(context, verification_is_allowed, olm_machine, event, room_id).await?;
verification(context, e2ee.verification_is_allowed, e2ee.olm_machine, event, room_id)
.await?;
}
}

View File

@@ -2579,7 +2579,7 @@ mod tests {
.cast();
// When the new tag is handled and applied.
let mut context = processors::Context::new(StateChanges::default(), Default::default());
let mut context = processors::Context::default();
processors::account_data::for_room(&mut context, room_id, &[tag_raw], &client.state_store)
.await;
@@ -2675,7 +2675,7 @@ mod tests {
.cast();
// When the new tag is handled and applied.
let mut context = processors::Context::new(StateChanges::default(), Default::default());
let mut context = processors::Context::default();
processors::account_data::for_room(&mut context, room_id, &[tag_raw], &client.state_store)
.await;
@@ -3261,7 +3261,7 @@ mod tests {
// And I provide a decrypted event to replace the encrypted one,
let event = make_latest_event("$A");
let mut context = processors::Context::new(StateChanges::default(), Default::default());
let mut context = processors::Context::default();
room.on_latest_event_decrypted(
event.clone(),
0,

View File

@@ -14,45 +14,23 @@
//! Extend `BaseClient` with capabilities to handle MSC4186.
use std::collections::BTreeMap;
use ruma::api::client::sync::sync_events::v5 as http;
#[cfg(feature = "e2e-encryption")]
use matrix_sdk_common::deserialized_responses::TimelineEvent;
#[cfg(feature = "e2e-encryption")]
use ruma::events::AnyToDeviceEvent;
use ruma::{
api::client::sync::sync_events::{
v3::{self, InvitedRoom, KnockedRoom},
v5 as http,
},
events::{
room::member::MembershipState, AnyRoomAccountDataEvent, AnyStrippedStateEvent,
AnySyncStateEvent, StateEventType,
},
serde::Raw,
JsOption, OwnedRoomId, RoomId, UserId,
};
use tracing::{instrument, trace, warn};
use ruma::{events::AnyToDeviceEvent, serde::Raw};
use tracing::{instrument, trace};
use super::BaseClient;
#[cfg(feature = "e2e-encryption")]
use crate::latest_event::{is_suitable_for_latest_event, LatestEvent, PossibleLatestEvent};
use crate::{
error::Result,
read_receipts::{compute_unread_counts, PreviousEventsProvider},
response_processors as processors,
rooms::{
normal::{RoomHero, RoomInfoNotableUpdateReasons},
RoomState,
},
ruma::assign,
store::{ambiguity_map::AmbiguityCache, BaseStateStore, StateChanges},
sync::{JoinedRoomUpdate, LeftRoomUpdate, Notification, RoomUpdates, SyncResponse},
RequestedRequiredStates, Room, RoomInfo,
rooms::normal::RoomInfoNotableUpdateReasons,
store::ambiguity_map::AmbiguityCache,
sync::{RoomUpdates, SyncResponse},
RequestedRequiredStates,
};
impl BaseClient {
#[cfg(feature = "e2e-encryption")]
/// Processes the E2EE-related events from the Sliding Sync response.
///
/// In addition to writes to the crypto store, this may also write into the
@@ -60,6 +38,7 @@ impl BaseClient {
/// store.
///
/// Returns whether any change happened.
#[cfg(feature = "e2e-encryption")]
pub async fn process_sliding_sync_e2ee(
&self,
to_device: Option<&http::response::ToDevice>,
@@ -80,7 +59,7 @@ impl BaseClient {
let olm_machine = self.olm_machine().await;
let mut context = processors::Context::new(StateChanges::default(), Default::default());
let mut context = processors::Context::default();
let processors::e2ee::to_device::Output { decrypted_to_device_events, room_key_updates } =
processors::e2ee::to_device::from_msc4186(
@@ -98,9 +77,11 @@ impl BaseClient {
.flatten()
.filter_map(|room_key_info| self.get_room(&room_key_info.room_id))
.collect(),
olm_machine.as_ref(),
self.decryption_trust_requirement,
self.handle_verification_events,
processors::e2ee::E2EE::new(
olm_machine.as_ref(),
self.decryption_trust_requirement,
self.handle_verification_events,
),
)
.await?;
@@ -130,17 +111,7 @@ impl BaseClient {
previous_events_provider: &PEP,
requested_required_states: &RequestedRequiredStates,
) -> Result<SyncResponse> {
let http::Response {
// FIXME not yet supported by sliding sync. see
// https://github.com/matrix-org/matrix-rust-sdk/issues/1014
// next_batch,
rooms,
lists,
extensions,
// FIXME: missing compared to v3::Response
//presence,
..
} = response;
let http::Response { rooms, lists, extensions, .. } = response;
trace!(
rooms = rooms.len(),
@@ -155,17 +126,17 @@ impl BaseClient {
return Ok(SyncResponse::default());
};
let mut context = processors::Context::new(StateChanges::default(), Default::default());
let mut context = processors::Context::default();
let state_store = self.state_store.clone();
let mut ambiguity_cache = AmbiguityCache::new(state_store.inner.clone());
let global_account_data_processor =
processors::account_data::global(&extensions.account_data.global);
let push_rules = self.get_push_rules(&global_account_data_processor).await?;
let mut new_rooms = RoomUpdates::default();
let mut room_updates = RoomUpdates::default();
let mut notifications = Default::default();
let mut rooms_account_data = extensions.account_data.rooms.clone();
let user_id = self
.session_meta()
@@ -173,107 +144,82 @@ impl BaseClient {
.user_id
.to_owned();
for (room_id, response_room_data) in rooms {
let (room_info, joined_room, left_room, invited_room, knocked_room) = self
.process_sliding_sync_room(
&mut context,
for (room_id, room_response) in rooms {
let Some((room_info, room_update)) = processors::room::msc4186::update_any_room(
&mut context,
&user_id,
processors::room::RoomCreationData::new(
room_id,
requested_required_states.for_room(room_id),
response_room_data,
&mut rooms_account_data,
&state_store,
&user_id,
&global_account_data_processor,
&mut notifications,
self.room_info_notable_update_sender.clone(),
requested_required_states,
&mut ambiguity_cache,
)
.await?;
),
room_response,
&extensions.account_data.rooms,
#[cfg(feature = "e2e-encryption")]
processors::e2ee::E2EE::new(
self.olm_machine().await.as_ref(),
self.decryption_trust_requirement,
self.handle_verification_events,
),
processors::notification::Notification::new(
&push_rules,
&mut notifications,
&self.state_store,
),
)
.await?
else {
continue;
};
context.state_changes.add_room(room_info);
if let Some(joined_room) = joined_room {
new_rooms.join.insert(room_id.clone(), joined_room);
}
let room_id = room_id.to_owned();
if let Some(left_room) = left_room {
new_rooms.leave.insert(room_id.clone(), left_room);
}
use processors::room::msc4186::RoomUpdateKind;
if let Some(invited_room) = invited_room {
new_rooms.invite.insert(room_id.clone(), invited_room);
}
if let Some(knocked_room) = knocked_room {
new_rooms.knocked.insert(room_id.clone(), knocked_room);
match room_update {
RoomUpdateKind::Joined(joined_room_update) => {
room_updates.joined.insert(room_id, joined_room_update);
}
RoomUpdateKind::Left(left_room_update) => {
room_updates.left.insert(room_id, left_room_update);
}
RoomUpdateKind::Invited(invited_room_update) => {
room_updates.invited.insert(room_id, invited_room_update);
}
RoomUpdateKind::Knocked(knocked_room_update) => {
room_updates.knocked.insert(room_id, knocked_room_update);
}
}
}
// Handle read receipts and typing notifications independently of the rooms:
// these both live in a different subsection of the server's response,
// so they may exist without any update for the associated room.
for (room_id, raw) in &extensions.receipts.rooms {
match raw.deserialize() {
Ok(event) => {
context.state_changes.add_receipts(room_id, event.content);
}
Err(e) => {
let event_id: Option<String> = raw.get_field("event_id").ok().flatten();
#[rustfmt::skip]
warn!(
?room_id, event_id,
"Failed to deserialize read receipt room event: {e}"
);
}
}
processors::room::msc4186::extensions::dispatch_ephemeral_events(
&mut context,
&extensions.receipts,
&extensions.typing,
// We assume this can only happen in joined rooms, or something's very wrong.
new_rooms
.join
.entry(room_id.to_owned())
.or_insert_with(JoinedRoomUpdate::default)
.ephemeral
.push(raw.clone().cast());
}
&mut room_updates.joined,
);
for (room_id, raw) in &extensions.typing.rooms {
// We assume this can only happen in joined rooms, or something's very wrong.
new_rooms
.join
.entry(room_id.to_owned())
.or_insert_with(JoinedRoomUpdate::default)
.ephemeral
.push(raw.clone().cast());
}
// Handle room account data
for (room_id, raw) in &rooms_account_data {
processors::account_data::for_room(&mut context, room_id, raw, &self.state_store).await;
if let Some(room) = self.state_store.room(room_id) {
match room.state() {
RoomState::Joined => new_rooms
.join
.entry(room_id.to_owned())
.or_insert_with(JoinedRoomUpdate::default)
.account_data
.append(&mut raw.to_vec()),
RoomState::Left | RoomState::Banned => new_rooms
.leave
.entry(room_id.to_owned())
.or_insert_with(LeftRoomUpdate::default)
.account_data
.append(&mut raw.to_vec()),
RoomState::Invited | RoomState::Knocked => {}
}
}
}
// Handle room account data.
processors::room::msc4186::extensions::room_account_data(
&mut context,
&extensions.account_data,
&mut room_updates,
&self.state_store,
)
.await;
// Rooms in `new_rooms.join` either have a timeline update, or a new read
// receipt. Update the read receipt accordingly.
let user_id = &self.session_meta().expect("logged in user").user_id;
for (room_id, joined_room_update) in &mut new_rooms.join {
for (room_id, joined_room_update) in &mut room_updates.joined {
if let Some(mut room_info) = context
.state_changes
.room_infos
@@ -306,16 +252,6 @@ impl BaseClient {
global_account_data_processor.apply(&mut context, &state_store).await;
// FIXME not yet supported by sliding sync.
// changes.presence = presence
// .events
// .iter()
// .filter_map(|e| {
// let event = e.deserialize().ok()?;
// Some((event.sender, e.clone()))
// })
// .collect();
context.state_changes.ambiguity_maps = ambiguity_cache.cache;
processors::changes::save_and_apply(
@@ -331,510 +267,16 @@ impl BaseClient {
// live in memory, until the next sync which will saves the room info to
// disk; we do this to avoid saving that would be redundant with the
// above. Oh well.
new_rooms.update_in_memory_caches(&self.state_store).await;
room_updates.update_in_memory_caches(&self.state_store).await;
Ok(SyncResponse {
rooms: new_rooms,
rooms: room_updates,
notifications,
// FIXME not yet supported by sliding sync.
presence: Default::default(),
account_data: extensions.account_data.global.clone(),
to_device: Default::default(),
})
}
#[allow(clippy::too_many_arguments)]
async fn process_sliding_sync_room(
&self,
context: &mut processors::Context,
room_id: &RoomId,
requested_required_states: &[(StateEventType, String)],
room_data: &http::response::Room,
rooms_account_data: &mut BTreeMap<OwnedRoomId, Vec<Raw<AnyRoomAccountDataEvent>>>,
state_store: &BaseStateStore,
user_id: &UserId,
global_account_data_processor: &processors::account_data::Global,
notifications: &mut BTreeMap<OwnedRoomId, Vec<Notification>>,
ambiguity_cache: &mut AmbiguityCache,
) -> Result<(
RoomInfo,
Option<JoinedRoomUpdate>,
Option<LeftRoomUpdate>,
Option<InvitedRoom>,
Option<KnockedRoom>,
)> {
// Read state events from the `required_state` field.
//
// Don't read state events from the `timeline` field, because they might be
// incomplete or staled already. We must only read state events from
// `required_state`.
let (raw_state_events, state_events) =
processors::state_events::sync::collect(context, &room_data.required_state);
// Find or create the room in the store
let is_new_room = !state_store.room_exists(room_id);
let invite_state_events = room_data
.invite_state
.as_ref()
.map(|events| processors::state_events::stripped::collect(context, events));
#[allow(unused_mut)] // Required for some feature flag combinations
let (mut room, mut room_info, invited_room, knocked_room) = self
.process_sliding_sync_room_membership(
context,
&state_events,
&invite_state_events,
state_store,
user_id,
room_id,
);
room_info.mark_state_partially_synced();
room_info.handle_encryption_state(requested_required_states);
#[cfg_attr(not(feature = "e2e-encryption"), allow(unused))]
let new_user_ids = processors::state_events::dispatch_and_get_new_users(
context,
(&raw_state_events, &state_events),
&mut room_info,
ambiguity_cache,
)
.await?;
let push_rules = self.get_push_rules(global_account_data_processor).await?;
// This will be used for both invited and knocked rooms.
if let Some(invite_state) = invite_state_events {
self.handle_invited_state(
context,
&room,
invite_state,
&push_rules,
&mut room_info,
notifications,
)
.await?;
}
process_room_properties(context, room_id, room_data, &mut room_info, is_new_room);
let timeline = processors::timeline::build(
context,
&room,
&mut room_info,
processors::timeline::builder::Timeline::from(room_data),
processors::timeline::builder::Notification::new(
&push_rules,
notifications,
&self.state_store,
),
#[cfg(feature = "e2e-encryption")]
processors::timeline::builder::E2EE::new(
self.olm_machine().await.as_ref(),
self.decryption_trust_requirement,
self.handle_verification_events,
),
)
.await?;
// Cache the latest decrypted event in room_info, and also keep any later
// encrypted events, so we can slot them in when we get the keys.
#[cfg(feature = "e2e-encryption")]
cache_latest_events(
&room,
&mut room_info,
&timeline.events,
Some(&context.state_changes),
Some(state_store),
)
.await;
#[cfg(feature = "e2e-encryption")]
processors::e2ee::tracked_users::update_or_set_if_room_is_newly_encrypted(
context,
self.olm_machine().await.as_ref(),
&new_user_ids,
room_info.encryption_state(),
room.encryption_state(),
room_id,
state_store,
)
.await?;
let notification_count = room_data.unread_notifications.clone().into();
room_info.update_notification_count(notification_count);
let ambiguity_changes = ambiguity_cache.changes.remove(room_id).unwrap_or_default();
let room_account_data = rooms_account_data.get(room_id).cloned();
match room_info.state() {
RoomState::Joined => {
// Ephemeral events are added separately, because we might not
// have a room subsection in the response, yet we may have receipts for
// that room.
let ephemeral = Vec::new();
Ok((
room_info,
Some(JoinedRoomUpdate::new(
timeline,
raw_state_events,
room_account_data.unwrap_or_default(),
ephemeral,
notification_count,
ambiguity_changes,
)),
None,
None,
None,
))
}
RoomState::Left | RoomState::Banned => Ok((
room_info,
None,
Some(LeftRoomUpdate::new(
timeline,
raw_state_events,
room_account_data.unwrap_or_default(),
ambiguity_changes,
)),
None,
None,
)),
RoomState::Invited => Ok((room_info, None, None, invited_room, None)),
RoomState::Knocked => Ok((room_info, None, None, None, knocked_room)),
}
}
/// Look through the sliding sync data for this room, find/create it in the
/// store, and process any invite information.
/// If any invite_state exists, we take it to mean that we are invited to
/// this room, unless that state contains membership events that specify
/// otherwise. https://github.com/matrix-org/matrix-spec-proposals/blob/kegan/sync-v3/proposals/3575-sync.md#room-list-parameters
fn process_sliding_sync_room_membership(
&self,
context: &mut processors::Context,
state_events: &[AnySyncStateEvent],
invite_state_events: &Option<(Vec<Raw<AnyStrippedStateEvent>>, Vec<AnyStrippedStateEvent>)>,
store: &BaseStateStore,
user_id: &UserId,
room_id: &RoomId,
) -> (Room, RoomInfo, Option<InvitedRoom>, Option<KnockedRoom>) {
if let Some(state_events) = invite_state_events {
let room = store.get_or_create_room(
room_id,
RoomState::Invited,
self.room_info_notable_update_sender.clone(),
);
let mut room_info = room.clone_info();
// We need to find the membership event since it could be for either an invited
// or knocked room
let membership_event_content = state_events.1.iter().find_map(|event| {
if let AnyStrippedStateEvent::RoomMember(membership_event) = event {
if membership_event.state_key == user_id {
return Some(membership_event.content.clone());
}
}
None
});
if let Some(membership_event_content) = membership_event_content {
if membership_event_content.membership == MembershipState::Knock {
// If we have a `Knock` membership state, set the room as such
room_info.mark_as_knocked();
let raw_events = state_events.0.clone();
let knock_state = assign!(v3::KnockState::default(), { events: raw_events });
let knocked_room =
assign!(KnockedRoom::default(), { knock_state: knock_state });
return (room, room_info, None, Some(knocked_room));
}
}
// Otherwise assume it's an invited room
room_info.mark_as_invited();
let raw_events = state_events.0.clone();
let invited_room = InvitedRoom::from(v3::InviteState::from(raw_events));
(room, room_info, Some(invited_room), None)
} else {
let room = store.get_or_create_room(
room_id,
RoomState::Joined,
self.room_info_notable_update_sender.clone(),
);
let mut room_info = room.clone_info();
// We default to considering this room joined if it's not an invite. If it's
// actually left (and we remembered to request membership events in
// our sync request), then we can find this out from the events in
// required_state by calling handle_own_room_membership.
room_info.mark_as_joined();
// We don't need to do this in a v2 sync, because the membership of a room can
// be figured out by whether the room is in the "join", "leave" etc.
// property. In sliding sync we only have invite_state,
// required_state and timeline, so we must process required_state and timeline
// looking for relevant membership events.
self.handle_own_room_membership(context, state_events, &mut room_info);
(room, room_info, None, None)
}
}
/// Find any m.room.member events that refer to the current user, and update
/// the state in room_info to reflect the "membership" property.
fn handle_own_room_membership(
&self,
context: &mut processors::Context,
state_events: &[AnySyncStateEvent],
room_info: &mut RoomInfo,
) {
let Some(meta) = self.session_meta() else {
return;
};
// Start from the last event; the first membership event we see in that order is
// the last in the regular order, so that's the only one we need to
// consider.
for event in state_events.iter().rev() {
if let AnySyncStateEvent::RoomMember(member) = &event {
// If this event updates the current user's membership, record that in the
// room_info.
if member.state_key() == meta.user_id.as_str() {
let new_state: RoomState = member.membership().into();
if new_state != room_info.state() {
room_info.set_state(new_state);
// Update an existing notable update entry or create a new one
context
.room_info_notable_updates
.entry(room_info.room_id.to_owned())
.or_default()
.insert(RoomInfoNotableUpdateReasons::MEMBERSHIP);
}
break;
}
}
}
}
}
/// Find the most recent decrypted event and cache it in the supplied RoomInfo.
///
/// If any encrypted events are found after that one, store them in the RoomInfo
/// too so we can use them when we get the relevant keys.
///
/// It is the responsibility of the caller to update the `RoomInfo` instance
/// stored in the `Room`.
#[cfg(feature = "e2e-encryption")]
async fn cache_latest_events(
room: &Room,
room_info: &mut RoomInfo,
events: &[TimelineEvent],
changes: Option<&StateChanges>,
store: Option<&BaseStateStore>,
) {
use crate::{
deserialized_responses::DisplayName, store::ambiguity_map::is_display_name_ambiguous,
};
let mut encrypted_events =
Vec::with_capacity(room.latest_encrypted_events.read().unwrap().capacity());
// Try to get room power levels from the current changes
let power_levels_from_changes = || {
let state_changes = changes?.state.get(room_info.room_id())?;
let room_power_levels_state =
state_changes.get(&StateEventType::RoomPowerLevels)?.values().next()?;
match room_power_levels_state.deserialize().ok()? {
AnySyncStateEvent::RoomPowerLevels(ev) => Some(ev.power_levels()),
_ => None,
}
};
// If we didn't get any info, try getting it from local data
let power_levels = match power_levels_from_changes() {
Some(power_levels) => Some(power_levels),
None => room.power_levels().await.ok(),
};
let power_levels_info = Some(room.own_user_id()).zip(power_levels.as_ref());
for event in events.iter().rev() {
if let Ok(timeline_event) = event.raw().deserialize() {
match is_suitable_for_latest_event(&timeline_event, power_levels_info) {
PossibleLatestEvent::YesRoomMessage(_)
| PossibleLatestEvent::YesPoll(_)
| PossibleLatestEvent::YesCallInvite(_)
| PossibleLatestEvent::YesCallNotify(_)
| PossibleLatestEvent::YesSticker(_)
| PossibleLatestEvent::YesKnockedStateEvent(_) => {
// We found a suitable latest event. Store it.
// In order to make the latest event fast to read, we want to keep the
// associated sender in cache. This is a best-effort to gather enough
// information for creating a user profile as fast as possible. If information
// are missing, let's go back on the “slow” path.
let mut sender_profile = None;
let mut sender_name_is_ambiguous = None;
// First off, look up the sender's profile from the `StateChanges`, they are
// likely to be the most recent information.
if let Some(changes) = changes {
sender_profile = changes
.profiles
.get(room.room_id())
.and_then(|profiles_by_user| {
profiles_by_user.get(timeline_event.sender())
})
.cloned();
if let Some(sender_profile) = sender_profile.as_ref() {
sender_name_is_ambiguous = sender_profile
.as_original()
.and_then(|profile| profile.content.displayname.as_ref())
.and_then(|display_name| {
let display_name = DisplayName::new(display_name);
changes.ambiguity_maps.get(room.room_id()).and_then(
|map_for_room| {
map_for_room.get(&display_name).map(|users| {
is_display_name_ambiguous(&display_name, users)
})
},
)
});
}
}
// Otherwise, look up the sender's profile from the `Store`.
if sender_profile.is_none() {
if let Some(store) = store {
sender_profile = store
.get_profile(room.room_id(), timeline_event.sender())
.await
.ok()
.flatten();
// TODO: need to update `sender_name_is_ambiguous`,
// but how?
}
}
let latest_event = Box::new(LatestEvent::new_with_sender_details(
event.clone(),
sender_profile,
sender_name_is_ambiguous,
));
// Store it in the return RoomInfo (it will be saved for us in the room later).
room_info.latest_event = Some(latest_event);
// We don't need any of the older encrypted events because we have a new
// decrypted one.
room.latest_encrypted_events.write().unwrap().clear();
// We can stop looking through the timeline now because everything else is
// older.
break;
}
PossibleLatestEvent::NoEncrypted => {
// m.room.encrypted - this might be the latest event later - we can't tell until
// we are able to decrypt it, so store it for now
//
// Check how many encrypted events we have seen. Only store another if we
// haven't already stored the maximum number.
if encrypted_events.len() < encrypted_events.capacity() {
encrypted_events.push(event.raw().clone());
}
}
_ => {
// Ignore unsuitable events
}
}
} else {
warn!(
"Failed to deserialize event as AnySyncTimelineEvent. ID={}",
event.event_id().expect("Event has no ID!")
);
}
}
// Push the encrypted events we found into the Room, in reverse order, so
// the latest is last
room.latest_encrypted_events.write().unwrap().extend(encrypted_events.into_iter().rev());
}
fn process_room_properties(
context: &mut processors::Context,
room_id: &RoomId,
room_data: &http::response::Room,
room_info: &mut RoomInfo,
is_new_room: bool,
) {
// Handle the room's avatar.
//
// It can be updated via the state events, or via the
// [`http::ResponseRoom::avatar`] field. This part of the code handles the
// latter case. The former case is handled by [`BaseClient::handle_state`].
match &room_data.avatar {
// A new avatar!
JsOption::Some(avatar_uri) => room_info.update_avatar(Some(avatar_uri.to_owned())),
// Avatar must be removed.
JsOption::Null => room_info.update_avatar(None),
// Nothing to do.
JsOption::Undefined => {}
}
// Sliding sync doesn't have a room summary, nevertheless it contains the joined
// and invited member counts, in addition to the heroes if it's been configured
// to return them (see the [`http::RequestRoomSubscription::include_heroes`]).
if let Some(count) = room_data.joined_count {
room_info.update_joined_member_count(count.into());
}
if let Some(count) = room_data.invited_count {
room_info.update_invited_member_count(count.into());
}
if let Some(heroes) = &room_data.heroes {
room_info.update_heroes(
heroes
.iter()
.map(|hero| RoomHero {
user_id: hero.user_id.clone(),
display_name: hero.name.clone(),
avatar_url: hero.avatar.clone(),
})
.collect(),
);
}
room_info.set_prev_batch(room_data.prev_batch.as_deref());
if room_data.limited {
room_info.mark_members_missing();
}
if let Some(recency_stamp) = &room_data.bump_stamp {
let recency_stamp: u64 = (*recency_stamp).into();
if room_info.recency_stamp.as_ref() != Some(&recency_stamp) {
room_info.update_recency_stamp(recency_stamp);
// If it's not a new room, let's emit a `RECENCY_STAMP` update.
// For a new room, the room will appear as new, so we don't care about this
// update.
if !is_new_room {
context
.room_info_notable_updates
.entry(room_id.to_owned())
.or_default()
.insert(RoomInfoNotableUpdateReasons::RECENCY_STAMP);
}
}
}
}
#[cfg(all(test, not(target_family = "wasm")))]
@@ -874,9 +316,9 @@ mod tests {
};
use serde_json::json;
#[cfg(feature = "e2e-encryption")]
use super::cache_latest_events;
use super::http;
#[cfg(feature = "e2e-encryption")]
use super::processors::room::msc4186::cache_latest_events;
use crate::{
rooms::normal::{RoomHero, RoomInfoNotableUpdateReasons},
test_utils::logged_in_base_client,
@@ -909,7 +351,7 @@ mod tests {
.expect("Failed to process sync");
// Check it's present in the response.
let room = sync_response.rooms.join.get(room_id).unwrap();
let room = sync_response.rooms.joined.get(room_id).unwrap();
assert_eq!(room.unread_notifications, count.clone().into());
// Check it's been updated in the store.
@@ -950,9 +392,9 @@ mod tests {
assert_eq!(client_room.state(), RoomState::Joined);
// And it is added to the list of joined rooms only.
assert!(sync_resp.rooms.join.contains_key(room_id));
assert!(!sync_resp.rooms.leave.contains_key(room_id));
assert!(!sync_resp.rooms.invite.contains_key(room_id));
assert!(sync_resp.rooms.joined.contains_key(room_id));
assert!(!sync_resp.rooms.left.contains_key(room_id));
assert!(!sync_resp.rooms.invited.contains_key(room_id));
}
#[async_test]
@@ -978,9 +420,9 @@ mod tests {
assert_eq!(client_room.state(), RoomState::Joined);
// And it is added to the list of joined rooms only.
assert!(sync_resp.rooms.join.contains_key(room_id));
assert!(!sync_resp.rooms.leave.contains_key(room_id));
assert!(!sync_resp.rooms.invite.contains_key(room_id));
assert!(sync_resp.rooms.joined.contains_key(room_id));
assert!(!sync_resp.rooms.left.contains_key(room_id));
assert!(!sync_resp.rooms.invited.contains_key(room_id));
assert!(!sync_resp.rooms.knocked.contains_key(room_id));
}
@@ -1038,9 +480,9 @@ mod tests {
assert_eq!(client_room.state(), RoomState::Invited);
// And it is added to the list of invited rooms only.
assert!(!sync_resp.rooms.join.contains_key(room_id));
assert!(!sync_resp.rooms.leave.contains_key(room_id));
assert!(sync_resp.rooms.invite.contains_key(room_id));
assert!(!sync_resp.rooms.joined.contains_key(room_id));
assert!(!sync_resp.rooms.left.contains_key(room_id));
assert!(sync_resp.rooms.invited.contains_key(room_id));
assert!(!sync_resp.rooms.knocked.contains_key(room_id));
}
@@ -1184,9 +626,9 @@ mod tests {
assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Left);
// And it is added to the list of left rooms only.
assert!(!sync_resp.rooms.join.contains_key(room_id));
assert!(sync_resp.rooms.leave.contains_key(room_id));
assert!(!sync_resp.rooms.invite.contains_key(room_id));
assert!(!sync_resp.rooms.joined.contains_key(room_id));
assert!(sync_resp.rooms.left.contains_key(room_id));
assert!(!sync_resp.rooms.invited.contains_key(room_id));
assert!(!sync_resp.rooms.knocked.contains_key(room_id));
}
@@ -1235,9 +677,9 @@ mod tests {
}
// And it is added to the list of left rooms only.
assert!(!sync_resp.rooms.join.contains_key(room_id));
assert!(sync_resp.rooms.leave.contains_key(room_id));
assert!(!sync_resp.rooms.invite.contains_key(room_id));
assert!(!sync_resp.rooms.joined.contains_key(room_id));
assert!(sync_resp.rooms.left.contains_key(room_id));
assert!(!sync_resp.rooms.invited.contains_key(room_id));
assert!(!sync_resp.rooms.knocked.contains_key(room_id));
}
}
@@ -1550,8 +992,8 @@ mod tests {
assert_eq!(client_room.state(), RoomState::Invited);
// And it is added to the list of invited rooms, not the joined ones
assert!(!sync_resp.rooms.invite[room_id].invite_state.is_empty());
assert!(!sync_resp.rooms.join.contains_key(room_id));
assert!(!sync_resp.rooms.invited[room_id].invite_state.is_empty());
assert!(!sync_resp.rooms.joined.contains_key(room_id));
}
#[async_test]

View File

@@ -321,7 +321,7 @@ impl BaseStateStore {
pub fn get_or_create_room(
&self,
room_id: &RoomId,
room_type: RoomState,
room_state: RoomState,
room_info_notable_update_sender: broadcast::Sender<RoomInfoNotableUpdate>,
) -> Room {
let user_id =
@@ -335,7 +335,7 @@ impl BaseStateStore {
user_id,
self.inner.clone(),
room_id,
room_type,
room_state,
room_info_notable_update_sender,
)
})

View File

@@ -17,11 +17,11 @@
use std::{collections::BTreeMap, fmt};
use matrix_sdk_common::{debug::DebugRawEvent, deserialized_responses::TimelineEvent};
pub use ruma::api::client::sync::sync_events::v3::{
InvitedRoom as InvitedRoomUpdate, KnockedRoom as KnockedRoomUpdate,
};
use ruma::{
api::client::sync::sync_events::{
v3::{InvitedRoom as InvitedRoomUpdate, KnockedRoom as KnockedRoomUpdate},
UnreadNotificationsCount as RumaUnreadNotificationsCount,
},
api::client::sync::sync_events::UnreadNotificationsCount as RumaUnreadNotificationsCount,
events::{
presence::PresenceEvent, AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent,
AnySyncEphemeralRoomEvent, AnySyncStateEvent, AnyToDeviceEvent,
@@ -72,11 +72,11 @@ impl fmt::Debug for SyncResponse {
#[derive(Clone, Default)]
pub struct RoomUpdates {
/// The rooms that the user has left or been banned from.
pub leave: BTreeMap<OwnedRoomId, LeftRoomUpdate>,
pub left: BTreeMap<OwnedRoomId, LeftRoomUpdate>,
/// The rooms that the user has joined.
pub join: BTreeMap<OwnedRoomId, JoinedRoomUpdate>,
pub joined: BTreeMap<OwnedRoomId, JoinedRoomUpdate>,
/// The rooms that the user has been invited to.
pub invite: BTreeMap<OwnedRoomId, InvitedRoomUpdate>,
pub invited: BTreeMap<OwnedRoomId, InvitedRoomUpdate>,
/// The rooms that the user has knocked on.
pub knocked: BTreeMap<OwnedRoomId, KnockedRoomUpdate>,
}
@@ -87,10 +87,10 @@ impl RoomUpdates {
/// This will only fill the in-memory caches, not save the info on disk.
pub(crate) async fn update_in_memory_caches(&self, store: &BaseStateStore) {
for room in self
.leave
.left
.keys()
.chain(self.join.keys())
.chain(self.invite.keys())
.chain(self.joined.keys())
.chain(self.invited.keys())
.chain(self.knocked.keys())
.filter_map(|room_id| store.room(room_id))
{
@@ -103,9 +103,9 @@ impl RoomUpdates {
impl fmt::Debug for RoomUpdates {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RoomUpdates")
.field("leave", &self.leave)
.field("join", &self.join)
.field("invite", &DebugInvitedRoomUpdates(&self.invite))
.field("leave", &self.left)
.field("join", &self.joined)
.field("invite", &DebugInvitedRoomUpdates(&self.invited))
.field("knocked", &DebugKnockedRoomUpdates(&self.knocked))
.finish()
}

View File

@@ -6,9 +6,16 @@ All notable changes to this project will be documented in this file.
## [Unreleased] - ReleaseDate
### Features
- Add experimental APIs for sharing encrypted room key history with new members, `Store::build_room_key_bundle` and `OlmMachine::share_room_key_bundle_data`.
([#4775](https://github.com/matrix-org/matrix-rust-sdk/pull/4775), [#4864](https://github.com/matrix-org/matrix-rust-sdk/pull/4864))
- Check the `sender_device_keys` field on *all* incoming Olm-encrypted to-device messages
and ignore any to-device messages which include the field but whose data is invalid
(as per [MSC4147](https://github.com/matrix-org/matrix-spec-proposals/pull/4147)).
([#4922](https://github.com/matrix-org/matrix-rust-sdk/pull/4922))
## [0.11.0] - 2025-04-11
### Features

View File

@@ -207,6 +207,14 @@ pub enum EventError {
decrypted event: expected {0}, got {1:?}"
)]
MismatchedRoom(OwnedRoomId, Option<OwnedRoomId>),
/// The event includes `sender_device_keys` as per [MSC4147], but the
/// signature was invalid, or the ed25519 or curve25519 key did not
/// match other data in the event.
///
/// [MSC4147]: https://github.com/matrix-org/matrix-spec-proposals/pull/4147
#[error("the event included sender_device_keys which were invalid in some way")]
InvalidSenderDeviceKeys,
}
/// Error type describing different errors that can happen when we create an

View File

@@ -18,7 +18,7 @@ use assert_matches::assert_matches;
use futures_core::Stream;
use futures_util::{FutureExt, StreamExt};
use matrix_sdk_test::async_test;
use ruma::{room_id, user_id, RoomId, TransactionId, UserId};
use ruma::{events::AnyToDeviceEvent, room_id, serde::Raw, user_id, RoomId, TransactionId, UserId};
use serde::Serialize;
use serde_json::json;
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
@@ -283,7 +283,10 @@ async fn create_and_share_session_without_sender_data(
}
/// Pipe a to-device event into an [`OlmMachine`].
async fn receive_to_device_event<C>(machine: &OlmMachine, event: &ToDeviceEvent<C>)
pub async fn receive_to_device_event<C>(
machine: &OlmMachine,
event: &ToDeviceEvent<C>,
) -> (Vec<Raw<AnyToDeviceEvent>>, Vec<RoomKeyInfo>)
where
C: EventType + Serialize + Debug,
{
@@ -298,7 +301,7 @@ where
next_batch_token: None,
})
.await
.expect("Error receiving to-device event");
.expect("Error receiving to-device event")
}
/// Given the `room_keys_received_stream`, check that there is a pending update,

View File

@@ -24,12 +24,17 @@ use ruma::{
events::{dummy::ToDeviceDummyEventContent, AnyToDeviceEvent},
user_id, DeviceKeyAlgorithm, DeviceKeyId, SecondsSinceUnixEpoch,
};
use serde_json::json;
use vodozemac::Ed25519SecretKey;
use crate::{
machine::{
test_helpers::{create_session, get_machine_pair, get_machine_pair_with_session},
test_helpers::{
create_session, get_machine_pair, get_machine_pair_with_session,
get_machine_pair_with_setup_sessions_test_helper,
},
tests,
tests::megolm_sender_data::receive_to_device_event,
},
olm::utility::SignJson,
store::Changes,
@@ -242,3 +247,52 @@ async fn test_olm_encryption() {
async fn test_olm_encryption_with_fallback_key() {
olm_encryption_test_helper(true).await;
}
/// Encrypted to-device messages which hold a `sender_device_keys`, but that
/// data is unsigned, should not be decrypted.
#[async_test]
async fn test_decrypt_to_device_message_with_unsigned_sender_keys() {
let (alice, bob) = get_machine_pair_with_setup_sessions_test_helper(
tests::alice_id(),
tests::user_id(),
false,
)
.await;
let mut alice_session = alice
.get_device(bob.user_id(), bob.device_id(), None)
.await
.unwrap()
.unwrap()
.get_most_recent_session()
.await
.unwrap()
.unwrap();
let mut malformed_device_keys = alice_session.our_device_keys.clone();
malformed_device_keys.signatures.clear();
let plaintext = serde_json::to_string(&json!({
"sender": alice.user_id(),
"sender_device": alice.device_id(),
"keys": { "ed25519": alice.identity_keys().ed25519.to_base64() },
"recipient": bob.user_id(),
"recipient_keys": { "ed25519": bob.identity_keys().ed25519.to_base64() },
"type": "org.matrix.test",
"content": {"a": "b"},
"sender_device_keys": malformed_device_keys,
}))
.unwrap();
let ciphertext = alice_session.encrypt_helper(&plaintext).await;
let event = ToDeviceEvent::new(
alice.user_id().to_owned(),
alice_session.build_encrypted_event(ciphertext, None).await.unwrap(),
);
// Bob receives the to-device message
let (to_device_events, _) = receive_to_device_event(&bob, &event).await;
// The to-device event should remain decrypted.
let event = to_device_events.first().expect("Bob did not get a to-device event");
assert_eq!(event.get_field("type").unwrap(), Some("m.room.encrypted"));
}

View File

@@ -1464,6 +1464,9 @@ impl Account {
)
.into())
} else {
// If the event contained sender_device_keys, check them now.
Self::check_sender_device_keys(event.as_ref(), sender_key)?;
// If this event is an `m.room_key` event, defer the check for the
// Ed25519 key of the sender until we decrypt room events. This
// ensures that we receive the room key even if we don't have access
@@ -1496,6 +1499,57 @@ impl Account {
}
}
/// If the plaintext of the decrypted message includes a
/// `sender_device_keys` property per [MSC4147], check that it is valid.
///
/// # Arguments
///
/// * `event` - The decrypted and deserialized plaintext of the event.
/// * `sender_key` - The curve25519 key of the sender of the event.
///
/// [MSC4147]: https://github.com/matrix-org/matrix-spec-proposals/pull/4147
fn check_sender_device_keys(
event: &AnyDecryptedOlmEvent,
sender_key: Curve25519PublicKey,
) -> OlmResult<()> {
let Some(sender_device_keys) = event.sender_device_keys() else {
return Ok(());
};
// Check the signature within the device_keys structure
let sender_device_data = DeviceData::try_from(sender_device_keys).map_err(|err| {
warn!(
"Received a to-device message with sender_device_keys with invalid signature: {:?}",
err
);
OlmError::EventError(EventError::InvalidSenderDeviceKeys)
})?;
// Check that the Ed25519 key in the sender_device_keys matches the `ed25519`
// key in the `keys` field in the event.
if sender_device_data.ed25519_key() != Some(event.keys().ed25519) {
warn!(
"Received a to-device message with sender_device_keys with incorrect ed25519 key: expected {:?}, got {:?}",
event.keys().ed25519,
sender_device_data.ed25519_key(),
);
return Err(OlmError::EventError(EventError::InvalidSenderDeviceKeys));
}
// Check that the Curve25519 key in the sender_device_keys matches the key that
// was used for the Olm session.
if sender_device_data.curve25519_key() != Some(sender_key) {
warn!(
"Received a to-device message with sender_device_keys with incorrect curve25519 key: expected {:?}, got {:?}",
sender_key,
sender_device_data.curve25519_key(),
);
return Err(OlmError::EventError(EventError::InvalidSenderDeviceKeys));
}
Ok(())
}
/// Internal use only.
///
/// Cloning should only be done for testing purposes or when we are certain

View File

@@ -174,7 +174,10 @@ impl<'a> SenderDataFinder<'a> {
if let Some(sender_device_keys) = &room_key_event.sender_device_keys {
// Yes: use the device keys to continue.
// Validate the signature of the DeviceKeys supplied.
// Validate the signature of the DeviceKeys supplied. (We've actually already
// done this when decrypting the event, but doing it again here is
// relatively harmless and is the easiest way of getting hold of a
// DeviceData so that we can follow the rest of this logic).
let sender_device_data = DeviceData::try_from(sender_device_keys)?;
Ok(self.have_device_data(sender_device_data).await?)
} else {

View File

@@ -152,7 +152,7 @@ impl AnyDecryptedOlmEvent {
/// The sender's device keys, if supplied in the message as per MSC4147
pub fn sender_device_keys(&self) -> Option<&DeviceKeys> {
match self {
AnyDecryptedOlmEvent::Custom(_) => None,
AnyDecryptedOlmEvent::Custom(e) => e.sender_device_keys.as_ref(),
AnyDecryptedOlmEvent::RoomKey(e) => e.sender_device_keys.as_ref(),
AnyDecryptedOlmEvent::ForwardedRoomKey(e) => e.sender_device_keys.as_ref(),
AnyDecryptedOlmEvent::SecretSend(e) => e.sender_device_keys.as_ref(),
@@ -274,6 +274,9 @@ pub struct ToDeviceCustomEvent {
pub keys: OlmV1Keys,
/// The recipient's signing keys of the encrypted event.
pub recipient_keys: OlmV1Keys,
/// The device keys if supplied as per MSC4147
#[serde(alias = "org.matrix.msc4147.device_keys")]
pub sender_device_keys: Option<DeviceKeys>,
/// The type of the event.
#[serde(rename = "type")]
pub event_type: String,

View File

@@ -161,7 +161,6 @@ impl RoomListService {
.map(|(state_event, value)| (state_event.clone(), (*value).to_owned()))
.collect(),
)
.include_heroes(Some(true))
.filters(Some(assign!(http::request::ListFilters::default(), {
// As defined in the [SlidingSync MSC](https://github.com/matrix-org/matrix-spec-proposals/blob/9450ced7fb9cf5ea9077d029b3adf36aebfa8709/proposals/3575-sync.md?plain=1#L444)
// If unset, both invited and joined rooms are returned. If false, no invited rooms are

View File

@@ -368,7 +368,6 @@ async fn test_sync_all_states() -> Result<(), Error> {
["m.room.history_visibility", ""],
["io.element.functional_members", ""],
],
"include_heroes": true,
"filters": {
"not_room_types": ["m.space"],
},

View File

@@ -2203,7 +2203,7 @@ impl Client {
/// client
/// .sync_with_callback(sync_settings, |response| async move {
/// let channel = sync_channel;
/// for (room_id, room) in response.rooms.join {
/// for (room_id, room) in response.rooms.joined {
/// for event in room.timeline.events {
/// channel.send(event).await.unwrap();
/// }
@@ -2283,7 +2283,7 @@ impl Client {
/// .sync_with_result_callback(sync_settings, |response| async move {
/// let channel = sync_channel;
/// let sync_response = response?;
/// for (room_id, room) in sync_response.rooms.join {
/// for (room_id, room) in sync_response.rooms.joined {
/// for event in room.timeline.events {
/// channel.send(event).await.unwrap();
/// }
@@ -2356,7 +2356,7 @@ impl Client {
/// Box::pin(client.sync_stream(SyncSettings::default()).await);
///
/// while let Some(Ok(response)) = sync_stream.next().await {
/// for room in response.rooms.join.values() {
/// for room in response.rooms.joined.values() {
/// for e in &room.timeline.events {
/// if let Ok(event) = e.raw().deserialize() {
/// println!("Received event {:?}", event);

View File

@@ -538,7 +538,7 @@ impl EventCacheInner {
let _lock = self.multiple_room_updates_lock.lock().await;
// Left rooms.
for (room_id, left_room_update) in updates.leave {
for (room_id, left_room_update) in updates.left {
let room = self.for_room(&room_id).await?;
if let Err(err) =
@@ -550,7 +550,7 @@ impl EventCacheInner {
}
// Joined rooms.
for (room_id, joined_room_update) in updates.join {
for (room_id, joined_room_update) in updates.joined {
let room = self.for_room(&room_id).await?;
if let Err(err) =
@@ -810,8 +810,8 @@ mod tests {
};
let mut updates = RoomUpdates::default();
updates.join.insert(room_id1.to_owned(), joined_room_update1);
updates.join.insert(room_id2.to_owned(), joined_room_update2);
updates.joined.insert(room_id1.to_owned(), joined_room_update1);
updates.joined.insert(room_id2.to_owned(), joined_room_update2);
// Have the event cache handle them.
event_cache.inner.handle_room_updates(updates).await.unwrap();

View File

@@ -248,7 +248,7 @@ impl<'a> SlidingSyncResponseProcessor<'a> {
///
/// This will only fill the in-memory caches, not save the info on disk.
async fn update_in_memory_caches(client: &Client, response: &SyncResponse) -> Result<()> {
for room_id in response.rooms.join.keys() {
for room_id in response.rooms.joined.keys() {
let Some(room) = client.get_room(room_id) else {
error!(room_id = ?room_id, "Cannot post process a room in sliding sync because it is missing");
continue;

View File

@@ -34,7 +34,6 @@ struct SlidingSyncListCachedData {
pub struct SlidingSyncListBuilder {
sync_mode: SlidingSyncMode,
required_state: Vec<(StateEventType, String)>,
include_heroes: Option<bool>,
filters: Option<http::request::ListFilters>,
timeline_limit: Bound,
pub(crate) name: String,
@@ -57,7 +56,6 @@ impl fmt::Debug for SlidingSyncListBuilder {
.debug_struct("SlidingSyncListBuilder")
.field("sync_mode", &self.sync_mode)
.field("required_state", &self.required_state)
.field("include_heroes", &self.include_heroes)
.field("filters", &self.filters)
.field("timeline_limit", &self.timeline_limit)
.field("name", &self.name)
@@ -73,7 +71,6 @@ impl SlidingSyncListBuilder {
(StateEventType::RoomEncryption, "".to_owned()),
(StateEventType::RoomTombstone, "".to_owned()),
],
include_heroes: None,
filters: None,
timeline_limit: 1,
name: name.into(),
@@ -108,12 +105,6 @@ impl SlidingSyncListBuilder {
self
}
/// Include heroes.
pub fn include_heroes(mut self, value: Option<bool>) -> Self {
self.include_heroes = value;
self
}
/// Any filters to apply to the query.
pub fn filters(mut self, value: Option<http::request::ListFilters>) -> Self {
self.filters = value;
@@ -172,11 +163,7 @@ impl SlidingSyncListBuilder {
// From the builder
sticky: StdRwLock::new(SlidingSyncStickyManager::new(
SlidingSyncListStickyParameters::new(
self.required_state,
self.include_heroes,
self.filters,
),
SlidingSyncListStickyParameters::new(self.required_state, self.filters),
)),
timeline_limit: StdRwLock::new(self.timeline_limit),
name: self.name,

View File

@@ -9,10 +9,6 @@ pub(super) struct SlidingSyncListStickyParameters {
/// Required states to return per room.
required_state: Vec<(StateEventType, String)>,
/// Return a stripped variant of membership events for the users used to
/// calculate the room name.
include_heroes: Option<bool>,
/// Any filters to apply to the query.
filters: Option<http::request::ListFilters>,
}
@@ -20,12 +16,11 @@ pub(super) struct SlidingSyncListStickyParameters {
impl SlidingSyncListStickyParameters {
pub fn new(
required_state: Vec<(StateEventType, String)>,
include_heroes: Option<bool>,
filters: Option<http::request::ListFilters>,
) -> Self {
// Consider that each list will have at least one parameter set, so invalidate
// it by default.
Self { required_state, include_heroes, filters }
Self { required_state, filters }
}
}
@@ -34,7 +29,6 @@ impl StickyData for SlidingSyncListStickyParameters {
fn apply(&self, request: &mut Self::Request) {
request.room_details.required_state = self.required_state.to_vec();
request.include_heroes = self.include_heroes;
request.filters = self.filters.clone();
}
}

View File

@@ -321,14 +321,14 @@ impl SlidingSync {
let updated_rooms = {
let mut rooms_map = self.inner.rooms.write().await;
let mut updated_rooms = Vec::with_capacity(sync_response.rooms.join.len());
let mut updated_rooms = Vec::with_capacity(sync_response.rooms.joined.len());
for (room_id, mut room_data) in sliding_sync_response.rooms.into_iter() {
// `sync_response` contains the rooms with decrypted events if any, so look at
// the timeline events here first if the room exists.
// Otherwise, let's look at the timeline inside the `sliding_sync_response`.
let timeline =
if let Some(joined_room) = sync_response.rooms.join.remove(&room_id) {
if let Some(joined_room) = sync_response.rooms.joined.remove(&room_id) {
joined_room.timeline.events
} else {
room_data.timeline.drain(..).map(TimelineEvent::new).collect()
@@ -363,7 +363,7 @@ impl SlidingSync {
// Since we've removed rooms that were in the room subsection from
// `sync_response.rooms.join`, the remaining ones aren't already present in
// `updated_rooms` and wouldn't cause any duplicates.
updated_rooms.extend(sync_response.rooms.join.keys().cloned());
updated_rooms.extend(sync_response.rooms.joined.keys().cloned());
updated_rooms
};

View File

@@ -178,7 +178,7 @@ impl Client {
// Ignore errors when there are no receivers.
let _ = self.inner.room_updates_sender.send(rooms.clone());
for (room_id, room_info) in &rooms.join {
for (room_id, room_info) in &rooms.joined {
let Some(room) = self.get_room(room_id) else {
error!(?room_id, "Can't call event handler, room not found");
continue;
@@ -207,7 +207,7 @@ impl Client {
self.handle_sync_events(HandlerKind::EphemeralRoomData, room, ephemeral).await?;
}
for (room_id, room_info) in &rooms.leave {
for (room_id, room_info) in &rooms.left {
let Some(room) = self.get_room(room_id) else {
error!(?room_id, "Can't call event handler, room not found");
continue;
@@ -226,7 +226,7 @@ impl Client {
self.handle_sync_timeline_events(room, &timeline.events).await?;
}
for (room_id, room_info) in &rooms.invite {
for (room_id, room_info) in &rooms.invited {
let Some(room) = self.get_room(room_id) else {
error!(?room_id, "Can't call event handler, room not found");
continue;

View File

@@ -349,13 +349,13 @@ async fn test_subscribe_all_room_updates() {
client.sync_once(sync_settings).await.unwrap();
let room_updates = rx.recv().now_or_never().unwrap().unwrap();
assert_let!(RoomUpdates { leave, join, invite, knocked } = room_updates);
assert_let!(RoomUpdates { left, joined, invited, knocked } = room_updates);
// Check the left room updates.
{
assert_eq!(leave.len(), 1);
assert_eq!(left.len(), 1);
let (room_id, update) = leave.iter().next().unwrap();
let (room_id, update) = left.iter().next().unwrap();
assert_eq!(room_id, *MIXED_LEFT_ROOM_ID);
assert!(update.state.is_empty());
@@ -365,9 +365,9 @@ async fn test_subscribe_all_room_updates() {
// Check the joined room updates.
{
assert_eq!(join.len(), 1);
assert_eq!(joined.len(), 1);
let (room_id, update) = join.iter().next().unwrap();
let (room_id, update) = joined.iter().next().unwrap();
assert_eq!(room_id, *MIXED_JOINED_ROOM_ID);
@@ -385,9 +385,9 @@ async fn test_subscribe_all_room_updates() {
// Check the invited room updates.
{
assert_eq!(invite.len(), 1);
assert_eq!(invited.len(), 1);
let (room_id, update) = invite.iter().next().unwrap();
let (room_id, update) = invited.iter().next().unwrap();
assert_eq!(room_id, *MIXED_INVITED_ROOM_ID);
assert_eq!(update.invite_state.events.len(), 2);
@@ -938,7 +938,7 @@ async fn test_test_ambiguity_changes() {
let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
let changes = &response.rooms.join.get(*DEFAULT_TEST_ROOM_ID).unwrap().ambiguity_changes;
let changes = &response.rooms.joined.get(*DEFAULT_TEST_ROOM_ID).unwrap().ambiguity_changes;
// A new member always triggers an ambiguity change.
let example_change = changes.get(event_id!("$151800140517rfvjc:localhost")).unwrap();
@@ -1009,7 +1009,7 @@ async fn test_test_ambiguity_changes() {
let response = client.sync_once(SyncSettings::default()).await.unwrap();
server.reset().await;
let changes = &response.rooms.join.get(*DEFAULT_TEST_ROOM_ID).unwrap().ambiguity_changes;
let changes = &response.rooms.joined.get(*DEFAULT_TEST_ROOM_ID).unwrap().ambiguity_changes;
// First joined member made both members ambiguous.
let example_2_change = changes.get(example_2_rename_1_event_id).unwrap();
@@ -1068,7 +1068,7 @@ async fn test_test_ambiguity_changes() {
let response = client.sync_once(SyncSettings::default()).await.unwrap();
server.reset().await;
let changes = &response.rooms.join.get(*DEFAULT_TEST_ROOM_ID).unwrap().ambiguity_changes;
let changes = &response.rooms.joined.get(*DEFAULT_TEST_ROOM_ID).unwrap().ambiguity_changes;
// example 2 is not ambiguous anymore.
let example_2_change = changes.get(example_2_rename_2_event_id).unwrap();
@@ -1114,7 +1114,7 @@ async fn test_test_ambiguity_changes() {
let response = client.sync_once(SyncSettings::default()).await.unwrap();
server.reset().await;
let changes = &response.rooms.join.get(*DEFAULT_TEST_ROOM_ID).unwrap().ambiguity_changes;
let changes = &response.rooms.joined.get(*DEFAULT_TEST_ROOM_ID).unwrap().ambiguity_changes;
// example 3 is now ambiguous with example 2, not example.
let example_3_change = changes.get(example_3_rename_event_id).unwrap();
@@ -1160,7 +1160,7 @@ async fn test_test_ambiguity_changes() {
let response = client.sync_once(SyncSettings::default()).await.unwrap();
server.reset().await;
let changes = &response.rooms.join.get(*DEFAULT_TEST_ROOM_ID).unwrap().ambiguity_changes;
let changes = &response.rooms.joined.get(*DEFAULT_TEST_ROOM_ID).unwrap().ambiguity_changes;
// name change, even if still not ambiguous, triggers ambiguity change.
let example_change = changes.get(example_rename_event_id).unwrap();
@@ -1207,7 +1207,7 @@ async fn test_test_ambiguity_changes() {
server.reset().await;
// Avatar change does not trigger ambiguity change.
assert!(response.rooms.join.get(*DEFAULT_TEST_ROOM_ID).unwrap().ambiguity_changes.is_empty());
assert!(response.rooms.joined.get(*DEFAULT_TEST_ROOM_ID).unwrap().ambiguity_changes.is_empty());
let changes = assert_next_matches!(updates, Ok(RoomUpdate::Joined { updates, .. }) => updates.ambiguity_changes);
assert!(changes.is_empty());

View File

@@ -61,7 +61,7 @@ async fn test_notification() -> Result<()> {
let bob_invite_response = bob.sync_once(Default::default()).await?;
let sync_token = bob_invite_response.next_batch;
let mut invited_rooms = bob_invite_response.rooms.invite;
let mut invited_rooms = bob_invite_response.rooms.invited;
assert_eq!(invited_rooms.len(), 1, "must be only one invitation");
@@ -145,7 +145,7 @@ async fn test_notification() -> Result<()> {
// In this sync, Bob receives the message from Alice.
let bob_response = bob.sync_once(SyncSettings::default().token(sync_token)).await?;
let mut joined_rooms = bob_response.rooms.join.into_iter();
let mut joined_rooms = bob_response.rooms.joined.into_iter();
let (_, bob_room) = joined_rooms.next().expect("must have joined one room");
assert!(joined_rooms.next().is_none(), "no more joined rooms: {joined_rooms:#?}");