diff --git a/.github/workflows/detect-long-path.yml b/.github/workflows/detect-long-path.yml index 86cea3dbe..1542fed24 100644 --- a/.github/workflows/detect-long-path.yml +++ b/.github/workflows/detect-long-path.yml @@ -25,7 +25,7 @@ jobs: - uses: actions/checkout@v4 - name: Check for changed files id: changed-files - uses: tj-actions/changed-files@6f67ee9ac810f0192ea7b3d2086406f97847bcf9 # v45 + uses: tj-actions/changed-files@9b4bb2bedb217d3ede225b6b07ebde713177cd8f # v45 - name: Detect long path env: ALL_CHANGED_FILES: ${{ steps.changed-files.outputs.all_changed_files }} # ignore the deleted files diff --git a/bindings/matrix-sdk-ffi/CHANGELOG.md b/bindings/matrix-sdk-ffi/CHANGELOG.md index be05e05e6..ba73c927e 100644 --- a/bindings/matrix-sdk-ffi/CHANGELOG.md +++ b/bindings/matrix-sdk-ffi/CHANGELOG.md @@ -6,6 +6,10 @@ All notable changes to this project will be documented in this file. ## [Unreleased] - ReleaseDate +Additions: + +- Add room topic string to `StateEventContent` + ## [0.11.0] - 2025-04-11 Breaking changes: @@ -20,7 +24,7 @@ Breaking changes: programs can set it to `true`. - Matrix client API errors coming from API responses will now be mapped to `ClientError::MatrixApi`, containing both the - original message and the associated error code and kind. + original message and the associated error code and kind. - `EventSendState` now has two additional variants: `CrossSigningNotSetup` and `SendingFromUnverifiedDevice`. These indicate that your own device is not diff --git a/bindings/matrix-sdk-ffi/src/event.rs b/bindings/matrix-sdk-ffi/src/event.rs index 188c75e9c..6eb2c4c21 100644 --- a/bindings/matrix-sdk-ffi/src/event.rs +++ b/bindings/matrix-sdk-ffi/src/event.rs @@ -83,7 +83,7 @@ pub enum StateEventContent { RoomServerAcl, RoomThirdPartyInvite, RoomTombstone, - RoomTopic, + RoomTopic { topic: String }, SpaceChild, SpaceParent, } @@ -118,7 +118,11 @@ impl TryFrom for StateEventContent { AnySyncStateEvent::RoomServerAcl(_) => StateEventContent::RoomServerAcl, AnySyncStateEvent::RoomThirdPartyInvite(_) => StateEventContent::RoomThirdPartyInvite, AnySyncStateEvent::RoomTombstone(_) => StateEventContent::RoomTombstone, - AnySyncStateEvent::RoomTopic(_) => StateEventContent::RoomTopic, + AnySyncStateEvent::RoomTopic(content) => { + let content = get_state_event_original_content(content)?; + + StateEventContent::RoomTopic { topic: content.topic } + } AnySyncStateEvent::SpaceChild(_) => StateEventContent::SpaceChild, AnySyncStateEvent::SpaceParent(_) => StateEventContent::SpaceParent, _ => bail!("Unsupported state event"), diff --git a/crates/matrix-sdk-base/src/client.rs b/crates/matrix-sdk-base/src/client.rs index 2b4022a7b..4e96871af 100644 --- a/crates/matrix-sdk-base/src/client.rs +++ b/crates/matrix-sdk-base/src/client.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use std::{ collections::{BTreeMap, BTreeSet, HashMap}, - fmt, iter, + fmt, ops::Deref, }; @@ -38,35 +38,34 @@ use ruma::{ events::{ push_rules::{PushRulesEvent, PushRulesEventContent}, room::member::SyncRoomMemberEvent, - AnyStrippedStateEvent, AnySyncEphemeralRoomEvent, StateEvent, StateEventType, + StateEvent, StateEventType, }, - push::{Action, Ruleset}, - serde::Raw, + push::Ruleset, time::Instant, OwnedRoomId, OwnedUserId, RoomId, }; use tokio::sync::{broadcast, Mutex}; #[cfg(feature = "e2e-encryption")] use tokio::sync::{RwLock, RwLockReadGuard}; -use tracing::{debug, info, instrument}; +use tracing::{debug, enabled, info, instrument, Level}; #[cfg(feature = "e2e-encryption")] use crate::RoomMemberships; use crate::{ - deserialized_responses::{DisplayName, RawAnySyncOrStrippedTimelineEvent}, + deserialized_responses::DisplayName, error::{Error, Result}, event_cache::store::EventCacheStoreLock, response_processors::{self as processors, Context}, rooms::{ normal::{RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, RoomMembersUpdate}, - Room, RoomInfo, RoomState, + Room, RoomState, }, store::{ ambiguity_map::AmbiguityCache, BaseStateStore, DynStateStore, MemoryStore, Result as StoreResult, RoomLoadSettings, StateChanges, StateStoreDataKey, StateStoreDataValue, StateStoreExt, StoreConfig, }, - sync::{JoinedRoomUpdate, LeftRoomUpdate, Notification, RoomUpdates, SyncResponse}, + sync::{RoomUpdates, SyncResponse}, RoomStateFilter, SessionMeta, }; @@ -367,65 +366,6 @@ impl BaseClient { self.state_store.sync_token.read().await.clone() } - /// Handles the stripped state events in `invite_state`, modifying the - /// room's info and posting notifications as needed. - /// - /// * `room` - The [`Room`] to modify. - /// * `events` - The contents of `invite_state` in the form of list of pairs - /// of raw stripped state events with their deserialized counterpart. - /// * `push_rules` - The push rules for this room. - /// * `room_info` - The current room's info. - /// * `changes` - The accumulated list of changes to apply once the - /// processing is finished. - /// * `notifications` - Notifications to post for the current room. - #[instrument(skip_all, fields(room_id = ?room_info.room_id))] - pub(crate) async fn handle_invited_state( - &self, - context: &mut Context, - room: &Room, - events: (Vec>, Vec), - push_rules: &Ruleset, - room_info: &mut RoomInfo, - notifications: &mut BTreeMap>, - ) -> Result<()> { - let mut state_events = BTreeMap::new(); - - for (raw_event, event) in iter::zip(events.0, events.1) { - room_info.handle_stripped_state_event(&event); - state_events - .entry(event.event_type()) - .or_insert_with(BTreeMap::new) - .insert(event.state_key().to_owned(), raw_event); - } - - context - .state_changes - .stripped_state - .insert(room_info.room_id().to_owned(), state_events.clone()); - - // We need to check for notifications after we have handled all state - // events, to make sure we have the full push context. - if let Some(push_context) = - processors::timeline::get_push_room_context(context, room, room_info, &self.state_store) - .await? - { - // Check every event again for notification. - for event in state_events.values().flat_map(|map| map.values()) { - let actions = push_rules.get_actions(event, &push_context); - if actions.iter().any(Action::should_notify) { - notifications.entry(room.room_id().to_owned()).or_default().push( - Notification { - actions: actions.to_owned(), - event: RawAnySyncOrStrippedTimelineEvent::Stripped(event.clone()), - }, - ); - } - } - } - - Ok(()) - } - /// User has knocked on a room. /// /// Update the internal and cached state accordingly. Return the final Room. @@ -546,13 +486,12 @@ impl BaseClient { return Ok(SyncResponse::default()); } - let now = Instant::now(); + let now = if enabled!(Level::INFO) { Some(Instant::now()) } else { None }; #[cfg(feature = "e2e-encryption")] let olm_machine = self.olm_machine().await; - let mut context = - Context::new(StateChanges::new(response.next_batch.clone()), Default::default()); + let mut context = Context::new(StateChanges::new(response.next_batch.clone())); #[cfg(feature = "e2e-encryption")] let to_device = { @@ -573,9 +512,11 @@ impl BaseClient { .flatten() .filter_map(|room_key_info| self.get_room(&room_key_info.room_id)) .collect(), - olm_machine.as_ref(), - self.decryption_trust_requirement, - self.handle_verification_events, + processors::e2ee::E2EE::new( + olm_machine.as_ref(), + self.decryption_trust_requirement, + self.handle_verification_events, + ), ) .await?; @@ -598,81 +539,24 @@ impl BaseClient { let mut updated_members_in_room: BTreeMap> = BTreeMap::new(); - for (room_id, new_info) in response.rooms.join { - let room = self.state_store.get_or_create_room( - &room_id, - RoomState::Joined, - self.room_info_notable_update_sender.clone(), - ); - - let mut room_info = room.clone_info(); - - room_info.mark_as_joined(); - room_info.update_from_ruma_summary(&new_info.summary); - room_info.set_prev_batch(new_info.timeline.prev_batch.as_deref()); - room_info.mark_state_fully_synced(); - room_info.handle_encryption_state(requested_required_states.for_room(&room_id)); - - let (raw_state_events, state_events) = - processors::state_events::sync::collect(&mut context, &new_info.state.events); - - let mut new_user_ids = processors::state_events::dispatch_and_get_new_users( + for (room_id, joined_room) in response.rooms.join { + let joined_room_update = processors::room::sync_v2::update_joined_room( &mut context, - (&raw_state_events, &state_events), - &mut room_info, - &mut ambiguity_cache, - ) - .await?; - - for raw in &new_info.ephemeral.events { - match raw.deserialize() { - Ok(AnySyncEphemeralRoomEvent::Receipt(event)) => { - context.state_changes.add_receipts(&room_id, event.content); - } - Ok(_) => {} - Err(e) => { - let event_id: Option = raw.get_field("event_id").ok().flatten(); - #[rustfmt::skip] - info!( - ?room_id, event_id, - "Failed to deserialize ephemeral room event: {e}" - ); - } - } - } - - if new_info.timeline.limited { - room_info.mark_members_missing(); - } - - let (raw_state_events_from_timeline, state_events_from_timeline) = - processors::state_events::sync::collect_from_timeline( - &mut context, - &new_info.timeline.events, - ); - - let mut other_new_user_ids = processors::state_events::dispatch_and_get_new_users( - &mut context, - (&raw_state_events_from_timeline, &state_events_from_timeline), - &mut room_info, - &mut ambiguity_cache, - ) - .await?; - new_user_ids.append(&mut other_new_user_ids); - updated_members_in_room.insert(room_id.to_owned(), new_user_ids.clone()); - - let timeline = processors::timeline::build( - &mut context, - &room, - &mut room_info, - processors::timeline::builder::Timeline::from(new_info.timeline), - processors::timeline::builder::Notification::new( + processors::room::RoomCreationData::new( + &room_id, + self.room_info_notable_update_sender.clone(), + requested_required_states, + &mut ambiguity_cache, + ), + joined_room, + &mut updated_members_in_room, + processors::notification::Notification::new( &push_rules, &mut notifications, &self.state_store, ), #[cfg(feature = "e2e-encryption")] - processors::timeline::builder::E2EE::new( + processors::e2ee::E2EE::new( olm_machine.as_ref(), self.decryption_trust_requirement, self.handle_verification_events, @@ -680,106 +564,26 @@ impl BaseClient { ) .await?; - // Save the new `RoomInfo`. - context.state_changes.add_room(room_info); - - processors::account_data::for_room( - &mut context, - &room_id, - &new_info.account_data.events, - &self.state_store, - ) - .await; - - // `Self::handle_room_account_data` might have updated the `RoomInfo`. Let's - // fetch it again. - // - // SAFETY: `unwrap` is safe because the `RoomInfo` has been inserted 2 lines - // above. - let mut room_info = context.state_changes.room_infos.get(&room_id).unwrap().clone(); - - #[cfg(feature = "e2e-encryption")] - processors::e2ee::tracked_users::update_or_set_if_room_is_newly_encrypted( - &mut context, - olm_machine.as_ref(), - &new_user_ids, - room_info.encryption_state(), - room.encryption_state(), - &room_id, - &self.state_store, - ) - .await?; - - let notification_count = new_info.unread_notifications.into(); - room_info.update_notification_count(notification_count); - - let ambiguity_changes = ambiguity_cache.changes.remove(&room_id).unwrap_or_default(); - - new_rooms.join.insert( - room_id, - JoinedRoomUpdate::new( - timeline, - new_info.state.events, - new_info.account_data.events, - new_info.ephemeral.events, - notification_count, - ambiguity_changes, - ), - ); - - context.state_changes.add_room(room_info); + new_rooms.joined.insert(room_id, joined_room_update); } - for (room_id, new_info) in response.rooms.leave { - let room = self.state_store.get_or_create_room( - &room_id, - RoomState::Left, - self.room_info_notable_update_sender.clone(), - ); - - let mut room_info = room.clone_info(); - room_info.mark_as_left(); - room_info.mark_state_partially_synced(); - room_info.handle_encryption_state(requested_required_states.for_room(&room_id)); - - let (raw_state_events, state_events) = - processors::state_events::sync::collect(&mut context, &new_info.state.events); - - let mut new_user_ids = processors::state_events::dispatch_and_get_new_users( + for (room_id, left_room) in response.rooms.leave { + let left_room_update = processors::room::sync_v2::update_left_room( &mut context, - (&raw_state_events, &state_events), - &mut room_info, - &mut ambiguity_cache, - ) - .await?; - - let (raw_state_events_from_timeline, state_events_from_timeline) = - processors::state_events::sync::collect_from_timeline( - &mut context, - &new_info.timeline.events, - ); - - let mut other_new_user_ids = processors::state_events::dispatch_and_get_new_users( - &mut context, - (&raw_state_events_from_timeline, &state_events_from_timeline), - &mut room_info, - &mut ambiguity_cache, - ) - .await?; - new_user_ids.append(&mut other_new_user_ids); - - let timeline = processors::timeline::build( - &mut context, - &room, - &mut room_info, - processors::timeline::builder::Timeline::from(new_info.timeline), - processors::timeline::builder::Notification::new( + processors::room::RoomCreationData::new( + &room_id, + self.room_info_notable_update_sender.clone(), + requested_required_states, + &mut ambiguity_cache, + ), + left_room, + processors::notification::Notification::new( &push_rules, &mut notifications, &self.state_store, ), #[cfg(feature = "e2e-encryption")] - processors::timeline::builder::E2EE::new( + processors::e2ee::E2EE::new( olm_machine.as_ref(), self.decryption_trust_requirement, self.handle_verification_events, @@ -787,90 +591,41 @@ impl BaseClient { ) .await?; - // Save the new `RoomInfo`. - context.state_changes.add_room(room_info); + new_rooms.left.insert(room_id, left_room_update); + } - processors::account_data::for_room( + for (room_id, invited_room) in response.rooms.invite { + let invited_room_update = processors::room::sync_v2::update_invited_room( &mut context, &room_id, - &new_info.account_data.events, - &self.state_store, - ) - .await; - - let ambiguity_changes = ambiguity_cache.changes.remove(&room_id).unwrap_or_default(); - - new_rooms.leave.insert( - room_id, - LeftRoomUpdate::new( - timeline, - new_info.state.events, - new_info.account_data.events, - ambiguity_changes, + invited_room, + self.room_info_notable_update_sender.clone(), + processors::notification::Notification::new( + &push_rules, + &mut notifications, + &self.state_store, ), - ); - } - - for (room_id, new_info) in response.rooms.invite { - let room = self.state_store.get_or_create_room( - &room_id, - RoomState::Invited, - self.room_info_notable_update_sender.clone(), - ); - - let invite_state = processors::state_events::stripped::collect( - &mut context, - &new_info.invite_state.events, - ); - - let mut room_info = room.clone_info(); - room_info.mark_as_invited(); - room_info.mark_state_fully_synced(); - - self.handle_invited_state( - &mut context, - &room, - invite_state, - &push_rules, - &mut room_info, - &mut notifications, ) .await?; - context.state_changes.add_room(room_info); - - new_rooms.invite.insert(room_id, new_info); + new_rooms.invited.insert(room_id, invited_room_update); } - for (room_id, new_info) in response.rooms.knock { - let room = self.state_store.get_or_create_room( + for (room_id, knocked_room) in response.rooms.knock { + let knocked_room_update = processors::room::sync_v2::update_knocked_room( + &mut context, &room_id, - RoomState::Knocked, + knocked_room, self.room_info_notable_update_sender.clone(), - ); - - let knock_state = processors::state_events::stripped::collect( - &mut context, - &new_info.knock_state.events, - ); - - let mut room_info = room.clone_info(); - room_info.mark_as_knocked(); - room_info.mark_state_fully_synced(); - - self.handle_invited_state( - &mut context, - &room, - knock_state, - &push_rules, - &mut room_info, - &mut notifications, + processors::notification::Notification::new( + &push_rules, + &mut notifications, + &self.state_store, + ), ) .await?; - context.state_changes.add_room(room_info); - - new_rooms.knocked.insert(room_id, new_info); + new_rooms.knocked.insert(room_id, knocked_room_update); } global_account_data_processor.apply(&mut context, &self.state_store).await; @@ -913,7 +668,9 @@ impl BaseClient { } } - info!("Processed a sync response in {:?}", now.elapsed()); + if enabled!(Level::INFO) { + info!("Processed a sync response in {:?}", now.map(|now| now.elapsed())); + } let response = SyncResponse { rooms: new_rooms, @@ -958,7 +715,7 @@ impl BaseClient { }; let mut chunk = Vec::with_capacity(response.chunk.len()); - let mut context = Context::new(StateChanges::default(), Default::default()); + let mut context = Context::default(); #[cfg(feature = "e2e-encryption")] let mut user_ids = BTreeSet::new(); diff --git a/crates/matrix-sdk-base/src/deserialized_responses.rs b/crates/matrix-sdk-base/src/deserialized_responses.rs index cc81e33cc..022cbe4fc 100644 --- a/crates/matrix-sdk-base/src/deserialized_responses.rs +++ b/crates/matrix-sdk-base/src/deserialized_responses.rs @@ -263,6 +263,18 @@ pub enum RawAnySyncOrStrippedTimelineEvent { Stripped(Raw), } +impl From> for RawAnySyncOrStrippedTimelineEvent { + fn from(event: Raw) -> Self { + Self::Sync(event) + } +} + +impl From> for RawAnySyncOrStrippedTimelineEvent { + fn from(event: Raw) -> Self { + Self::Stripped(event) + } +} + /// Wrapper around both versions of any raw state event. #[derive(Clone, Debug, Serialize)] #[serde(untagged)] diff --git a/crates/matrix-sdk-base/src/response_processors/e2ee/decrypt.rs b/crates/matrix-sdk-base/src/response_processors/e2ee/decrypt.rs index 58c5e0a67..c59bf5de5 100644 --- a/crates/matrix-sdk-base/src/response_processors/e2ee/decrypt.rs +++ b/crates/matrix-sdk-base/src/response_processors/e2ee/decrypt.rs @@ -13,12 +13,13 @@ // limitations under the License. use matrix_sdk_common::deserialized_responses::TimelineEvent; -use matrix_sdk_crypto::{ - DecryptionSettings, OlmMachine, RoomEventDecryptionResult, TrustRequirement, -}; +use matrix_sdk_crypto::{DecryptionSettings, RoomEventDecryptionResult}; use ruma::{events::AnySyncTimelineEvent, serde::Raw, RoomId}; -use super::super::{verification, Context}; +use super::{ + super::{verification, Context}, + E2EE, +}; use crate::Result; /// Attempt to decrypt the given raw event into a [`TimelineEvent`]. @@ -30,16 +31,14 @@ use crate::Result; /// Returns `Ok(None)` if encryption is not configured. pub async fn sync_timeline_event( context: &mut Context, - olm_machine: Option<&OlmMachine>, + e2ee: E2EE<'_>, event: &Raw, room_id: &RoomId, - decryption_trust_requirement: TrustRequirement, - verification_is_allowed: bool, ) -> Result> { - let Some(olm) = olm_machine else { return Ok(None) }; + let Some(olm) = e2ee.olm_machine else { return Ok(None) }; let decryption_settings = - DecryptionSettings { sender_device_trust_requirement: decryption_trust_requirement }; + DecryptionSettings { sender_device_trust_requirement: e2ee.decryption_trust_requirement }; Ok(Some( match olm.try_decrypt_room_event(event.cast_ref(), room_id, &decryption_settings).await? { @@ -47,14 +46,8 @@ pub async fn sync_timeline_event( let timeline_event = TimelineEvent::from(decrypted); if let Ok(sync_timeline_event) = timeline_event.raw().deserialize() { - verification::process_if_relevant( - context, - &sync_timeline_event, - verification_is_allowed, - olm_machine, - room_id, - ) - .await?; + verification::process_if_relevant(context, &sync_timeline_event, e2ee, room_id) + .await?; } timeline_event diff --git a/crates/matrix-sdk-base/src/response_processors/e2ee/mod.rs b/crates/matrix-sdk-base/src/response_processors/e2ee/mod.rs index cfd0bd336..1ffbe608d 100644 --- a/crates/matrix-sdk-base/src/response_processors/e2ee/mod.rs +++ b/crates/matrix-sdk-base/src/response_processors/e2ee/mod.rs @@ -12,6 +12,26 @@ // See the License for the specific language governing permissions and // limitations under the License. +use matrix_sdk_crypto::{OlmMachine, TrustRequirement}; + pub mod decrypt; pub mod to_device; pub mod tracked_users; + +/// A classical set of data used by some processors in this module. +#[derive(Clone)] +pub struct E2EE<'a> { + pub olm_machine: Option<&'a OlmMachine>, + pub decryption_trust_requirement: TrustRequirement, + pub verification_is_allowed: bool, +} + +impl<'a> E2EE<'a> { + pub fn new( + olm_machine: Option<&'a OlmMachine>, + decryption_trust_requirement: TrustRequirement, + verification_is_allowed: bool, + ) -> Self { + Self { olm_machine, decryption_trust_requirement, verification_is_allowed } + } +} diff --git a/crates/matrix-sdk-base/src/response_processors/ephemeral_events.rs b/crates/matrix-sdk-base/src/response_processors/ephemeral_events.rs new file mode 100644 index 000000000..427a76601 --- /dev/null +++ b/crates/matrix-sdk-base/src/response_processors/ephemeral_events.rs @@ -0,0 +1,50 @@ +// Copyright 2025 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use ruma::{events::AnySyncEphemeralRoomEvent, serde::Raw, RoomId}; +use tracing::info; + +use super::Context; + +/// Dispatch [`AnySyncEphemeralRoomEvent`]s on the [`Context`]. +pub fn dispatch( + context: &mut Context, + raw_events: &[Raw], + room_id: &RoomId, +) { + for raw_event in raw_events { + dispatch_receipt(context, raw_event, room_id); + } +} + +/// Dispatch the [`AnySyncEphemeralRoomEvent::Receipt`] on the [`Context`]. +pub(super) fn dispatch_receipt( + context: &mut Context, + raw_event: &Raw, + room_id: &RoomId, +) { + match raw_event.deserialize() { + Ok(AnySyncEphemeralRoomEvent::Receipt(event)) => { + context.state_changes.add_receipts(room_id, event.content); + } + + Ok(_) => {} + + Err(e) => { + let event_id = raw_event.get_field::("event_id").ok().flatten(); + + info!(?room_id, event_id, "Failed to deserialize ephemeral room event: {e}"); + } + } +} diff --git a/crates/matrix-sdk-base/src/response_processors/latest_event.rs b/crates/matrix-sdk-base/src/response_processors/latest_event.rs index f6ad4603e..16d253210 100644 --- a/crates/matrix-sdk-base/src/response_processors/latest_event.rs +++ b/crates/matrix-sdk-base/src/response_processors/latest_event.rs @@ -13,12 +13,10 @@ // limitations under the License. use matrix_sdk_common::deserialized_responses::TimelineEvent; -use matrix_sdk_crypto::{ - DecryptionSettings, OlmMachine, RoomEventDecryptionResult, TrustRequirement, -}; +use matrix_sdk_crypto::{DecryptionSettings, RoomEventDecryptionResult}; use ruma::{events::AnySyncTimelineEvent, serde::Raw, RoomId}; -use super::{verification, Context}; +use super::{e2ee::E2EE, verification, Context}; use crate::{ latest_event::{is_suitable_for_latest_event, LatestEvent, PossibleLatestEvent}, Result, Room, @@ -33,27 +31,19 @@ use crate::{ pub async fn decrypt_from_rooms( context: &mut Context, rooms: Vec, - olm_machine: Option<&OlmMachine>, - decryption_trust_requirement: TrustRequirement, - verification_is_allowed: bool, + e2ee: E2EE<'_>, ) -> Result<()> { - let Some(olm_machine) = olm_machine else { + // All functions used by this one expect an `OlmMachine`. Return if there is + // none. + if e2ee.olm_machine.is_none() { return Ok(()); - }; + } for room in rooms { // Try to find a message we can decrypt and is suitable for using as the latest // event. If we found one, set it as the latest and delete any older // encrypted events - if let Some((found, found_index)) = find_suitable_and_decrypt( - context, - olm_machine, - &room, - &decryption_trust_requirement, - verification_is_allowed, - ) - .await - { + if let Some((found, found_index)) = find_suitable_and_decrypt(context, &room, &e2ee).await { room.on_latest_event_decrypted( found, found_index, @@ -68,10 +58,8 @@ pub async fn decrypt_from_rooms( async fn find_suitable_and_decrypt( context: &mut Context, - olm_machine: &OlmMachine, room: &Room, - decryption_trust_requirement: &TrustRequirement, - verification_is_allowed: bool, + e2ee: &E2EE<'_>, ) -> Option<(Box, usize)> { let enc_events = room.latest_encrypted_events(); let power_levels = room.power_levels().await.ok(); @@ -82,14 +70,8 @@ async fn find_suitable_and_decrypt( // Size of the `decrypt_sync_room_event` future should not impact this // async fn since it is likely that there aren't even any encrypted // events when calling it. - let decrypt_sync_room_event = Box::pin(decrypt_sync_room_event( - context, - olm_machine, - event, - room.room_id(), - decryption_trust_requirement, - verification_is_allowed, - )); + let decrypt_sync_room_event = + Box::pin(decrypt_sync_room_event(context, event, e2ee, room.room_id())); if let Ok(decrypted) = decrypt_sync_room_event.await { // We found an event we can decrypt @@ -119,19 +101,21 @@ async fn find_suitable_and_decrypt( /// representing the decryption error; in the case of problems with our /// application, returns `Err`. /// -/// Returns `Ok(None)` if encryption is not configured. +/// # Panics +/// +/// Panics if there is no [`OlmMachine`] in [`E2EE`]. async fn decrypt_sync_room_event( context: &mut Context, - olm_machine: &OlmMachine, event: &Raw, + e2ee: &E2EE<'_>, room_id: &RoomId, - decryption_trust_requirement: &TrustRequirement, - verification_is_allowed: bool, ) -> Result { let decryption_settings = - DecryptionSettings { sender_device_trust_requirement: *decryption_trust_requirement }; + DecryptionSettings { sender_device_trust_requirement: e2ee.decryption_trust_requirement }; - let event = match olm_machine + let event = match e2ee + .olm_machine + .expect("An `OlmMachine` is expected") .try_decrypt_room_event(event.cast_ref(), room_id, &decryption_settings) .await? { @@ -142,8 +126,7 @@ async fn decrypt_sync_room_event( verification::process_if_relevant( context, &sync_timeline_event, - verification_is_allowed, - Some(olm_machine), + e2ee.clone(), room_id, ) .await?; @@ -167,11 +150,8 @@ mod tests { }; use ruma::{event_id, events::room::member::MembershipState, room_id, user_id}; - use super::{decrypt_from_rooms, Context}; - use crate::{ - rooms::normal::RoomInfoNotableUpdateReasons, test_utils::logged_in_base_client, - StateChanges, - }; + use super::{decrypt_from_rooms, Context, E2EE}; + use crate::{rooms::normal::RoomInfoNotableUpdateReasons, test_utils::logged_in_base_client}; #[async_test] async fn test_when_there_are_no_latest_encrypted_events_decrypting_them_does_nothing() { @@ -203,14 +183,16 @@ mod tests { assert!(room.latest_event().is_none()); // When I tell it to do some decryption - let mut context = Context::new(StateChanges::default(), Default::default()); + let mut context = Context::default(); decrypt_from_rooms( &mut context, vec![room.clone()], - client.olm_machine().await.as_ref(), - client.decryption_trust_requirement, - client.handle_verification_events, + E2EE::new( + client.olm_machine().await.as_ref(), + client.decryption_trust_requirement, + client.handle_verification_events, + ), ) .await .unwrap(); diff --git a/crates/matrix-sdk-base/src/response_processors/mod.rs b/crates/matrix-sdk-base/src/response_processors/mod.rs index 34af1cfeb..ce632df30 100644 --- a/crates/matrix-sdk-base/src/response_processors/mod.rs +++ b/crates/matrix-sdk-base/src/response_processors/mod.rs @@ -16,9 +16,12 @@ pub mod account_data; pub mod changes; #[cfg(feature = "e2e-encryption")] pub mod e2ee; +pub mod ephemeral_events; #[cfg(feature = "e2e-encryption")] pub mod latest_event; +pub mod notification; pub mod profiles; +pub mod room; pub mod state_events; pub mod timeline; #[cfg(feature = "e2e-encryption")] @@ -33,17 +36,15 @@ use crate::{RoomInfoNotableUpdateReasons, StateChanges}; type RoomInfoNotableUpdates = BTreeMap; #[cfg_attr(test, derive(Clone))] +#[derive(Default)] pub(crate) struct Context { pub(super) state_changes: StateChanges, pub(super) room_info_notable_updates: RoomInfoNotableUpdates, } impl Context { - pub fn new( - state_changes: StateChanges, - room_info_notable_updates: RoomInfoNotableUpdates, - ) -> Self { - Self { state_changes, room_info_notable_updates } + pub fn new(state_changes: StateChanges) -> Self { + Self { state_changes, room_info_notable_updates: Default::default() } } pub fn into_parts(self) -> (StateChanges, RoomInfoNotableUpdates) { diff --git a/crates/matrix-sdk-base/src/response_processors/notification.rs b/crates/matrix-sdk-base/src/response_processors/notification.rs new file mode 100644 index 000000000..12c8a0e8e --- /dev/null +++ b/crates/matrix-sdk-base/src/response_processors/notification.rs @@ -0,0 +1,81 @@ +// Copyright 2025 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::BTreeMap; + +use ruma::{ + push::{Action, PushConditionRoomCtx, Ruleset}, + serde::Raw, + OwnedRoomId, RoomId, +}; + +use crate::{ + deserialized_responses::RawAnySyncOrStrippedTimelineEvent, store::BaseStateStore, sync, +}; + +/// A classical set of data used by some processors dealing with notifications +/// and push rules. +pub struct Notification<'a> { + pub push_rules: &'a Ruleset, + pub notifications: &'a mut BTreeMap>, + pub state_store: &'a BaseStateStore, +} + +impl<'a> Notification<'a> { + pub fn new( + push_rules: &'a Ruleset, + notifications: &'a mut BTreeMap>, + state_store: &'a BaseStateStore, + ) -> Self { + Self { push_rules, notifications, state_store } + } + + fn push_notification( + &mut self, + room_id: &RoomId, + actions: Vec, + event: RawAnySyncOrStrippedTimelineEvent, + ) { + self.notifications + .entry(room_id.to_owned()) + .or_default() + .push(sync::Notification { actions, event }); + } + + /// Push a new [`sync::Notification`] in [`Self::notifications`] from + /// `event` if and only if `predicate` returns `true` for at least one of + /// the [`Action`]s associated to this event and this `push_context` + /// (based on `Self::push_rules`). + /// + /// This method returns the fetched [`Action`]s. + pub fn push_notification_from_event_if( + &mut self, + room_id: &RoomId, + push_context: &PushConditionRoomCtx, + event: &Raw, + predicate: P, + ) -> &[Action] + where + Raw: Into, + P: Fn(&Action) -> bool, + { + let actions = self.push_rules.get_actions(event, push_context); + + if actions.iter().any(predicate) { + self.push_notification(room_id, actions.to_owned(), event.clone().into()); + } + + actions + } +} diff --git a/crates/matrix-sdk-base/src/response_processors/room/mod.rs b/crates/matrix-sdk-base/src/response_processors/room/mod.rs new file mode 100644 index 000000000..a3cbd914f --- /dev/null +++ b/crates/matrix-sdk-base/src/response_processors/room/mod.rs @@ -0,0 +1,45 @@ +// Copyright 2025 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use ruma::RoomId; +use tokio::sync::broadcast::Sender; + +use crate::{store::ambiguity_map::AmbiguityCache, RequestedRequiredStates, RoomInfoNotableUpdate}; + +pub mod msc4186; +pub mod sync_v2; + +/// A classical set of data used by some processors in this module. +pub struct RoomCreationData<'a> { + room_id: &'a RoomId, + room_info_notable_update_sender: Sender, + requested_required_states: &'a RequestedRequiredStates, + ambiguity_cache: &'a mut AmbiguityCache, +} + +impl<'a> RoomCreationData<'a> { + pub fn new( + room_id: &'a RoomId, + room_info_notable_update_sender: Sender, + requested_required_states: &'a RequestedRequiredStates, + ambiguity_cache: &'a mut AmbiguityCache, + ) -> Self { + Self { + room_id, + room_info_notable_update_sender, + requested_required_states, + ambiguity_cache, + } + } +} diff --git a/crates/matrix-sdk-base/src/response_processors/room/msc4186/extensions.rs b/crates/matrix-sdk-base/src/response_processors/room/msc4186/extensions.rs new file mode 100644 index 000000000..8d13f959a --- /dev/null +++ b/crates/matrix-sdk-base/src/response_processors/room/msc4186/extensions.rs @@ -0,0 +1,80 @@ +// Copyright 2025 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::BTreeMap; + +use ruma::{api::client::sync::sync_events::v5 as http, OwnedRoomId}; + +use super::super::super::{ + account_data::for_room as account_data_for_room, ephemeral_events::dispatch_receipt, Context, +}; +use crate::{ + store::BaseStateStore, + sync::{JoinedRoomUpdate, RoomUpdates}, + RoomState, +}; + +pub fn dispatch_ephemeral_events( + context: &mut Context, + receipts: &http::response::Receipts, + typing: &http::response::Typing, + joined_room_updates: &mut BTreeMap, +) { + for (room_id, raw) in &receipts.rooms { + dispatch_receipt(context, raw.cast_ref(), room_id); + + joined_room_updates + .entry(room_id.to_owned()) + .or_default() + .ephemeral + .push(raw.clone().cast()); + } + + for (room_id, raw) in &typing.rooms { + joined_room_updates + .entry(room_id.to_owned()) + .or_default() + .ephemeral + .push(raw.clone().cast()); + } +} + +pub async fn room_account_data( + context: &mut Context, + account_data: &http::response::AccountData, + room_updates: &mut RoomUpdates, + state_store: &BaseStateStore, +) { + for (room_id, raw) in &account_data.rooms { + account_data_for_room(context, room_id, raw, state_store).await; + + if let Some(room) = state_store.room(room_id) { + match room.state() { + RoomState::Joined => room_updates + .joined + .entry(room_id.to_owned()) + .or_default() + .account_data + .append(&mut raw.to_vec()), + RoomState::Left | RoomState::Banned => room_updates + .left + .entry(room_id.to_owned()) + .or_default() + .account_data + .append(&mut raw.to_vec()), + RoomState::Invited | RoomState::Knocked => {} + } + } + } +} diff --git a/crates/matrix-sdk-base/src/response_processors/room/msc4186/mod.rs b/crates/matrix-sdk-base/src/response_processors/room/msc4186/mod.rs new file mode 100644 index 000000000..a07f795d0 --- /dev/null +++ b/crates/matrix-sdk-base/src/response_processors/room/msc4186/mod.rs @@ -0,0 +1,559 @@ +// Copyright 2025 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod extensions; + +use std::collections::BTreeMap; + +#[cfg(feature = "e2e-encryption")] +use matrix_sdk_common::deserialized_responses::TimelineEvent; +#[cfg(feature = "e2e-encryption")] +use ruma::events::StateEventType; +use ruma::{ + api::client::sync::sync_events::{ + v3::{InviteState, InvitedRoom, KnockState, KnockedRoom}, + v5 as http, + }, + assign, + events::{ + room::member::{MembershipState, RoomMemberEventContent}, + AnyRoomAccountDataEvent, AnyStrippedStateEvent, AnySyncStateEvent, + }, + serde::Raw, + JsOption, OwnedRoomId, RoomId, UserId, +}; +use tokio::sync::broadcast::Sender; + +#[cfg(feature = "e2e-encryption")] +use super::super::e2ee; +use super::{ + super::{notification, state_events, timeline, Context}, + RoomCreationData, +}; +#[cfg(feature = "e2e-encryption")] +use crate::StateChanges; +use crate::{ + store::BaseStateStore, + sync::{InvitedRoomUpdate, JoinedRoomUpdate, KnockedRoomUpdate, LeftRoomUpdate}, + Result, Room, RoomHero, RoomInfo, RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, + RoomState, +}; + +/// Represent any kind of room updates. +pub enum RoomUpdateKind { + Joined(JoinedRoomUpdate), + Left(LeftRoomUpdate), + Invited(InvitedRoomUpdate), + Knocked(KnockedRoomUpdate), +} + +pub async fn update_any_room( + context: &mut Context, + user_id: &UserId, + room_creation_data: RoomCreationData<'_>, + room_response: &http::response::Room, + rooms_account_data: &BTreeMap>>, + #[cfg(feature = "e2e-encryption")] e2ee: e2ee::E2EE<'_>, + notification: notification::Notification<'_>, +) -> Result> { + let RoomCreationData { + room_id, + room_info_notable_update_sender, + requested_required_states, + ambiguity_cache, + } = room_creation_data; + + // Read state events from the `required_state` field. + // + // Don't read state events from the `timeline` field, because they might be + // incomplete or staled already. We must only read state events from + // `required_state`. + let (raw_state_events, state_events) = + state_events::sync::collect(context, &room_response.required_state); + + let state_store = notification.state_store; + + // Find or create the room in the store + let is_new_room = !state_store.room_exists(room_id); + + let invite_state_events = room_response + .invite_state + .as_ref() + .map(|events| state_events::stripped::collect(context, events)); + + #[allow(unused_mut)] // Required for some feature flag combinations + let (mut room, mut room_info, maybe_room_update_kind) = membership( + context, + &state_events, + &invite_state_events, + state_store, + user_id, + room_id, + room_info_notable_update_sender, + ); + + room_info.mark_state_partially_synced(); + room_info.handle_encryption_state(requested_required_states.for_room(room_id)); + + #[cfg_attr(not(feature = "e2e-encryption"), allow(unused))] + let new_user_ids = state_events::sync::dispatch_and_get_new_users( + context, + (&raw_state_events, &state_events), + &mut room_info, + ambiguity_cache, + ) + .await?; + + // This will be used for both invited and knocked rooms. + if let Some((raw_events, events)) = invite_state_events { + state_events::stripped::dispatch_invite_or_knock( + context, + (&raw_events, &events), + &room, + &mut room_info, + notification::Notification::new( + notification.push_rules, + notification.notifications, + notification.state_store, + ), + ) + .await?; + } + + properties(context, room_id, room_response, &mut room_info, is_new_room); + + let timeline = timeline::build( + context, + &room, + &mut room_info, + timeline::builder::Timeline::from(room_response), + notification, + #[cfg(feature = "e2e-encryption")] + e2ee.clone(), + ) + .await?; + + // Cache the latest decrypted event in room_info, and also keep any later + // encrypted events, so we can slot them in when we get the keys. + #[cfg(feature = "e2e-encryption")] + cache_latest_events( + &room, + &mut room_info, + &timeline.events, + Some(&context.state_changes), + Some(state_store), + ) + .await; + + #[cfg(feature = "e2e-encryption")] + e2ee::tracked_users::update_or_set_if_room_is_newly_encrypted( + context, + e2ee.olm_machine, + &new_user_ids, + room_info.encryption_state(), + room.encryption_state(), + room_id, + state_store, + ) + .await?; + + let notification_count = room_response.unread_notifications.clone().into(); + room_info.update_notification_count(notification_count); + + let ambiguity_changes = ambiguity_cache.changes.remove(room_id).unwrap_or_default(); + let room_account_data = rooms_account_data.get(room_id); + + match (room_info.state(), maybe_room_update_kind) { + (RoomState::Joined, None) => { + // Ephemeral events are added separately, because we might not + // have a room subsection in the response, yet we may have receipts for + // that room. + let ephemeral = Vec::new(); + + Ok(Some(( + room_info, + RoomUpdateKind::Joined(JoinedRoomUpdate::new( + timeline, + raw_state_events, + room_account_data.cloned().unwrap_or_default(), + ephemeral, + notification_count, + ambiguity_changes, + )), + ))) + } + + (RoomState::Left, None) | (RoomState::Banned, None) => Ok(Some(( + room_info, + RoomUpdateKind::Left(LeftRoomUpdate::new( + timeline, + raw_state_events, + room_account_data.cloned().unwrap_or_default(), + ambiguity_changes, + )), + ))), + + (RoomState::Invited, Some(update @ RoomUpdateKind::Invited(_))) + | (RoomState::Knocked, Some(update @ RoomUpdateKind::Knocked(_))) => { + Ok(Some((room_info, update))) + } + + _ => Ok(None), + } +} + +/// Look through the sliding sync data for this room, find/create it in the +/// store, and process any invite information. +/// +/// If there is any invite state events, the room can be considered an invited +/// or knocked room, depending of the membership event (if any). +fn membership( + context: &mut Context, + state_events: &[AnySyncStateEvent], + invite_state_events: &Option<(Vec>, Vec)>, + store: &BaseStateStore, + user_id: &UserId, + room_id: &RoomId, + room_info_notable_update_sender: Sender, +) -> (Room, RoomInfo, Option) { + // There are invite state events. It means the room can be: + // + // 1. either an invited room, + // 2. or a knocked room. + // + // Let's find out. + if let Some(state_events) = invite_state_events { + // We need to find the membership event since it could be for either an invited + // or knocked room. + let membership_event = state_events.1.iter().find_map(|event| { + if let AnyStrippedStateEvent::RoomMember(membership_event) = event { + if membership_event.state_key == user_id { + return Some(membership_event.content.clone()); + } + } + None + }); + + match membership_event { + // There is a membership event indicating it's a knocked room. + Some(RoomMemberEventContent { membership: MembershipState::Knock, .. }) => { + let room = store.get_or_create_room( + room_id, + RoomState::Knocked, + room_info_notable_update_sender, + ); + let mut room_info = room.clone_info(); + // Override the room state if the room already exists. + room_info.mark_as_knocked(); + + let raw_events = state_events.0.clone(); + let knock_state = assign!(KnockState::default(), { events: raw_events }); + let knocked_room = assign!(KnockedRoom::default(), { knock_state: knock_state }); + + (room, room_info, Some(RoomUpdateKind::Knocked(knocked_room))) + } + + // Otherwise, assume it's an invited room because there are invite state events. + _ => { + let room = store.get_or_create_room( + room_id, + RoomState::Invited, + room_info_notable_update_sender, + ); + let mut room_info = room.clone_info(); + // Override the room state if the room already exists. + room_info.mark_as_invited(); + + let raw_events = state_events.0.clone(); + let invited_room = InvitedRoom::from(InviteState::from(raw_events)); + + (room, room_info, Some(RoomUpdateKind::Invited(invited_room))) + } + } + } + // No invite state events. We assume this is a joined room for the moment. See this block to + // learn more. + else { + let room = + store.get_or_create_room(room_id, RoomState::Joined, room_info_notable_update_sender); + let mut room_info = room.clone_info(); + + // We default to considering this room joined if it's not an invite. If it's + // actually left (and we remembered to request membership events in + // our sync request), then we can find this out from the events in + // required_state by calling handle_own_room_membership. + room_info.mark_as_joined(); + + // We don't need to do this in a v2 sync, because the membership of a room can + // be figured out by whether the room is in the "join", "leave" etc. + // property. In sliding sync we only have invite_state, + // required_state and timeline, so we must process required_state and timeline + // looking for relevant membership events. + own_membership(context, user_id, state_events, &mut room_info); + + (room, room_info, None) + } +} + +/// Find any `m.room.member` events that refer to the current user, and update +/// the state in room_info to reflect the "membership" property. +fn own_membership( + context: &mut Context, + user_id: &UserId, + state_events: &[AnySyncStateEvent], + room_info: &mut RoomInfo, +) { + // Start from the last event; the first membership event we see in that order is + // the last in the regular order, so that's the only one we need to + // consider. + for event in state_events.iter().rev() { + if let AnySyncStateEvent::RoomMember(member) = &event { + // If this event updates the current user's membership, record that in the + // room_info. + if member.state_key() == user_id.as_str() { + let new_state: RoomState = member.membership().into(); + + if new_state != room_info.state() { + room_info.set_state(new_state); + // Update an existing notable update entry or create a new one + context + .room_info_notable_updates + .entry(room_info.room_id.to_owned()) + .or_default() + .insert(RoomInfoNotableUpdateReasons::MEMBERSHIP); + } + + break; + } + } + } +} + +fn properties( + context: &mut Context, + room_id: &RoomId, + room_response: &http::response::Room, + room_info: &mut RoomInfo, + is_new_room: bool, +) { + // Handle the room's avatar. + // + // It can be updated via the state events, or via the + // [`http::ResponseRoom::avatar`] field. This part of the code handles the + // latter case. The former case is handled by [`BaseClient::handle_state`]. + match &room_response.avatar { + // A new avatar! + JsOption::Some(avatar_uri) => room_info.update_avatar(Some(avatar_uri.to_owned())), + // Avatar must be removed. + JsOption::Null => room_info.update_avatar(None), + // Nothing to do. + JsOption::Undefined => {} + } + + // Sliding sync doesn't have a room summary, nevertheless it contains the joined + // and invited member counts, in addition to the heroes. + if let Some(count) = room_response.joined_count { + room_info.update_joined_member_count(count.into()); + } + if let Some(count) = room_response.invited_count { + room_info.update_invited_member_count(count.into()); + } + + if let Some(heroes) = &room_response.heroes { + room_info.update_heroes( + heroes + .iter() + .map(|hero| RoomHero { + user_id: hero.user_id.clone(), + display_name: hero.name.clone(), + avatar_url: hero.avatar.clone(), + }) + .collect(), + ); + } + + room_info.set_prev_batch(room_response.prev_batch.as_deref()); + + if room_response.limited { + room_info.mark_members_missing(); + } + + if let Some(recency_stamp) = &room_response.bump_stamp { + let recency_stamp: u64 = (*recency_stamp).into(); + + if room_info.recency_stamp.as_ref() != Some(&recency_stamp) { + room_info.update_recency_stamp(recency_stamp); + + // If it's not a new room, let's emit a `RECENCY_STAMP` update. + // For a new room, the room will appear as new, so we don't care about this + // update. + if !is_new_room { + context + .room_info_notable_updates + .entry(room_id.to_owned()) + .or_default() + .insert(RoomInfoNotableUpdateReasons::RECENCY_STAMP); + } + } + } +} + +/// Find the most recent decrypted event and cache it in the supplied RoomInfo. +/// +/// If any encrypted events are found after that one, store them in the RoomInfo +/// too so we can use them when we get the relevant keys. +/// +/// It is the responsibility of the caller to update the `RoomInfo` instance +/// stored in the `Room`. +#[cfg(feature = "e2e-encryption")] +pub(crate) async fn cache_latest_events( + room: &Room, + room_info: &mut RoomInfo, + events: &[TimelineEvent], + changes: Option<&StateChanges>, + store: Option<&BaseStateStore>, +) { + use tracing::warn; + + use crate::{ + deserialized_responses::DisplayName, + latest_event::{is_suitable_for_latest_event, LatestEvent, PossibleLatestEvent}, + store::ambiguity_map::is_display_name_ambiguous, + }; + + let mut encrypted_events = + Vec::with_capacity(room.latest_encrypted_events.read().unwrap().capacity()); + + // Try to get room power levels from the current changes + let power_levels_from_changes = || { + let state_changes = changes?.state.get(room_info.room_id())?; + let room_power_levels_state = + state_changes.get(&StateEventType::RoomPowerLevels)?.values().next()?; + match room_power_levels_state.deserialize().ok()? { + AnySyncStateEvent::RoomPowerLevels(ev) => Some(ev.power_levels()), + _ => None, + } + }; + + // If we didn't get any info, try getting it from local data + let power_levels = match power_levels_from_changes() { + Some(power_levels) => Some(power_levels), + None => room.power_levels().await.ok(), + }; + + let power_levels_info = Some(room.own_user_id()).zip(power_levels.as_ref()); + + for event in events.iter().rev() { + if let Ok(timeline_event) = event.raw().deserialize() { + match is_suitable_for_latest_event(&timeline_event, power_levels_info) { + PossibleLatestEvent::YesRoomMessage(_) + | PossibleLatestEvent::YesPoll(_) + | PossibleLatestEvent::YesCallInvite(_) + | PossibleLatestEvent::YesCallNotify(_) + | PossibleLatestEvent::YesSticker(_) + | PossibleLatestEvent::YesKnockedStateEvent(_) => { + // We found a suitable latest event. Store it. + + // In order to make the latest event fast to read, we want to keep the + // associated sender in cache. This is a best-effort to gather enough + // information for creating a user profile as fast as possible. If information + // are missing, let's go back on the “slow” path. + + let mut sender_profile = None; + let mut sender_name_is_ambiguous = None; + + // First off, look up the sender's profile from the `StateChanges`, they are + // likely to be the most recent information. + if let Some(changes) = changes { + sender_profile = changes + .profiles + .get(room.room_id()) + .and_then(|profiles_by_user| { + profiles_by_user.get(timeline_event.sender()) + }) + .cloned(); + + if let Some(sender_profile) = sender_profile.as_ref() { + sender_name_is_ambiguous = sender_profile + .as_original() + .and_then(|profile| profile.content.displayname.as_ref()) + .and_then(|display_name| { + let display_name = DisplayName::new(display_name); + + changes.ambiguity_maps.get(room.room_id()).and_then( + |map_for_room| { + map_for_room.get(&display_name).map(|users| { + is_display_name_ambiguous(&display_name, users) + }) + }, + ) + }); + } + } + + // Otherwise, look up the sender's profile from the `Store`. + if sender_profile.is_none() { + if let Some(store) = store { + sender_profile = store + .get_profile(room.room_id(), timeline_event.sender()) + .await + .ok() + .flatten(); + + // TODO: need to update `sender_name_is_ambiguous`, + // but how? + } + } + + let latest_event = Box::new(LatestEvent::new_with_sender_details( + event.clone(), + sender_profile, + sender_name_is_ambiguous, + )); + + // Store it in the return RoomInfo (it will be saved for us in the room later). + room_info.latest_event = Some(latest_event); + // We don't need any of the older encrypted events because we have a new + // decrypted one. + room.latest_encrypted_events.write().unwrap().clear(); + // We can stop looking through the timeline now because everything else is + // older. + break; + } + PossibleLatestEvent::NoEncrypted => { + // m.room.encrypted - this might be the latest event later - we can't tell until + // we are able to decrypt it, so store it for now + // + // Check how many encrypted events we have seen. Only store another if we + // haven't already stored the maximum number. + if encrypted_events.len() < encrypted_events.capacity() { + encrypted_events.push(event.raw().clone()); + } + } + _ => { + // Ignore unsuitable events + } + } + } else { + warn!( + "Failed to deserialize event as AnySyncTimelineEvent. ID={}", + event.event_id().expect("Event has no ID!") + ); + } + } + + // Push the encrypted events we found into the Room, in reverse order, so + // the latest is last + room.latest_encrypted_events.write().unwrap().extend(encrypted_events.into_iter().rev()); +} diff --git a/crates/matrix-sdk-base/src/response_processors/room/sync_v2.rs b/crates/matrix-sdk-base/src/response_processors/room/sync_v2.rs new file mode 100644 index 000000000..4f3ac1c52 --- /dev/null +++ b/crates/matrix-sdk-base/src/response_processors/room/sync_v2.rs @@ -0,0 +1,298 @@ +// Copyright 2025 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{BTreeMap, BTreeSet}; + +use ruma::{ + api::client::sync::sync_events::v3::{InvitedRoom, JoinedRoom, KnockedRoom, LeftRoom}, + OwnedRoomId, OwnedUserId, RoomId, +}; +use tokio::sync::broadcast::Sender; + +#[cfg(feature = "e2e-encryption")] +use super::super::e2ee; +use super::{ + super::{account_data, ephemeral_events, notification, state_events, timeline, Context}, + RoomCreationData, +}; +use crate::{ + sync::{InvitedRoomUpdate, JoinedRoomUpdate, KnockedRoomUpdate, LeftRoomUpdate}, + Result, RoomInfoNotableUpdate, RoomState, +}; + +/// Process updates of a joined room. +#[allow(clippy::too_many_arguments)] +pub async fn update_joined_room( + context: &mut Context, + room_creation_data: RoomCreationData<'_>, + joined_room: JoinedRoom, + updated_members_in_room: &mut BTreeMap>, + notification: notification::Notification<'_>, + #[cfg(feature = "e2e-encryption")] e2ee: e2ee::E2EE<'_>, +) -> Result { + let RoomCreationData { + room_id, + room_info_notable_update_sender, + requested_required_states, + ambiguity_cache, + } = room_creation_data; + + let state_store = notification.state_store; + + let room = + state_store.get_or_create_room(room_id, RoomState::Joined, room_info_notable_update_sender); + + let mut room_info = room.clone_info(); + + room_info.mark_as_joined(); + room_info.update_from_ruma_summary(&joined_room.summary); + room_info.set_prev_batch(joined_room.timeline.prev_batch.as_deref()); + room_info.mark_state_fully_synced(); + room_info.handle_encryption_state(requested_required_states.for_room(room_id)); + + let (raw_state_events, state_events) = + state_events::sync::collect(context, &joined_room.state.events); + + let mut new_user_ids = state_events::sync::dispatch_and_get_new_users( + context, + (&raw_state_events, &state_events), + &mut room_info, + ambiguity_cache, + ) + .await?; + + ephemeral_events::dispatch(context, &joined_room.ephemeral.events, room_id); + + if joined_room.timeline.limited { + room_info.mark_members_missing(); + } + + let (raw_state_events_from_timeline, state_events_from_timeline) = + state_events::sync::collect_from_timeline(context, &joined_room.timeline.events); + + let mut other_new_user_ids = state_events::sync::dispatch_and_get_new_users( + context, + (&raw_state_events_from_timeline, &state_events_from_timeline), + &mut room_info, + ambiguity_cache, + ) + .await?; + new_user_ids.append(&mut other_new_user_ids); + updated_members_in_room.insert(room_id.to_owned(), new_user_ids.clone()); + + #[cfg(feature = "e2e-encryption")] + let olm_machine = e2ee.olm_machine; + + let timeline = timeline::build( + context, + &room, + &mut room_info, + timeline::builder::Timeline::from(joined_room.timeline), + notification, + #[cfg(feature = "e2e-encryption")] + e2ee, + ) + .await?; + + // Save the new `RoomInfo`. + context.state_changes.add_room(room_info); + + account_data::for_room(context, room_id, &joined_room.account_data.events, state_store).await; + + // `processors::account_data::from_room` might have updated the `RoomInfo`. + // Let's fetch it again. + // + // SAFETY: `expect` is safe because the `RoomInfo` has been inserted 2 lines + // above. + let mut room_info = context + .state_changes + .room_infos + .get(room_id) + .expect("`RoomInfo` must exist in `StateChanges` at this point") + .clone(); + + #[cfg(feature = "e2e-encryption")] + e2ee::tracked_users::update_or_set_if_room_is_newly_encrypted( + context, + olm_machine, + &new_user_ids, + room_info.encryption_state(), + room.encryption_state(), + room_id, + state_store, + ) + .await?; + + let notification_count = joined_room.unread_notifications.into(); + room_info.update_notification_count(notification_count); + + context.state_changes.add_room(room_info); + + Ok(JoinedRoomUpdate::new( + timeline, + joined_room.state.events, + joined_room.account_data.events, + joined_room.ephemeral.events, + notification_count, + ambiguity_cache.changes.remove(room_id).unwrap_or_default(), + )) +} + +/// Process historical updates of a left room. +#[allow(clippy::too_many_arguments)] +pub async fn update_left_room( + context: &mut Context, + room_creation_data: RoomCreationData<'_>, + left_room: LeftRoom, + notification: notification::Notification<'_>, + #[cfg(feature = "e2e-encryption")] e2ee: e2ee::E2EE<'_>, +) -> Result { + let RoomCreationData { + room_id, + room_info_notable_update_sender, + requested_required_states, + ambiguity_cache, + } = room_creation_data; + + let state_store = notification.state_store; + + let room = + state_store.get_or_create_room(room_id, RoomState::Left, room_info_notable_update_sender); + + let mut room_info = room.clone_info(); + room_info.mark_as_left(); + room_info.mark_state_partially_synced(); + room_info.handle_encryption_state(requested_required_states.for_room(room_id)); + + let (raw_state_events, state_events) = + state_events::sync::collect(context, &left_room.state.events); + + let _ = state_events::sync::dispatch_and_get_new_users( + context, + (&raw_state_events, &state_events), + &mut room_info, + ambiguity_cache, + ) + .await?; + + let (raw_state_events_from_timeline, state_events_from_timeline) = + state_events::sync::collect_from_timeline(context, &left_room.timeline.events); + + let _ = state_events::sync::dispatch_and_get_new_users( + context, + (&raw_state_events_from_timeline, &state_events_from_timeline), + &mut room_info, + ambiguity_cache, + ) + .await?; + + let timeline = timeline::build( + context, + &room, + &mut room_info, + timeline::builder::Timeline::from(left_room.timeline), + notification, + #[cfg(feature = "e2e-encryption")] + e2ee, + ) + .await?; + + // Save the new `RoomInfo`. + context.state_changes.add_room(room_info); + + account_data::for_room(context, room_id, &left_room.account_data.events, state_store).await; + + let ambiguity_changes = ambiguity_cache.changes.remove(room_id).unwrap_or_default(); + + Ok(LeftRoomUpdate::new( + timeline, + left_room.state.events, + left_room.account_data.events, + ambiguity_changes, + )) +} + +/// Process updates of an invited room. +pub async fn update_invited_room( + context: &mut Context, + room_id: &RoomId, + invited_room: InvitedRoom, + room_info_notable_update_sender: Sender, + notification: notification::Notification<'_>, +) -> Result { + let state_store = notification.state_store; + + let room = state_store.get_or_create_room( + room_id, + RoomState::Invited, + room_info_notable_update_sender, + ); + + let (raw_events, events) = + state_events::stripped::collect(context, &invited_room.invite_state.events); + + let mut room_info = room.clone_info(); + room_info.mark_as_invited(); + room_info.mark_state_fully_synced(); + + state_events::stripped::dispatch_invite_or_knock( + context, + (&raw_events, &events), + &room, + &mut room_info, + notification, + ) + .await?; + + context.state_changes.add_room(room_info); + + Ok(invited_room) +} + +/// Process updates of a knocked room. +pub async fn update_knocked_room( + context: &mut Context, + room_id: &RoomId, + knocked_room: KnockedRoom, + room_info_notable_update_sender: Sender, + notification: notification::Notification<'_>, +) -> Result { + let state_store = notification.state_store; + + let room = state_store.get_or_create_room( + room_id, + RoomState::Knocked, + room_info_notable_update_sender, + ); + + let (raw_events, events) = + state_events::stripped::collect(context, &knocked_room.knock_state.events); + + let mut room_info = room.clone_info(); + room_info.mark_as_knocked(); + room_info.mark_state_fully_synced(); + + state_events::stripped::dispatch_invite_or_knock( + context, + (&raw_events, &events), + &room, + &mut room_info, + notification, + ) + .await?; + + context.state_changes.add_room(room_info); + + Ok(knocked_room) +} diff --git a/crates/matrix-sdk-base/src/response_processors/state_events.rs b/crates/matrix-sdk-base/src/response_processors/state_events.rs index ab8fef7fc..1da00f03a 100644 --- a/crates/matrix-sdk-base/src/response_processors/state_events.rs +++ b/crates/matrix-sdk-base/src/response_processors/state_events.rs @@ -12,30 +12,30 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::{ - collections::{BTreeMap, BTreeSet}, - iter, -}; - -use ruma::{ - events::{room::member::MembershipState, AnySyncStateEvent}, - serde::Raw, - OwnedUserId, -}; +use ruma::{events::AnySyncStateEvent, serde::Raw}; use serde::Deserialize; -use tracing::{instrument, warn}; +use tracing::warn; -use super::{profiles, Context}; -use crate::{ - store::{ambiguity_map::AmbiguityCache, Result as StoreResult}, - RoomInfo, -}; +use super::Context; /// Collect [`AnySyncStateEvent`]. pub mod sync { - use ruma::events::AnySyncTimelineEvent; + use std::{ + collections::{BTreeMap, BTreeSet}, + iter, + }; - use super::{AnySyncStateEvent, Context, Raw}; + use ruma::{ + events::{room::member::MembershipState, AnySyncTimelineEvent}, + OwnedUserId, + }; + use tracing::instrument; + + use super::{super::profiles, AnySyncStateEvent, Context, Raw}; + use crate::{ + store::{ambiguity_map::AmbiguityCache, Result as StoreResult}, + RoomInfo, + }; /// Collect [`AnySyncStateEvent`] to [`AnySyncStateEvent`]. pub fn collect( @@ -61,13 +61,70 @@ pub mod sync { } })) } + + /// Dispatch the sync state events and return the new users for this room. + /// + /// `raw_events` and `events` must be generated from [`collect`]. + /// Events must be exactly the same list of events that are in + /// `raw_events`, but deserialised. We demand them here to avoid + /// deserialising multiple times. + #[instrument(skip_all, fields(room_id = ?room_info.room_id))] + pub async fn dispatch_and_get_new_users( + context: &mut Context, + (raw_events, events): (&[Raw], &[AnySyncStateEvent]), + room_info: &mut RoomInfo, + ambiguity_cache: &mut AmbiguityCache, + ) -> StoreResult> { + let mut user_ids = BTreeSet::new(); + + if raw_events.is_empty() { + return Ok(user_ids); + } + + let mut state_events = BTreeMap::new(); + + for (raw_event, event) in iter::zip(raw_events, events) { + room_info.handle_state_event(event); + + if let AnySyncStateEvent::RoomMember(member) = event { + ambiguity_cache + .handle_event(&context.state_changes, &room_info.room_id, member) + .await?; + + match member.membership() { + MembershipState::Join | MembershipState::Invite => { + user_ids.insert(member.state_key().to_owned()); + } + _ => (), + } + + profiles::upsert_or_delete(context, &room_info.room_id, member); + } + + state_events + .entry(event.event_type()) + .or_insert_with(BTreeMap::new) + .insert(event.state_key().to_owned(), raw_event.clone()); + } + + context.state_changes.state.insert(room_info.room_id.clone(), state_events); + + Ok(user_ids) + } } /// Collect [`AnyStrippedStateEvent`]. pub mod stripped { - use ruma::events::AnyStrippedStateEvent; + use std::{collections::BTreeMap, iter}; - use super::{Context, Raw}; + use ruma::{events::AnyStrippedStateEvent, push::Action}; + use tracing::instrument; + + use super::{ + super::{notification, timeline}, + Context, Raw, + }; + use crate::{Result, Room, RoomInfo}; /// Collect [`AnyStrippedStateEvent`] to [`AnyStrippedStateEvent`]. pub fn collect( @@ -76,6 +133,71 @@ pub mod stripped { ) -> (Vec>, Vec) { super::collect(raw_events) } + + /// Dispatch the stripped state events. + /// + /// `raw_events` and `events` must be generated from [`collect`]. + /// Events must be exactly the same list of events that are in + /// `raw_events`, but deserialised. We demand them here to avoid + /// deserialising multiple times. + /// + /// Dispatch the stripped state events in `invite_state` or `knock_state`, + /// modifying the room's info and posting notifications as needed. + /// + /// * `raw_events` and `events` - The contents of `invite_state` in the form + /// of list of pairs of raw stripped state events with their deserialized + /// counterpart. + /// * `room` - The [`Room`] to modify. + /// * `room_info` - The current room's info. + /// * `push_rules` - The push rules for this room. + /// * `changes` - The accumulated list of changes to apply once the + /// processing is finished. + /// * `notifications` - Notifications to post for the current room. + /// * `state_store` — The state store. + #[instrument(skip_all, fields(room_id = ?room_info.room_id))] + pub(crate) async fn dispatch_invite_or_knock( + context: &mut Context, + (raw_events, events): (&[Raw], &[AnyStrippedStateEvent]), + room: &Room, + room_info: &mut RoomInfo, + mut notification: notification::Notification<'_>, + ) -> Result<()> { + let mut state_events = BTreeMap::new(); + + for (raw_event, event) in iter::zip(raw_events, events) { + room_info.handle_stripped_state_event(event); + state_events + .entry(event.event_type()) + .or_insert_with(BTreeMap::new) + .insert(event.state_key().to_owned(), raw_event.clone()); + } + + context + .state_changes + .stripped_state + .insert(room_info.room_id().to_owned(), state_events.clone()); + + // We need to check for notifications after we have handled all state + // events, to make sure we have the full push context. + if let Some(push_context) = + timeline::get_push_room_context(context, room, room_info, notification.state_store) + .await? + { + let room_id = room.room_id(); + + // Check every event again for notification. + for event in state_events.values().flat_map(|map| map.values()) { + notification.push_notification_from_event_if( + room_id, + &push_context, + event, + Action::should_notify, + ); + } + } + + Ok(()) + } } fn collect<'a, I, T>(raw_events: I) -> (Vec>, Vec) @@ -94,52 +216,3 @@ where }) .unzip() } - -/// Dispatch the state events and return the new users for this room. -/// -/// `raw_events` and `events` must be generated from [`collect_sync`]. Events -/// must be exactly the same list of events that are in raw_events, but -/// deserialised. We demand them here to avoid deserialising multiple times. -#[instrument(skip_all, fields(room_id = ?room_info.room_id))] -pub async fn dispatch_and_get_new_users( - context: &mut Context, - (raw_events, events): (&[Raw], &[AnySyncStateEvent]), - room_info: &mut RoomInfo, - ambiguity_cache: &mut AmbiguityCache, -) -> StoreResult> { - let mut user_ids = BTreeSet::new(); - - if raw_events.is_empty() { - return Ok(user_ids); - } - - let mut state_events = BTreeMap::new(); - - for (raw_event, event) in iter::zip(raw_events, events) { - room_info.handle_state_event(event); - - if let AnySyncStateEvent::RoomMember(member) = event { - ambiguity_cache - .handle_event(&context.state_changes, &room_info.room_id, member) - .await?; - - match member.membership() { - MembershipState::Join | MembershipState::Invite => { - user_ids.insert(member.state_key().to_owned()); - } - _ => (), - } - - profiles::upsert_or_delete(context, &room_info.room_id, member); - } - - state_events - .entry(event.event_type()) - .or_insert_with(BTreeMap::new) - .insert(event.state_key().to_owned(), raw_event.clone()); - } - - context.state_changes.state.insert(room_info.room_id.clone(), state_events); - - Ok(user_ids) -} diff --git a/crates/matrix-sdk-base/src/response_processors/timeline.rs b/crates/matrix-sdk-base/src/response_processors/timeline.rs index bf932fb30..7eefb6d3d 100644 --- a/crates/matrix-sdk-base/src/response_processors/timeline.rs +++ b/crates/matrix-sdk-base/src/response_processors/timeline.rs @@ -28,13 +28,12 @@ use ruma::{ }; use tracing::{instrument, trace, warn}; -use super::Context; #[cfg(feature = "e2e-encryption")] use super::{e2ee, verification}; +use super::{notification, Context}; use crate::{ - deserialized_responses::RawAnySyncOrStrippedTimelineEvent, store::{BaseStateStore, StateStoreExt as _}, - sync::{Notification, Timeline}, + sync::Timeline, Result, Room, RoomInfo, }; @@ -51,12 +50,12 @@ pub async fn build<'notification, 'e2ee>( room: &Room, room_info: &mut RoomInfo, timeline_inputs: builder::Timeline, - notification_inputs: builder::Notification<'notification>, - #[cfg(feature = "e2e-encryption")] e2ee: builder::E2EE<'e2ee>, + mut notification: notification::Notification<'notification>, + #[cfg(feature = "e2e-encryption")] e2ee: e2ee::E2EE<'e2ee>, ) -> Result { let mut timeline = Timeline::new(timeline_inputs.limited, timeline_inputs.prev_batch); let mut push_context = - get_push_room_context(context, room, room_info, notification_inputs.state_store).await?; + get_push_room_context(context, room, room_info, notification.state_store).await?; let room_id = room.room_id(); for raw_event in timeline_inputs.raw_events { @@ -101,11 +100,9 @@ pub async fn build<'notification, 'e2ee>( if let Some(decrypted_timeline_event) = Box::pin(e2ee::decrypt::sync_timeline_event( context, - e2ee.olm_machine, + e2ee.clone(), timeline_event.raw(), room_id, - e2ee.decryption_trust_requirement, - e2ee.verification_is_allowed, )) .await? { @@ -117,8 +114,7 @@ pub async fn build<'notification, 'e2ee>( Box::pin(verification::process_if_relevant( context, &sync_timeline_event, - e2ee.verification_is_allowed, - e2ee.olm_machine, + e2ee.clone(), room_id, )) .await?; @@ -134,31 +130,18 @@ pub async fn build<'notification, 'e2ee>( if let Some(push_context) = &mut push_context { update_push_room_context(context, push_context, room.own_user_id(), room_info) } else { - push_context = get_push_room_context( - context, - room, - room_info, - notification_inputs.state_store, - ) - .await?; + push_context = + get_push_room_context(context, room, room_info, notification.state_store) + .await?; } - if let Some(context) = &push_context { - let actions = - notification_inputs.push_rules.get_actions(timeline_event.raw(), context); - - if actions.iter().any(Action::should_notify) { - notification_inputs - .notifications - .entry(room_id.to_owned()) - .or_default() - .push(Notification { - actions: actions.to_owned(), - event: RawAnySyncOrStrippedTimelineEvent::Sync( - timeline_event.raw().clone(), - ), - }); - } + if let Some(push_context) = &push_context { + let actions = notification.push_notification_from_event_if( + room_id, + push_context, + timeline_event.raw(), + Action::should_notify, + ); timeline_event.push_actions = Some(actions.to_owned()); } @@ -178,20 +161,12 @@ pub async fn build<'notification, 'e2ee>( /// Set of types used by [`build`] to reduce the number of arguments by grouping /// them by thematics. pub mod builder { - use std::collections::BTreeMap; - - #[cfg(feature = "e2e-encryption")] - use matrix_sdk_crypto::{OlmMachine, TrustRequirement}; use ruma::{ api::client::sync::sync_events::{v3, v5}, events::AnySyncTimelineEvent, - push::Ruleset, serde::Raw, - OwnedRoomId, }; - use crate::{store::BaseStateStore, sync}; - pub struct Timeline { pub limited: bool, pub raw_events: Vec>, @@ -213,40 +188,6 @@ pub mod builder { } } } - - pub struct Notification<'a> { - pub push_rules: &'a Ruleset, - pub notifications: &'a mut BTreeMap>, - pub state_store: &'a BaseStateStore, - } - - impl<'a> Notification<'a> { - pub fn new( - push_rules: &'a Ruleset, - notifications: &'a mut BTreeMap>, - state_store: &'a BaseStateStore, - ) -> Self { - Self { push_rules, notifications, state_store } - } - } - - #[cfg(feature = "e2e-encryption")] - pub struct E2EE<'a> { - pub olm_machine: Option<&'a OlmMachine>, - pub decryption_trust_requirement: TrustRequirement, - pub verification_is_allowed: bool, - } - - #[cfg(feature = "e2e-encryption")] - impl<'a> E2EE<'a> { - pub fn new( - olm_machine: Option<&'a OlmMachine>, - decryption_trust_requirement: TrustRequirement, - verification_is_allowed: bool, - ) -> Self { - Self { olm_machine, decryption_trust_requirement, verification_is_allowed } - } - } } /// Update the push context for the given room. diff --git a/crates/matrix-sdk-base/src/response_processors/verification.rs b/crates/matrix-sdk-base/src/response_processors/verification.rs index 7d1871fdf..46d157479 100644 --- a/crates/matrix-sdk-base/src/response_processors/verification.rs +++ b/crates/matrix-sdk-base/src/response_processors/verification.rs @@ -21,7 +21,7 @@ use ruma::{ RoomId, }; -use super::Context; +use super::{e2ee::E2EE, Context}; use crate::Result; /// Process the given event as a verification event if it is a candidate. The @@ -29,8 +29,7 @@ use crate::Result; pub async fn process_if_relevant( context: &mut Context, event: &AnySyncTimelineEvent, - verification_is_allowed: bool, - olm_machine: Option<&OlmMachine>, + e2ee: E2EE<'_>, room_id: &RoomId, ) -> Result<()> { if let AnySyncTimelineEvent::MessageLike(event) = event { @@ -58,7 +57,8 @@ pub async fn process_if_relevant( _ => false, } { - verification(context, verification_is_allowed, olm_machine, event, room_id).await?; + verification(context, e2ee.verification_is_allowed, e2ee.olm_machine, event, room_id) + .await?; } } diff --git a/crates/matrix-sdk-base/src/rooms/normal.rs b/crates/matrix-sdk-base/src/rooms/normal.rs index 9c3faa7a7..4aef408c2 100644 --- a/crates/matrix-sdk-base/src/rooms/normal.rs +++ b/crates/matrix-sdk-base/src/rooms/normal.rs @@ -2579,7 +2579,7 @@ mod tests { .cast(); // When the new tag is handled and applied. - let mut context = processors::Context::new(StateChanges::default(), Default::default()); + let mut context = processors::Context::default(); processors::account_data::for_room(&mut context, room_id, &[tag_raw], &client.state_store) .await; @@ -2675,7 +2675,7 @@ mod tests { .cast(); // When the new tag is handled and applied. - let mut context = processors::Context::new(StateChanges::default(), Default::default()); + let mut context = processors::Context::default(); processors::account_data::for_room(&mut context, room_id, &[tag_raw], &client.state_store) .await; @@ -3261,7 +3261,7 @@ mod tests { // And I provide a decrypted event to replace the encrypted one, let event = make_latest_event("$A"); - let mut context = processors::Context::new(StateChanges::default(), Default::default()); + let mut context = processors::Context::default(); room.on_latest_event_decrypted( event.clone(), 0, diff --git a/crates/matrix-sdk-base/src/sliding_sync.rs b/crates/matrix-sdk-base/src/sliding_sync.rs index 95e38ba19..e781ad840 100644 --- a/crates/matrix-sdk-base/src/sliding_sync.rs +++ b/crates/matrix-sdk-base/src/sliding_sync.rs @@ -14,45 +14,23 @@ //! Extend `BaseClient` with capabilities to handle MSC4186. -use std::collections::BTreeMap; - +use ruma::api::client::sync::sync_events::v5 as http; #[cfg(feature = "e2e-encryption")] -use matrix_sdk_common::deserialized_responses::TimelineEvent; -#[cfg(feature = "e2e-encryption")] -use ruma::events::AnyToDeviceEvent; -use ruma::{ - api::client::sync::sync_events::{ - v3::{self, InvitedRoom, KnockedRoom}, - v5 as http, - }, - events::{ - room::member::MembershipState, AnyRoomAccountDataEvent, AnyStrippedStateEvent, - AnySyncStateEvent, StateEventType, - }, - serde::Raw, - JsOption, OwnedRoomId, RoomId, UserId, -}; -use tracing::{instrument, trace, warn}; +use ruma::{events::AnyToDeviceEvent, serde::Raw}; +use tracing::{instrument, trace}; use super::BaseClient; -#[cfg(feature = "e2e-encryption")] -use crate::latest_event::{is_suitable_for_latest_event, LatestEvent, PossibleLatestEvent}; use crate::{ error::Result, read_receipts::{compute_unread_counts, PreviousEventsProvider}, response_processors as processors, - rooms::{ - normal::{RoomHero, RoomInfoNotableUpdateReasons}, - RoomState, - }, - ruma::assign, - store::{ambiguity_map::AmbiguityCache, BaseStateStore, StateChanges}, - sync::{JoinedRoomUpdate, LeftRoomUpdate, Notification, RoomUpdates, SyncResponse}, - RequestedRequiredStates, Room, RoomInfo, + rooms::normal::RoomInfoNotableUpdateReasons, + store::ambiguity_map::AmbiguityCache, + sync::{RoomUpdates, SyncResponse}, + RequestedRequiredStates, }; impl BaseClient { - #[cfg(feature = "e2e-encryption")] /// Processes the E2EE-related events from the Sliding Sync response. /// /// In addition to writes to the crypto store, this may also write into the @@ -60,6 +38,7 @@ impl BaseClient { /// store. /// /// Returns whether any change happened. + #[cfg(feature = "e2e-encryption")] pub async fn process_sliding_sync_e2ee( &self, to_device: Option<&http::response::ToDevice>, @@ -80,7 +59,7 @@ impl BaseClient { let olm_machine = self.olm_machine().await; - let mut context = processors::Context::new(StateChanges::default(), Default::default()); + let mut context = processors::Context::default(); let processors::e2ee::to_device::Output { decrypted_to_device_events, room_key_updates } = processors::e2ee::to_device::from_msc4186( @@ -98,9 +77,11 @@ impl BaseClient { .flatten() .filter_map(|room_key_info| self.get_room(&room_key_info.room_id)) .collect(), - olm_machine.as_ref(), - self.decryption_trust_requirement, - self.handle_verification_events, + processors::e2ee::E2EE::new( + olm_machine.as_ref(), + self.decryption_trust_requirement, + self.handle_verification_events, + ), ) .await?; @@ -130,17 +111,7 @@ impl BaseClient { previous_events_provider: &PEP, requested_required_states: &RequestedRequiredStates, ) -> Result { - let http::Response { - // FIXME not yet supported by sliding sync. see - // https://github.com/matrix-org/matrix-rust-sdk/issues/1014 - // next_batch, - rooms, - lists, - extensions, - // FIXME: missing compared to v3::Response - //presence, - .. - } = response; + let http::Response { rooms, lists, extensions, .. } = response; trace!( rooms = rooms.len(), @@ -155,17 +126,17 @@ impl BaseClient { return Ok(SyncResponse::default()); }; - let mut context = processors::Context::new(StateChanges::default(), Default::default()); + let mut context = processors::Context::default(); let state_store = self.state_store.clone(); let mut ambiguity_cache = AmbiguityCache::new(state_store.inner.clone()); let global_account_data_processor = processors::account_data::global(&extensions.account_data.global); + let push_rules = self.get_push_rules(&global_account_data_processor).await?; - let mut new_rooms = RoomUpdates::default(); + let mut room_updates = RoomUpdates::default(); let mut notifications = Default::default(); - let mut rooms_account_data = extensions.account_data.rooms.clone(); let user_id = self .session_meta() @@ -173,107 +144,82 @@ impl BaseClient { .user_id .to_owned(); - for (room_id, response_room_data) in rooms { - let (room_info, joined_room, left_room, invited_room, knocked_room) = self - .process_sliding_sync_room( - &mut context, + for (room_id, room_response) in rooms { + let Some((room_info, room_update)) = processors::room::msc4186::update_any_room( + &mut context, + &user_id, + processors::room::RoomCreationData::new( room_id, - requested_required_states.for_room(room_id), - response_room_data, - &mut rooms_account_data, - &state_store, - &user_id, - &global_account_data_processor, - &mut notifications, + self.room_info_notable_update_sender.clone(), + requested_required_states, &mut ambiguity_cache, - ) - .await?; + ), + room_response, + &extensions.account_data.rooms, + #[cfg(feature = "e2e-encryption")] + processors::e2ee::E2EE::new( + self.olm_machine().await.as_ref(), + self.decryption_trust_requirement, + self.handle_verification_events, + ), + processors::notification::Notification::new( + &push_rules, + &mut notifications, + &self.state_store, + ), + ) + .await? + else { + continue; + }; context.state_changes.add_room(room_info); - if let Some(joined_room) = joined_room { - new_rooms.join.insert(room_id.clone(), joined_room); - } + let room_id = room_id.to_owned(); - if let Some(left_room) = left_room { - new_rooms.leave.insert(room_id.clone(), left_room); - } + use processors::room::msc4186::RoomUpdateKind; - if let Some(invited_room) = invited_room { - new_rooms.invite.insert(room_id.clone(), invited_room); - } - - if let Some(knocked_room) = knocked_room { - new_rooms.knocked.insert(room_id.clone(), knocked_room); + match room_update { + RoomUpdateKind::Joined(joined_room_update) => { + room_updates.joined.insert(room_id, joined_room_update); + } + RoomUpdateKind::Left(left_room_update) => { + room_updates.left.insert(room_id, left_room_update); + } + RoomUpdateKind::Invited(invited_room_update) => { + room_updates.invited.insert(room_id, invited_room_update); + } + RoomUpdateKind::Knocked(knocked_room_update) => { + room_updates.knocked.insert(room_id, knocked_room_update); + } } } // Handle read receipts and typing notifications independently of the rooms: // these both live in a different subsection of the server's response, // so they may exist without any update for the associated room. - - for (room_id, raw) in &extensions.receipts.rooms { - match raw.deserialize() { - Ok(event) => { - context.state_changes.add_receipts(room_id, event.content); - } - Err(e) => { - let event_id: Option = raw.get_field("event_id").ok().flatten(); - #[rustfmt::skip] - warn!( - ?room_id, event_id, - "Failed to deserialize read receipt room event: {e}" - ); - } - } - + processors::room::msc4186::extensions::dispatch_ephemeral_events( + &mut context, + &extensions.receipts, + &extensions.typing, // We assume this can only happen in joined rooms, or something's very wrong. - new_rooms - .join - .entry(room_id.to_owned()) - .or_insert_with(JoinedRoomUpdate::default) - .ephemeral - .push(raw.clone().cast()); - } + &mut room_updates.joined, + ); - for (room_id, raw) in &extensions.typing.rooms { - // We assume this can only happen in joined rooms, or something's very wrong. - new_rooms - .join - .entry(room_id.to_owned()) - .or_insert_with(JoinedRoomUpdate::default) - .ephemeral - .push(raw.clone().cast()); - } - - // Handle room account data - for (room_id, raw) in &rooms_account_data { - processors::account_data::for_room(&mut context, room_id, raw, &self.state_store).await; - - if let Some(room) = self.state_store.room(room_id) { - match room.state() { - RoomState::Joined => new_rooms - .join - .entry(room_id.to_owned()) - .or_insert_with(JoinedRoomUpdate::default) - .account_data - .append(&mut raw.to_vec()), - RoomState::Left | RoomState::Banned => new_rooms - .leave - .entry(room_id.to_owned()) - .or_insert_with(LeftRoomUpdate::default) - .account_data - .append(&mut raw.to_vec()), - RoomState::Invited | RoomState::Knocked => {} - } - } - } + // Handle room account data. + processors::room::msc4186::extensions::room_account_data( + &mut context, + &extensions.account_data, + &mut room_updates, + &self.state_store, + ) + .await; // Rooms in `new_rooms.join` either have a timeline update, or a new read // receipt. Update the read receipt accordingly. let user_id = &self.session_meta().expect("logged in user").user_id; - for (room_id, joined_room_update) in &mut new_rooms.join { + for (room_id, joined_room_update) in &mut room_updates.joined { if let Some(mut room_info) = context .state_changes .room_infos @@ -306,16 +252,6 @@ impl BaseClient { global_account_data_processor.apply(&mut context, &state_store).await; - // FIXME not yet supported by sliding sync. - // changes.presence = presence - // .events - // .iter() - // .filter_map(|e| { - // let event = e.deserialize().ok()?; - // Some((event.sender, e.clone())) - // }) - // .collect(); - context.state_changes.ambiguity_maps = ambiguity_cache.cache; processors::changes::save_and_apply( @@ -331,510 +267,16 @@ impl BaseClient { // live in memory, until the next sync which will saves the room info to // disk; we do this to avoid saving that would be redundant with the // above. Oh well. - new_rooms.update_in_memory_caches(&self.state_store).await; + room_updates.update_in_memory_caches(&self.state_store).await; Ok(SyncResponse { - rooms: new_rooms, + rooms: room_updates, notifications, - // FIXME not yet supported by sliding sync. presence: Default::default(), account_data: extensions.account_data.global.clone(), to_device: Default::default(), }) } - - #[allow(clippy::too_many_arguments)] - async fn process_sliding_sync_room( - &self, - context: &mut processors::Context, - room_id: &RoomId, - requested_required_states: &[(StateEventType, String)], - room_data: &http::response::Room, - rooms_account_data: &mut BTreeMap>>, - state_store: &BaseStateStore, - user_id: &UserId, - global_account_data_processor: &processors::account_data::Global, - notifications: &mut BTreeMap>, - ambiguity_cache: &mut AmbiguityCache, - ) -> Result<( - RoomInfo, - Option, - Option, - Option, - Option, - )> { - // Read state events from the `required_state` field. - // - // Don't read state events from the `timeline` field, because they might be - // incomplete or staled already. We must only read state events from - // `required_state`. - let (raw_state_events, state_events) = - processors::state_events::sync::collect(context, &room_data.required_state); - - // Find or create the room in the store - let is_new_room = !state_store.room_exists(room_id); - - let invite_state_events = room_data - .invite_state - .as_ref() - .map(|events| processors::state_events::stripped::collect(context, events)); - - #[allow(unused_mut)] // Required for some feature flag combinations - let (mut room, mut room_info, invited_room, knocked_room) = self - .process_sliding_sync_room_membership( - context, - &state_events, - &invite_state_events, - state_store, - user_id, - room_id, - ); - - room_info.mark_state_partially_synced(); - room_info.handle_encryption_state(requested_required_states); - - #[cfg_attr(not(feature = "e2e-encryption"), allow(unused))] - let new_user_ids = processors::state_events::dispatch_and_get_new_users( - context, - (&raw_state_events, &state_events), - &mut room_info, - ambiguity_cache, - ) - .await?; - - let push_rules = self.get_push_rules(global_account_data_processor).await?; - - // This will be used for both invited and knocked rooms. - if let Some(invite_state) = invite_state_events { - self.handle_invited_state( - context, - &room, - invite_state, - &push_rules, - &mut room_info, - notifications, - ) - .await?; - } - - process_room_properties(context, room_id, room_data, &mut room_info, is_new_room); - - let timeline = processors::timeline::build( - context, - &room, - &mut room_info, - processors::timeline::builder::Timeline::from(room_data), - processors::timeline::builder::Notification::new( - &push_rules, - notifications, - &self.state_store, - ), - #[cfg(feature = "e2e-encryption")] - processors::timeline::builder::E2EE::new( - self.olm_machine().await.as_ref(), - self.decryption_trust_requirement, - self.handle_verification_events, - ), - ) - .await?; - - // Cache the latest decrypted event in room_info, and also keep any later - // encrypted events, so we can slot them in when we get the keys. - #[cfg(feature = "e2e-encryption")] - cache_latest_events( - &room, - &mut room_info, - &timeline.events, - Some(&context.state_changes), - Some(state_store), - ) - .await; - - #[cfg(feature = "e2e-encryption")] - processors::e2ee::tracked_users::update_or_set_if_room_is_newly_encrypted( - context, - self.olm_machine().await.as_ref(), - &new_user_ids, - room_info.encryption_state(), - room.encryption_state(), - room_id, - state_store, - ) - .await?; - - let notification_count = room_data.unread_notifications.clone().into(); - room_info.update_notification_count(notification_count); - - let ambiguity_changes = ambiguity_cache.changes.remove(room_id).unwrap_or_default(); - let room_account_data = rooms_account_data.get(room_id).cloned(); - - match room_info.state() { - RoomState::Joined => { - // Ephemeral events are added separately, because we might not - // have a room subsection in the response, yet we may have receipts for - // that room. - let ephemeral = Vec::new(); - - Ok(( - room_info, - Some(JoinedRoomUpdate::new( - timeline, - raw_state_events, - room_account_data.unwrap_or_default(), - ephemeral, - notification_count, - ambiguity_changes, - )), - None, - None, - None, - )) - } - - RoomState::Left | RoomState::Banned => Ok(( - room_info, - None, - Some(LeftRoomUpdate::new( - timeline, - raw_state_events, - room_account_data.unwrap_or_default(), - ambiguity_changes, - )), - None, - None, - )), - - RoomState::Invited => Ok((room_info, None, None, invited_room, None)), - - RoomState::Knocked => Ok((room_info, None, None, None, knocked_room)), - } - } - - /// Look through the sliding sync data for this room, find/create it in the - /// store, and process any invite information. - /// If any invite_state exists, we take it to mean that we are invited to - /// this room, unless that state contains membership events that specify - /// otherwise. https://github.com/matrix-org/matrix-spec-proposals/blob/kegan/sync-v3/proposals/3575-sync.md#room-list-parameters - fn process_sliding_sync_room_membership( - &self, - context: &mut processors::Context, - state_events: &[AnySyncStateEvent], - invite_state_events: &Option<(Vec>, Vec)>, - store: &BaseStateStore, - user_id: &UserId, - room_id: &RoomId, - ) -> (Room, RoomInfo, Option, Option) { - if let Some(state_events) = invite_state_events { - let room = store.get_or_create_room( - room_id, - RoomState::Invited, - self.room_info_notable_update_sender.clone(), - ); - let mut room_info = room.clone_info(); - - // We need to find the membership event since it could be for either an invited - // or knocked room - let membership_event_content = state_events.1.iter().find_map(|event| { - if let AnyStrippedStateEvent::RoomMember(membership_event) = event { - if membership_event.state_key == user_id { - return Some(membership_event.content.clone()); - } - } - None - }); - - if let Some(membership_event_content) = membership_event_content { - if membership_event_content.membership == MembershipState::Knock { - // If we have a `Knock` membership state, set the room as such - room_info.mark_as_knocked(); - let raw_events = state_events.0.clone(); - let knock_state = assign!(v3::KnockState::default(), { events: raw_events }); - let knocked_room = - assign!(KnockedRoom::default(), { knock_state: knock_state }); - return (room, room_info, None, Some(knocked_room)); - } - } - - // Otherwise assume it's an invited room - room_info.mark_as_invited(); - let raw_events = state_events.0.clone(); - let invited_room = InvitedRoom::from(v3::InviteState::from(raw_events)); - (room, room_info, Some(invited_room), None) - } else { - let room = store.get_or_create_room( - room_id, - RoomState::Joined, - self.room_info_notable_update_sender.clone(), - ); - let mut room_info = room.clone_info(); - - // We default to considering this room joined if it's not an invite. If it's - // actually left (and we remembered to request membership events in - // our sync request), then we can find this out from the events in - // required_state by calling handle_own_room_membership. - room_info.mark_as_joined(); - - // We don't need to do this in a v2 sync, because the membership of a room can - // be figured out by whether the room is in the "join", "leave" etc. - // property. In sliding sync we only have invite_state, - // required_state and timeline, so we must process required_state and timeline - // looking for relevant membership events. - self.handle_own_room_membership(context, state_events, &mut room_info); - - (room, room_info, None, None) - } - } - - /// Find any m.room.member events that refer to the current user, and update - /// the state in room_info to reflect the "membership" property. - fn handle_own_room_membership( - &self, - context: &mut processors::Context, - state_events: &[AnySyncStateEvent], - room_info: &mut RoomInfo, - ) { - let Some(meta) = self.session_meta() else { - return; - }; - - // Start from the last event; the first membership event we see in that order is - // the last in the regular order, so that's the only one we need to - // consider. - for event in state_events.iter().rev() { - if let AnySyncStateEvent::RoomMember(member) = &event { - // If this event updates the current user's membership, record that in the - // room_info. - if member.state_key() == meta.user_id.as_str() { - let new_state: RoomState = member.membership().into(); - if new_state != room_info.state() { - room_info.set_state(new_state); - // Update an existing notable update entry or create a new one - context - .room_info_notable_updates - .entry(room_info.room_id.to_owned()) - .or_default() - .insert(RoomInfoNotableUpdateReasons::MEMBERSHIP); - } - break; - } - } - } - } -} - -/// Find the most recent decrypted event and cache it in the supplied RoomInfo. -/// -/// If any encrypted events are found after that one, store them in the RoomInfo -/// too so we can use them when we get the relevant keys. -/// -/// It is the responsibility of the caller to update the `RoomInfo` instance -/// stored in the `Room`. -#[cfg(feature = "e2e-encryption")] -async fn cache_latest_events( - room: &Room, - room_info: &mut RoomInfo, - events: &[TimelineEvent], - changes: Option<&StateChanges>, - store: Option<&BaseStateStore>, -) { - use crate::{ - deserialized_responses::DisplayName, store::ambiguity_map::is_display_name_ambiguous, - }; - - let mut encrypted_events = - Vec::with_capacity(room.latest_encrypted_events.read().unwrap().capacity()); - - // Try to get room power levels from the current changes - let power_levels_from_changes = || { - let state_changes = changes?.state.get(room_info.room_id())?; - let room_power_levels_state = - state_changes.get(&StateEventType::RoomPowerLevels)?.values().next()?; - match room_power_levels_state.deserialize().ok()? { - AnySyncStateEvent::RoomPowerLevels(ev) => Some(ev.power_levels()), - _ => None, - } - }; - - // If we didn't get any info, try getting it from local data - let power_levels = match power_levels_from_changes() { - Some(power_levels) => Some(power_levels), - None => room.power_levels().await.ok(), - }; - - let power_levels_info = Some(room.own_user_id()).zip(power_levels.as_ref()); - - for event in events.iter().rev() { - if let Ok(timeline_event) = event.raw().deserialize() { - match is_suitable_for_latest_event(&timeline_event, power_levels_info) { - PossibleLatestEvent::YesRoomMessage(_) - | PossibleLatestEvent::YesPoll(_) - | PossibleLatestEvent::YesCallInvite(_) - | PossibleLatestEvent::YesCallNotify(_) - | PossibleLatestEvent::YesSticker(_) - | PossibleLatestEvent::YesKnockedStateEvent(_) => { - // We found a suitable latest event. Store it. - - // In order to make the latest event fast to read, we want to keep the - // associated sender in cache. This is a best-effort to gather enough - // information for creating a user profile as fast as possible. If information - // are missing, let's go back on the “slow” path. - - let mut sender_profile = None; - let mut sender_name_is_ambiguous = None; - - // First off, look up the sender's profile from the `StateChanges`, they are - // likely to be the most recent information. - if let Some(changes) = changes { - sender_profile = changes - .profiles - .get(room.room_id()) - .and_then(|profiles_by_user| { - profiles_by_user.get(timeline_event.sender()) - }) - .cloned(); - - if let Some(sender_profile) = sender_profile.as_ref() { - sender_name_is_ambiguous = sender_profile - .as_original() - .and_then(|profile| profile.content.displayname.as_ref()) - .and_then(|display_name| { - let display_name = DisplayName::new(display_name); - - changes.ambiguity_maps.get(room.room_id()).and_then( - |map_for_room| { - map_for_room.get(&display_name).map(|users| { - is_display_name_ambiguous(&display_name, users) - }) - }, - ) - }); - } - } - - // Otherwise, look up the sender's profile from the `Store`. - if sender_profile.is_none() { - if let Some(store) = store { - sender_profile = store - .get_profile(room.room_id(), timeline_event.sender()) - .await - .ok() - .flatten(); - - // TODO: need to update `sender_name_is_ambiguous`, - // but how? - } - } - - let latest_event = Box::new(LatestEvent::new_with_sender_details( - event.clone(), - sender_profile, - sender_name_is_ambiguous, - )); - - // Store it in the return RoomInfo (it will be saved for us in the room later). - room_info.latest_event = Some(latest_event); - // We don't need any of the older encrypted events because we have a new - // decrypted one. - room.latest_encrypted_events.write().unwrap().clear(); - // We can stop looking through the timeline now because everything else is - // older. - break; - } - PossibleLatestEvent::NoEncrypted => { - // m.room.encrypted - this might be the latest event later - we can't tell until - // we are able to decrypt it, so store it for now - // - // Check how many encrypted events we have seen. Only store another if we - // haven't already stored the maximum number. - if encrypted_events.len() < encrypted_events.capacity() { - encrypted_events.push(event.raw().clone()); - } - } - _ => { - // Ignore unsuitable events - } - } - } else { - warn!( - "Failed to deserialize event as AnySyncTimelineEvent. ID={}", - event.event_id().expect("Event has no ID!") - ); - } - } - - // Push the encrypted events we found into the Room, in reverse order, so - // the latest is last - room.latest_encrypted_events.write().unwrap().extend(encrypted_events.into_iter().rev()); -} - -fn process_room_properties( - context: &mut processors::Context, - room_id: &RoomId, - room_data: &http::response::Room, - room_info: &mut RoomInfo, - is_new_room: bool, -) { - // Handle the room's avatar. - // - // It can be updated via the state events, or via the - // [`http::ResponseRoom::avatar`] field. This part of the code handles the - // latter case. The former case is handled by [`BaseClient::handle_state`]. - match &room_data.avatar { - // A new avatar! - JsOption::Some(avatar_uri) => room_info.update_avatar(Some(avatar_uri.to_owned())), - // Avatar must be removed. - JsOption::Null => room_info.update_avatar(None), - // Nothing to do. - JsOption::Undefined => {} - } - - // Sliding sync doesn't have a room summary, nevertheless it contains the joined - // and invited member counts, in addition to the heroes if it's been configured - // to return them (see the [`http::RequestRoomSubscription::include_heroes`]). - if let Some(count) = room_data.joined_count { - room_info.update_joined_member_count(count.into()); - } - if let Some(count) = room_data.invited_count { - room_info.update_invited_member_count(count.into()); - } - - if let Some(heroes) = &room_data.heroes { - room_info.update_heroes( - heroes - .iter() - .map(|hero| RoomHero { - user_id: hero.user_id.clone(), - display_name: hero.name.clone(), - avatar_url: hero.avatar.clone(), - }) - .collect(), - ); - } - - room_info.set_prev_batch(room_data.prev_batch.as_deref()); - - if room_data.limited { - room_info.mark_members_missing(); - } - - if let Some(recency_stamp) = &room_data.bump_stamp { - let recency_stamp: u64 = (*recency_stamp).into(); - - if room_info.recency_stamp.as_ref() != Some(&recency_stamp) { - room_info.update_recency_stamp(recency_stamp); - - // If it's not a new room, let's emit a `RECENCY_STAMP` update. - // For a new room, the room will appear as new, so we don't care about this - // update. - if !is_new_room { - context - .room_info_notable_updates - .entry(room_id.to_owned()) - .or_default() - .insert(RoomInfoNotableUpdateReasons::RECENCY_STAMP); - } - } - } } #[cfg(all(test, not(target_family = "wasm")))] @@ -874,9 +316,9 @@ mod tests { }; use serde_json::json; - #[cfg(feature = "e2e-encryption")] - use super::cache_latest_events; use super::http; + #[cfg(feature = "e2e-encryption")] + use super::processors::room::msc4186::cache_latest_events; use crate::{ rooms::normal::{RoomHero, RoomInfoNotableUpdateReasons}, test_utils::logged_in_base_client, @@ -909,7 +351,7 @@ mod tests { .expect("Failed to process sync"); // Check it's present in the response. - let room = sync_response.rooms.join.get(room_id).unwrap(); + let room = sync_response.rooms.joined.get(room_id).unwrap(); assert_eq!(room.unread_notifications, count.clone().into()); // Check it's been updated in the store. @@ -950,9 +392,9 @@ mod tests { assert_eq!(client_room.state(), RoomState::Joined); // And it is added to the list of joined rooms only. - assert!(sync_resp.rooms.join.contains_key(room_id)); - assert!(!sync_resp.rooms.leave.contains_key(room_id)); - assert!(!sync_resp.rooms.invite.contains_key(room_id)); + assert!(sync_resp.rooms.joined.contains_key(room_id)); + assert!(!sync_resp.rooms.left.contains_key(room_id)); + assert!(!sync_resp.rooms.invited.contains_key(room_id)); } #[async_test] @@ -978,9 +420,9 @@ mod tests { assert_eq!(client_room.state(), RoomState::Joined); // And it is added to the list of joined rooms only. - assert!(sync_resp.rooms.join.contains_key(room_id)); - assert!(!sync_resp.rooms.leave.contains_key(room_id)); - assert!(!sync_resp.rooms.invite.contains_key(room_id)); + assert!(sync_resp.rooms.joined.contains_key(room_id)); + assert!(!sync_resp.rooms.left.contains_key(room_id)); + assert!(!sync_resp.rooms.invited.contains_key(room_id)); assert!(!sync_resp.rooms.knocked.contains_key(room_id)); } @@ -1038,9 +480,9 @@ mod tests { assert_eq!(client_room.state(), RoomState::Invited); // And it is added to the list of invited rooms only. - assert!(!sync_resp.rooms.join.contains_key(room_id)); - assert!(!sync_resp.rooms.leave.contains_key(room_id)); - assert!(sync_resp.rooms.invite.contains_key(room_id)); + assert!(!sync_resp.rooms.joined.contains_key(room_id)); + assert!(!sync_resp.rooms.left.contains_key(room_id)); + assert!(sync_resp.rooms.invited.contains_key(room_id)); assert!(!sync_resp.rooms.knocked.contains_key(room_id)); } @@ -1184,9 +626,9 @@ mod tests { assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Left); // And it is added to the list of left rooms only. - assert!(!sync_resp.rooms.join.contains_key(room_id)); - assert!(sync_resp.rooms.leave.contains_key(room_id)); - assert!(!sync_resp.rooms.invite.contains_key(room_id)); + assert!(!sync_resp.rooms.joined.contains_key(room_id)); + assert!(sync_resp.rooms.left.contains_key(room_id)); + assert!(!sync_resp.rooms.invited.contains_key(room_id)); assert!(!sync_resp.rooms.knocked.contains_key(room_id)); } @@ -1235,9 +677,9 @@ mod tests { } // And it is added to the list of left rooms only. - assert!(!sync_resp.rooms.join.contains_key(room_id)); - assert!(sync_resp.rooms.leave.contains_key(room_id)); - assert!(!sync_resp.rooms.invite.contains_key(room_id)); + assert!(!sync_resp.rooms.joined.contains_key(room_id)); + assert!(sync_resp.rooms.left.contains_key(room_id)); + assert!(!sync_resp.rooms.invited.contains_key(room_id)); assert!(!sync_resp.rooms.knocked.contains_key(room_id)); } } @@ -1550,8 +992,8 @@ mod tests { assert_eq!(client_room.state(), RoomState::Invited); // And it is added to the list of invited rooms, not the joined ones - assert!(!sync_resp.rooms.invite[room_id].invite_state.is_empty()); - assert!(!sync_resp.rooms.join.contains_key(room_id)); + assert!(!sync_resp.rooms.invited[room_id].invite_state.is_empty()); + assert!(!sync_resp.rooms.joined.contains_key(room_id)); } #[async_test] diff --git a/crates/matrix-sdk-base/src/store/mod.rs b/crates/matrix-sdk-base/src/store/mod.rs index 461fc89b6..253d9a0ad 100644 --- a/crates/matrix-sdk-base/src/store/mod.rs +++ b/crates/matrix-sdk-base/src/store/mod.rs @@ -321,7 +321,7 @@ impl BaseStateStore { pub fn get_or_create_room( &self, room_id: &RoomId, - room_type: RoomState, + room_state: RoomState, room_info_notable_update_sender: broadcast::Sender, ) -> Room { let user_id = @@ -335,7 +335,7 @@ impl BaseStateStore { user_id, self.inner.clone(), room_id, - room_type, + room_state, room_info_notable_update_sender, ) }) diff --git a/crates/matrix-sdk-base/src/sync.rs b/crates/matrix-sdk-base/src/sync.rs index cd829b46b..3d220b5d5 100644 --- a/crates/matrix-sdk-base/src/sync.rs +++ b/crates/matrix-sdk-base/src/sync.rs @@ -17,11 +17,11 @@ use std::{collections::BTreeMap, fmt}; use matrix_sdk_common::{debug::DebugRawEvent, deserialized_responses::TimelineEvent}; +pub use ruma::api::client::sync::sync_events::v3::{ + InvitedRoom as InvitedRoomUpdate, KnockedRoom as KnockedRoomUpdate, +}; use ruma::{ - api::client::sync::sync_events::{ - v3::{InvitedRoom as InvitedRoomUpdate, KnockedRoom as KnockedRoomUpdate}, - UnreadNotificationsCount as RumaUnreadNotificationsCount, - }, + api::client::sync::sync_events::UnreadNotificationsCount as RumaUnreadNotificationsCount, events::{ presence::PresenceEvent, AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent, AnySyncStateEvent, AnyToDeviceEvent, @@ -72,11 +72,11 @@ impl fmt::Debug for SyncResponse { #[derive(Clone, Default)] pub struct RoomUpdates { /// The rooms that the user has left or been banned from. - pub leave: BTreeMap, + pub left: BTreeMap, /// The rooms that the user has joined. - pub join: BTreeMap, + pub joined: BTreeMap, /// The rooms that the user has been invited to. - pub invite: BTreeMap, + pub invited: BTreeMap, /// The rooms that the user has knocked on. pub knocked: BTreeMap, } @@ -87,10 +87,10 @@ impl RoomUpdates { /// This will only fill the in-memory caches, not save the info on disk. pub(crate) async fn update_in_memory_caches(&self, store: &BaseStateStore) { for room in self - .leave + .left .keys() - .chain(self.join.keys()) - .chain(self.invite.keys()) + .chain(self.joined.keys()) + .chain(self.invited.keys()) .chain(self.knocked.keys()) .filter_map(|room_id| store.room(room_id)) { @@ -103,9 +103,9 @@ impl RoomUpdates { impl fmt::Debug for RoomUpdates { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("RoomUpdates") - .field("leave", &self.leave) - .field("join", &self.join) - .field("invite", &DebugInvitedRoomUpdates(&self.invite)) + .field("leave", &self.left) + .field("join", &self.joined) + .field("invite", &DebugInvitedRoomUpdates(&self.invited)) .field("knocked", &DebugKnockedRoomUpdates(&self.knocked)) .finish() } diff --git a/crates/matrix-sdk-crypto/CHANGELOG.md b/crates/matrix-sdk-crypto/CHANGELOG.md index 2d8095da7..d83a6e6e9 100644 --- a/crates/matrix-sdk-crypto/CHANGELOG.md +++ b/crates/matrix-sdk-crypto/CHANGELOG.md @@ -6,9 +6,16 @@ All notable changes to this project will be documented in this file. ## [Unreleased] - ReleaseDate +### Features + - Add experimental APIs for sharing encrypted room key history with new members, `Store::build_room_key_bundle` and `OlmMachine::share_room_key_bundle_data`. ([#4775](https://github.com/matrix-org/matrix-rust-sdk/pull/4775), [#4864](https://github.com/matrix-org/matrix-rust-sdk/pull/4864)) +- Check the `sender_device_keys` field on *all* incoming Olm-encrypted to-device messages + and ignore any to-device messages which include the field but whose data is invalid + (as per [MSC4147](https://github.com/matrix-org/matrix-spec-proposals/pull/4147)). + ([#4922](https://github.com/matrix-org/matrix-rust-sdk/pull/4922)) + ## [0.11.0] - 2025-04-11 ### Features diff --git a/crates/matrix-sdk-crypto/src/error.rs b/crates/matrix-sdk-crypto/src/error.rs index 6543d20d3..7d77144f7 100644 --- a/crates/matrix-sdk-crypto/src/error.rs +++ b/crates/matrix-sdk-crypto/src/error.rs @@ -207,6 +207,14 @@ pub enum EventError { decrypted event: expected {0}, got {1:?}" )] MismatchedRoom(OwnedRoomId, Option), + + /// The event includes `sender_device_keys` as per [MSC4147], but the + /// signature was invalid, or the ed25519 or curve25519 key did not + /// match other data in the event. + /// + /// [MSC4147]: https://github.com/matrix-org/matrix-spec-proposals/pull/4147 + #[error("the event included sender_device_keys which were invalid in some way")] + InvalidSenderDeviceKeys, } /// Error type describing different errors that can happen when we create an diff --git a/crates/matrix-sdk-crypto/src/machine/tests/megolm_sender_data.rs b/crates/matrix-sdk-crypto/src/machine/tests/megolm_sender_data.rs index 95d246b33..36477b241 100644 --- a/crates/matrix-sdk-crypto/src/machine/tests/megolm_sender_data.rs +++ b/crates/matrix-sdk-crypto/src/machine/tests/megolm_sender_data.rs @@ -18,7 +18,7 @@ use assert_matches::assert_matches; use futures_core::Stream; use futures_util::{FutureExt, StreamExt}; use matrix_sdk_test::async_test; -use ruma::{room_id, user_id, RoomId, TransactionId, UserId}; +use ruma::{events::AnyToDeviceEvent, room_id, serde::Raw, user_id, RoomId, TransactionId, UserId}; use serde::Serialize; use serde_json::json; use tokio_stream::wrappers::errors::BroadcastStreamRecvError; @@ -283,7 +283,10 @@ async fn create_and_share_session_without_sender_data( } /// Pipe a to-device event into an [`OlmMachine`]. -async fn receive_to_device_event(machine: &OlmMachine, event: &ToDeviceEvent) +pub async fn receive_to_device_event( + machine: &OlmMachine, + event: &ToDeviceEvent, +) -> (Vec>, Vec) where C: EventType + Serialize + Debug, { @@ -298,7 +301,7 @@ where next_batch_token: None, }) .await - .expect("Error receiving to-device event"); + .expect("Error receiving to-device event") } /// Given the `room_keys_received_stream`, check that there is a pending update, diff --git a/crates/matrix-sdk-crypto/src/machine/tests/olm_encryption.rs b/crates/matrix-sdk-crypto/src/machine/tests/olm_encryption.rs index afbc36017..478979a8c 100644 --- a/crates/matrix-sdk-crypto/src/machine/tests/olm_encryption.rs +++ b/crates/matrix-sdk-crypto/src/machine/tests/olm_encryption.rs @@ -24,12 +24,17 @@ use ruma::{ events::{dummy::ToDeviceDummyEventContent, AnyToDeviceEvent}, user_id, DeviceKeyAlgorithm, DeviceKeyId, SecondsSinceUnixEpoch, }; +use serde_json::json; use vodozemac::Ed25519SecretKey; use crate::{ machine::{ - test_helpers::{create_session, get_machine_pair, get_machine_pair_with_session}, + test_helpers::{ + create_session, get_machine_pair, get_machine_pair_with_session, + get_machine_pair_with_setup_sessions_test_helper, + }, tests, + tests::megolm_sender_data::receive_to_device_event, }, olm::utility::SignJson, store::Changes, @@ -242,3 +247,52 @@ async fn test_olm_encryption() { async fn test_olm_encryption_with_fallback_key() { olm_encryption_test_helper(true).await; } + +/// Encrypted to-device messages which hold a `sender_device_keys`, but that +/// data is unsigned, should not be decrypted. +#[async_test] +async fn test_decrypt_to_device_message_with_unsigned_sender_keys() { + let (alice, bob) = get_machine_pair_with_setup_sessions_test_helper( + tests::alice_id(), + tests::user_id(), + false, + ) + .await; + + let mut alice_session = alice + .get_device(bob.user_id(), bob.device_id(), None) + .await + .unwrap() + .unwrap() + .get_most_recent_session() + .await + .unwrap() + .unwrap(); + + let mut malformed_device_keys = alice_session.our_device_keys.clone(); + malformed_device_keys.signatures.clear(); + let plaintext = serde_json::to_string(&json!({ + "sender": alice.user_id(), + "sender_device": alice.device_id(), + "keys": { "ed25519": alice.identity_keys().ed25519.to_base64() }, + "recipient": bob.user_id(), + "recipient_keys": { "ed25519": bob.identity_keys().ed25519.to_base64() }, + "type": "org.matrix.test", + "content": {"a": "b"}, + "sender_device_keys": malformed_device_keys, + })) + .unwrap(); + + let ciphertext = alice_session.encrypt_helper(&plaintext).await; + let event = ToDeviceEvent::new( + alice.user_id().to_owned(), + alice_session.build_encrypted_event(ciphertext, None).await.unwrap(), + ); + + // Bob receives the to-device message + let (to_device_events, _) = receive_to_device_event(&bob, &event).await; + + // The to-device event should remain decrypted. + let event = to_device_events.first().expect("Bob did not get a to-device event"); + assert_eq!(event.get_field("type").unwrap(), Some("m.room.encrypted")); +} diff --git a/crates/matrix-sdk-crypto/src/olm/account.rs b/crates/matrix-sdk-crypto/src/olm/account.rs index 7f237f72e..ecd2d01b1 100644 --- a/crates/matrix-sdk-crypto/src/olm/account.rs +++ b/crates/matrix-sdk-crypto/src/olm/account.rs @@ -1464,6 +1464,9 @@ impl Account { ) .into()) } else { + // If the event contained sender_device_keys, check them now. + Self::check_sender_device_keys(event.as_ref(), sender_key)?; + // If this event is an `m.room_key` event, defer the check for the // Ed25519 key of the sender until we decrypt room events. This // ensures that we receive the room key even if we don't have access @@ -1496,6 +1499,57 @@ impl Account { } } + /// If the plaintext of the decrypted message includes a + /// `sender_device_keys` property per [MSC4147], check that it is valid. + /// + /// # Arguments + /// + /// * `event` - The decrypted and deserialized plaintext of the event. + /// * `sender_key` - The curve25519 key of the sender of the event. + /// + /// [MSC4147]: https://github.com/matrix-org/matrix-spec-proposals/pull/4147 + fn check_sender_device_keys( + event: &AnyDecryptedOlmEvent, + sender_key: Curve25519PublicKey, + ) -> OlmResult<()> { + let Some(sender_device_keys) = event.sender_device_keys() else { + return Ok(()); + }; + + // Check the signature within the device_keys structure + let sender_device_data = DeviceData::try_from(sender_device_keys).map_err(|err| { + warn!( + "Received a to-device message with sender_device_keys with invalid signature: {:?}", + err + ); + OlmError::EventError(EventError::InvalidSenderDeviceKeys) + })?; + + // Check that the Ed25519 key in the sender_device_keys matches the `ed25519` + // key in the `keys` field in the event. + if sender_device_data.ed25519_key() != Some(event.keys().ed25519) { + warn!( + "Received a to-device message with sender_device_keys with incorrect ed25519 key: expected {:?}, got {:?}", + event.keys().ed25519, + sender_device_data.ed25519_key(), + ); + return Err(OlmError::EventError(EventError::InvalidSenderDeviceKeys)); + } + + // Check that the Curve25519 key in the sender_device_keys matches the key that + // was used for the Olm session. + if sender_device_data.curve25519_key() != Some(sender_key) { + warn!( + "Received a to-device message with sender_device_keys with incorrect curve25519 key: expected {:?}, got {:?}", + sender_key, + sender_device_data.curve25519_key(), + ); + return Err(OlmError::EventError(EventError::InvalidSenderDeviceKeys)); + } + + Ok(()) + } + /// Internal use only. /// /// Cloning should only be done for testing purposes or when we are certain diff --git a/crates/matrix-sdk-crypto/src/olm/group_sessions/sender_data_finder.rs b/crates/matrix-sdk-crypto/src/olm/group_sessions/sender_data_finder.rs index 2a9ee9b96..142195203 100644 --- a/crates/matrix-sdk-crypto/src/olm/group_sessions/sender_data_finder.rs +++ b/crates/matrix-sdk-crypto/src/olm/group_sessions/sender_data_finder.rs @@ -174,7 +174,10 @@ impl<'a> SenderDataFinder<'a> { if let Some(sender_device_keys) = &room_key_event.sender_device_keys { // Yes: use the device keys to continue. - // Validate the signature of the DeviceKeys supplied. + // Validate the signature of the DeviceKeys supplied. (We've actually already + // done this when decrypting the event, but doing it again here is + // relatively harmless and is the easiest way of getting hold of a + // DeviceData so that we can follow the rest of this logic). let sender_device_data = DeviceData::try_from(sender_device_keys)?; Ok(self.have_device_data(sender_device_data).await?) } else { diff --git a/crates/matrix-sdk-crypto/src/types/events/olm_v1.rs b/crates/matrix-sdk-crypto/src/types/events/olm_v1.rs index d40c6e547..9dfd68e11 100644 --- a/crates/matrix-sdk-crypto/src/types/events/olm_v1.rs +++ b/crates/matrix-sdk-crypto/src/types/events/olm_v1.rs @@ -152,7 +152,7 @@ impl AnyDecryptedOlmEvent { /// The sender's device keys, if supplied in the message as per MSC4147 pub fn sender_device_keys(&self) -> Option<&DeviceKeys> { match self { - AnyDecryptedOlmEvent::Custom(_) => None, + AnyDecryptedOlmEvent::Custom(e) => e.sender_device_keys.as_ref(), AnyDecryptedOlmEvent::RoomKey(e) => e.sender_device_keys.as_ref(), AnyDecryptedOlmEvent::ForwardedRoomKey(e) => e.sender_device_keys.as_ref(), AnyDecryptedOlmEvent::SecretSend(e) => e.sender_device_keys.as_ref(), @@ -274,6 +274,9 @@ pub struct ToDeviceCustomEvent { pub keys: OlmV1Keys, /// The recipient's signing keys of the encrypted event. pub recipient_keys: OlmV1Keys, + /// The device keys if supplied as per MSC4147 + #[serde(alias = "org.matrix.msc4147.device_keys")] + pub sender_device_keys: Option, /// The type of the event. #[serde(rename = "type")] pub event_type: String, diff --git a/crates/matrix-sdk-ui/src/room_list_service/mod.rs b/crates/matrix-sdk-ui/src/room_list_service/mod.rs index 810da72dc..94c62bf73 100644 --- a/crates/matrix-sdk-ui/src/room_list_service/mod.rs +++ b/crates/matrix-sdk-ui/src/room_list_service/mod.rs @@ -161,7 +161,6 @@ impl RoomListService { .map(|(state_event, value)| (state_event.clone(), (*value).to_owned())) .collect(), ) - .include_heroes(Some(true)) .filters(Some(assign!(http::request::ListFilters::default(), { // As defined in the [SlidingSync MSC](https://github.com/matrix-org/matrix-spec-proposals/blob/9450ced7fb9cf5ea9077d029b3adf36aebfa8709/proposals/3575-sync.md?plain=1#L444) // If unset, both invited and joined rooms are returned. If false, no invited rooms are diff --git a/crates/matrix-sdk-ui/tests/integration/room_list_service.rs b/crates/matrix-sdk-ui/tests/integration/room_list_service.rs index e8c9512e8..e6963ba06 100644 --- a/crates/matrix-sdk-ui/tests/integration/room_list_service.rs +++ b/crates/matrix-sdk-ui/tests/integration/room_list_service.rs @@ -368,7 +368,6 @@ async fn test_sync_all_states() -> Result<(), Error> { ["m.room.history_visibility", ""], ["io.element.functional_members", ""], ], - "include_heroes": true, "filters": { "not_room_types": ["m.space"], }, diff --git a/crates/matrix-sdk/src/client/mod.rs b/crates/matrix-sdk/src/client/mod.rs index fc4d0c13c..0c9b5050f 100644 --- a/crates/matrix-sdk/src/client/mod.rs +++ b/crates/matrix-sdk/src/client/mod.rs @@ -2203,7 +2203,7 @@ impl Client { /// client /// .sync_with_callback(sync_settings, |response| async move { /// let channel = sync_channel; - /// for (room_id, room) in response.rooms.join { + /// for (room_id, room) in response.rooms.joined { /// for event in room.timeline.events { /// channel.send(event).await.unwrap(); /// } @@ -2283,7 +2283,7 @@ impl Client { /// .sync_with_result_callback(sync_settings, |response| async move { /// let channel = sync_channel; /// let sync_response = response?; - /// for (room_id, room) in sync_response.rooms.join { + /// for (room_id, room) in sync_response.rooms.joined { /// for event in room.timeline.events { /// channel.send(event).await.unwrap(); /// } @@ -2356,7 +2356,7 @@ impl Client { /// Box::pin(client.sync_stream(SyncSettings::default()).await); /// /// while let Some(Ok(response)) = sync_stream.next().await { - /// for room in response.rooms.join.values() { + /// for room in response.rooms.joined.values() { /// for e in &room.timeline.events { /// if let Ok(event) = e.raw().deserialize() { /// println!("Received event {:?}", event); diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 354649179..2a4238e6d 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -538,7 +538,7 @@ impl EventCacheInner { let _lock = self.multiple_room_updates_lock.lock().await; // Left rooms. - for (room_id, left_room_update) in updates.leave { + for (room_id, left_room_update) in updates.left { let room = self.for_room(&room_id).await?; if let Err(err) = @@ -550,7 +550,7 @@ impl EventCacheInner { } // Joined rooms. - for (room_id, joined_room_update) in updates.join { + for (room_id, joined_room_update) in updates.joined { let room = self.for_room(&room_id).await?; if let Err(err) = @@ -810,8 +810,8 @@ mod tests { }; let mut updates = RoomUpdates::default(); - updates.join.insert(room_id1.to_owned(), joined_room_update1); - updates.join.insert(room_id2.to_owned(), joined_room_update2); + updates.joined.insert(room_id1.to_owned(), joined_room_update1); + updates.joined.insert(room_id2.to_owned(), joined_room_update2); // Have the event cache handle them. event_cache.inner.handle_room_updates(updates).await.unwrap(); diff --git a/crates/matrix-sdk/src/sliding_sync/client.rs b/crates/matrix-sdk/src/sliding_sync/client.rs index e7200dd37..c184770a2 100644 --- a/crates/matrix-sdk/src/sliding_sync/client.rs +++ b/crates/matrix-sdk/src/sliding_sync/client.rs @@ -248,7 +248,7 @@ impl<'a> SlidingSyncResponseProcessor<'a> { /// /// This will only fill the in-memory caches, not save the info on disk. async fn update_in_memory_caches(client: &Client, response: &SyncResponse) -> Result<()> { - for room_id in response.rooms.join.keys() { + for room_id in response.rooms.joined.keys() { let Some(room) = client.get_room(room_id) else { error!(room_id = ?room_id, "Cannot post process a room in sliding sync because it is missing"); continue; diff --git a/crates/matrix-sdk/src/sliding_sync/list/builder.rs b/crates/matrix-sdk/src/sliding_sync/list/builder.rs index 4d1a2924a..42a338f96 100644 --- a/crates/matrix-sdk/src/sliding_sync/list/builder.rs +++ b/crates/matrix-sdk/src/sliding_sync/list/builder.rs @@ -34,7 +34,6 @@ struct SlidingSyncListCachedData { pub struct SlidingSyncListBuilder { sync_mode: SlidingSyncMode, required_state: Vec<(StateEventType, String)>, - include_heroes: Option, filters: Option, timeline_limit: Bound, pub(crate) name: String, @@ -57,7 +56,6 @@ impl fmt::Debug for SlidingSyncListBuilder { .debug_struct("SlidingSyncListBuilder") .field("sync_mode", &self.sync_mode) .field("required_state", &self.required_state) - .field("include_heroes", &self.include_heroes) .field("filters", &self.filters) .field("timeline_limit", &self.timeline_limit) .field("name", &self.name) @@ -73,7 +71,6 @@ impl SlidingSyncListBuilder { (StateEventType::RoomEncryption, "".to_owned()), (StateEventType::RoomTombstone, "".to_owned()), ], - include_heroes: None, filters: None, timeline_limit: 1, name: name.into(), @@ -108,12 +105,6 @@ impl SlidingSyncListBuilder { self } - /// Include heroes. - pub fn include_heroes(mut self, value: Option) -> Self { - self.include_heroes = value; - self - } - /// Any filters to apply to the query. pub fn filters(mut self, value: Option) -> Self { self.filters = value; @@ -172,11 +163,7 @@ impl SlidingSyncListBuilder { // From the builder sticky: StdRwLock::new(SlidingSyncStickyManager::new( - SlidingSyncListStickyParameters::new( - self.required_state, - self.include_heroes, - self.filters, - ), + SlidingSyncListStickyParameters::new(self.required_state, self.filters), )), timeline_limit: StdRwLock::new(self.timeline_limit), name: self.name, diff --git a/crates/matrix-sdk/src/sliding_sync/list/sticky.rs b/crates/matrix-sdk/src/sliding_sync/list/sticky.rs index 495f7916b..960afa400 100644 --- a/crates/matrix-sdk/src/sliding_sync/list/sticky.rs +++ b/crates/matrix-sdk/src/sliding_sync/list/sticky.rs @@ -9,10 +9,6 @@ pub(super) struct SlidingSyncListStickyParameters { /// Required states to return per room. required_state: Vec<(StateEventType, String)>, - /// Return a stripped variant of membership events for the users used to - /// calculate the room name. - include_heroes: Option, - /// Any filters to apply to the query. filters: Option, } @@ -20,12 +16,11 @@ pub(super) struct SlidingSyncListStickyParameters { impl SlidingSyncListStickyParameters { pub fn new( required_state: Vec<(StateEventType, String)>, - include_heroes: Option, filters: Option, ) -> Self { // Consider that each list will have at least one parameter set, so invalidate // it by default. - Self { required_state, include_heroes, filters } + Self { required_state, filters } } } @@ -34,7 +29,6 @@ impl StickyData for SlidingSyncListStickyParameters { fn apply(&self, request: &mut Self::Request) { request.room_details.required_state = self.required_state.to_vec(); - request.include_heroes = self.include_heroes; request.filters = self.filters.clone(); } } diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index c1006d06e..e74ad81f8 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -321,14 +321,14 @@ impl SlidingSync { let updated_rooms = { let mut rooms_map = self.inner.rooms.write().await; - let mut updated_rooms = Vec::with_capacity(sync_response.rooms.join.len()); + let mut updated_rooms = Vec::with_capacity(sync_response.rooms.joined.len()); for (room_id, mut room_data) in sliding_sync_response.rooms.into_iter() { // `sync_response` contains the rooms with decrypted events if any, so look at // the timeline events here first if the room exists. // Otherwise, let's look at the timeline inside the `sliding_sync_response`. let timeline = - if let Some(joined_room) = sync_response.rooms.join.remove(&room_id) { + if let Some(joined_room) = sync_response.rooms.joined.remove(&room_id) { joined_room.timeline.events } else { room_data.timeline.drain(..).map(TimelineEvent::new).collect() @@ -363,7 +363,7 @@ impl SlidingSync { // Since we've removed rooms that were in the room subsection from // `sync_response.rooms.join`, the remaining ones aren't already present in // `updated_rooms` and wouldn't cause any duplicates. - updated_rooms.extend(sync_response.rooms.join.keys().cloned()); + updated_rooms.extend(sync_response.rooms.joined.keys().cloned()); updated_rooms }; diff --git a/crates/matrix-sdk/src/sync.rs b/crates/matrix-sdk/src/sync.rs index 61f54ccfe..2c97714b3 100644 --- a/crates/matrix-sdk/src/sync.rs +++ b/crates/matrix-sdk/src/sync.rs @@ -178,7 +178,7 @@ impl Client { // Ignore errors when there are no receivers. let _ = self.inner.room_updates_sender.send(rooms.clone()); - for (room_id, room_info) in &rooms.join { + for (room_id, room_info) in &rooms.joined { let Some(room) = self.get_room(room_id) else { error!(?room_id, "Can't call event handler, room not found"); continue; @@ -207,7 +207,7 @@ impl Client { self.handle_sync_events(HandlerKind::EphemeralRoomData, room, ephemeral).await?; } - for (room_id, room_info) in &rooms.leave { + for (room_id, room_info) in &rooms.left { let Some(room) = self.get_room(room_id) else { error!(?room_id, "Can't call event handler, room not found"); continue; @@ -226,7 +226,7 @@ impl Client { self.handle_sync_timeline_events(room, &timeline.events).await?; } - for (room_id, room_info) in &rooms.invite { + for (room_id, room_info) in &rooms.invited { let Some(room) = self.get_room(room_id) else { error!(?room_id, "Can't call event handler, room not found"); continue; diff --git a/crates/matrix-sdk/tests/integration/client.rs b/crates/matrix-sdk/tests/integration/client.rs index e58f0d9f7..91a967ed4 100644 --- a/crates/matrix-sdk/tests/integration/client.rs +++ b/crates/matrix-sdk/tests/integration/client.rs @@ -349,13 +349,13 @@ async fn test_subscribe_all_room_updates() { client.sync_once(sync_settings).await.unwrap(); let room_updates = rx.recv().now_or_never().unwrap().unwrap(); - assert_let!(RoomUpdates { leave, join, invite, knocked } = room_updates); + assert_let!(RoomUpdates { left, joined, invited, knocked } = room_updates); // Check the left room updates. { - assert_eq!(leave.len(), 1); + assert_eq!(left.len(), 1); - let (room_id, update) = leave.iter().next().unwrap(); + let (room_id, update) = left.iter().next().unwrap(); assert_eq!(room_id, *MIXED_LEFT_ROOM_ID); assert!(update.state.is_empty()); @@ -365,9 +365,9 @@ async fn test_subscribe_all_room_updates() { // Check the joined room updates. { - assert_eq!(join.len(), 1); + assert_eq!(joined.len(), 1); - let (room_id, update) = join.iter().next().unwrap(); + let (room_id, update) = joined.iter().next().unwrap(); assert_eq!(room_id, *MIXED_JOINED_ROOM_ID); @@ -385,9 +385,9 @@ async fn test_subscribe_all_room_updates() { // Check the invited room updates. { - assert_eq!(invite.len(), 1); + assert_eq!(invited.len(), 1); - let (room_id, update) = invite.iter().next().unwrap(); + let (room_id, update) = invited.iter().next().unwrap(); assert_eq!(room_id, *MIXED_INVITED_ROOM_ID); assert_eq!(update.invite_state.events.len(), 2); @@ -938,7 +938,7 @@ async fn test_test_ambiguity_changes() { let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap(); - let changes = &response.rooms.join.get(*DEFAULT_TEST_ROOM_ID).unwrap().ambiguity_changes; + let changes = &response.rooms.joined.get(*DEFAULT_TEST_ROOM_ID).unwrap().ambiguity_changes; // A new member always triggers an ambiguity change. let example_change = changes.get(event_id!("$151800140517rfvjc:localhost")).unwrap(); @@ -1009,7 +1009,7 @@ async fn test_test_ambiguity_changes() { let response = client.sync_once(SyncSettings::default()).await.unwrap(); server.reset().await; - let changes = &response.rooms.join.get(*DEFAULT_TEST_ROOM_ID).unwrap().ambiguity_changes; + let changes = &response.rooms.joined.get(*DEFAULT_TEST_ROOM_ID).unwrap().ambiguity_changes; // First joined member made both members ambiguous. let example_2_change = changes.get(example_2_rename_1_event_id).unwrap(); @@ -1068,7 +1068,7 @@ async fn test_test_ambiguity_changes() { let response = client.sync_once(SyncSettings::default()).await.unwrap(); server.reset().await; - let changes = &response.rooms.join.get(*DEFAULT_TEST_ROOM_ID).unwrap().ambiguity_changes; + let changes = &response.rooms.joined.get(*DEFAULT_TEST_ROOM_ID).unwrap().ambiguity_changes; // example 2 is not ambiguous anymore. let example_2_change = changes.get(example_2_rename_2_event_id).unwrap(); @@ -1114,7 +1114,7 @@ async fn test_test_ambiguity_changes() { let response = client.sync_once(SyncSettings::default()).await.unwrap(); server.reset().await; - let changes = &response.rooms.join.get(*DEFAULT_TEST_ROOM_ID).unwrap().ambiguity_changes; + let changes = &response.rooms.joined.get(*DEFAULT_TEST_ROOM_ID).unwrap().ambiguity_changes; // example 3 is now ambiguous with example 2, not example. let example_3_change = changes.get(example_3_rename_event_id).unwrap(); @@ -1160,7 +1160,7 @@ async fn test_test_ambiguity_changes() { let response = client.sync_once(SyncSettings::default()).await.unwrap(); server.reset().await; - let changes = &response.rooms.join.get(*DEFAULT_TEST_ROOM_ID).unwrap().ambiguity_changes; + let changes = &response.rooms.joined.get(*DEFAULT_TEST_ROOM_ID).unwrap().ambiguity_changes; // name change, even if still not ambiguous, triggers ambiguity change. let example_change = changes.get(example_rename_event_id).unwrap(); @@ -1207,7 +1207,7 @@ async fn test_test_ambiguity_changes() { server.reset().await; // Avatar change does not trigger ambiguity change. - assert!(response.rooms.join.get(*DEFAULT_TEST_ROOM_ID).unwrap().ambiguity_changes.is_empty()); + assert!(response.rooms.joined.get(*DEFAULT_TEST_ROOM_ID).unwrap().ambiguity_changes.is_empty()); let changes = assert_next_matches!(updates, Ok(RoomUpdate::Joined { updates, .. }) => updates.ambiguity_changes); assert!(changes.is_empty()); diff --git a/testing/matrix-sdk-integration-testing/src/tests/sliding_sync/notification_client.rs b/testing/matrix-sdk-integration-testing/src/tests/sliding_sync/notification_client.rs index a83555245..c036fd480 100644 --- a/testing/matrix-sdk-integration-testing/src/tests/sliding_sync/notification_client.rs +++ b/testing/matrix-sdk-integration-testing/src/tests/sliding_sync/notification_client.rs @@ -61,7 +61,7 @@ async fn test_notification() -> Result<()> { let bob_invite_response = bob.sync_once(Default::default()).await?; let sync_token = bob_invite_response.next_batch; - let mut invited_rooms = bob_invite_response.rooms.invite; + let mut invited_rooms = bob_invite_response.rooms.invited; assert_eq!(invited_rooms.len(), 1, "must be only one invitation"); @@ -145,7 +145,7 @@ async fn test_notification() -> Result<()> { // In this sync, Bob receives the message from Alice. let bob_response = bob.sync_once(SyncSettings::default().token(sync_token)).await?; - let mut joined_rooms = bob_response.rooms.join.into_iter(); + let mut joined_rooms = bob_response.rooms.joined.into_iter(); let (_, bob_room) = joined_rooms.next().expect("must have joined one room"); assert!(joined_rooms.next().is_none(), "no more joined rooms: {joined_rooms:#?}");