diff --git a/crates/matrix-sdk-base/src/client.rs b/crates/matrix-sdk-base/src/client.rs index 1f5ac5aee..64d872496 100644 --- a/crates/matrix-sdk-base/src/client.rs +++ b/crates/matrix-sdk-base/src/client.rs @@ -65,7 +65,7 @@ use crate::{ Result as StoreResult, RoomLoadSettings, StateChanges, StateStoreDataKey, StateStoreDataValue, StateStoreExt, StoreConfig, }, - sync::{JoinedRoomUpdate, LeftRoomUpdate, RoomUpdates, SyncResponse}, + sync::{LeftRoomUpdate, RoomUpdates, SyncResponse}, RoomStateFilter, SessionMeta, }; @@ -539,71 +539,23 @@ impl BaseClient { let mut updated_members_in_room: BTreeMap> = BTreeMap::new(); - for (room_id, new_info) in response.rooms.join { - let room = self.state_store.get_or_create_room( + for (room_id, joined_room) in response.rooms.join { + let joined_room_update = processors::room::sync_v2::update_joined_room( + &mut context, &room_id, - RoomState::Joined, + joined_room, + requested_required_states, + &self.state_store, self.room_info_notable_update_sender.clone(), - ); - - let mut room_info = room.clone_info(); - - room_info.mark_as_joined(); - room_info.update_from_ruma_summary(&new_info.summary); - room_info.set_prev_batch(new_info.timeline.prev_batch.as_deref()); - room_info.mark_state_fully_synced(); - room_info.handle_encryption_state(requested_required_states.for_room(&room_id)); - - let (raw_state_events, state_events) = - processors::state_events::sync::collect(&mut context, &new_info.state.events); - - let mut new_user_ids = processors::state_events::sync::dispatch_and_get_new_users( - &mut context, - (&raw_state_events, &state_events), - &mut room_info, &mut ambiguity_cache, - ) - .await?; - - processors::ephemeral_events::dispatch( - &mut context, - &new_info.ephemeral.events, - &room_id, - ); - - if new_info.timeline.limited { - room_info.mark_members_missing(); - } - - let (raw_state_events_from_timeline, state_events_from_timeline) = - processors::state_events::sync::collect_from_timeline( - &mut context, - &new_info.timeline.events, - ); - - let mut other_new_user_ids = - processors::state_events::sync::dispatch_and_get_new_users( - &mut context, - (&raw_state_events_from_timeline, &state_events_from_timeline), - &mut room_info, - &mut ambiguity_cache, - ) - .await?; - new_user_ids.append(&mut other_new_user_ids); - updated_members_in_room.insert(room_id.to_owned(), new_user_ids.clone()); - - let timeline = processors::timeline::build( - &mut context, - &room, - &mut room_info, - processors::timeline::builder::Timeline::from(new_info.timeline), + &mut updated_members_in_room, processors::timeline::builder::Notification::new( &push_rules, &mut notifications, &self.state_store, ), #[cfg(feature = "e2e-encryption")] - processors::timeline::builder::E2EE::new( + processors::e2ee::E2EE::new( olm_machine.as_ref(), self.decryption_trust_requirement, self.handle_verification_events, @@ -611,59 +563,7 @@ impl BaseClient { ) .await?; - // Save the new `RoomInfo`. - context.state_changes.add_room(room_info); - - processors::account_data::for_room( - &mut context, - &room_id, - &new_info.account_data.events, - &self.state_store, - ) - .await; - - // `processors::account_data::from_room` might have updated the `RoomInfo`. - // Let's fetch it again. - // - // SAFETY: `expect` is safe because the `RoomInfo` has been inserted 2 lines - // above. - let mut room_info = context - .state_changes - .room_infos - .get(&room_id) - .expect("`RoomInfo` must exist in `StateChanges` at this point") - .clone(); - - #[cfg(feature = "e2e-encryption")] - processors::e2ee::tracked_users::update_or_set_if_room_is_newly_encrypted( - &mut context, - olm_machine.as_ref(), - &new_user_ids, - room_info.encryption_state(), - room.encryption_state(), - &room_id, - &self.state_store, - ) - .await?; - - let notification_count = new_info.unread_notifications.into(); - room_info.update_notification_count(notification_count); - - let ambiguity_changes = ambiguity_cache.changes.remove(&room_id).unwrap_or_default(); - - new_rooms.join.insert( - room_id, - JoinedRoomUpdate::new( - timeline, - new_info.state.events, - new_info.account_data.events, - new_info.ephemeral.events, - notification_count, - ambiguity_changes, - ), - ); - - context.state_changes.add_room(room_info); + new_rooms.join.insert(room_id, joined_room_update); } for (room_id, new_info) in response.rooms.leave { diff --git a/crates/matrix-sdk-base/src/response_processors/mod.rs b/crates/matrix-sdk-base/src/response_processors/mod.rs index 2eec001f8..fea890d50 100644 --- a/crates/matrix-sdk-base/src/response_processors/mod.rs +++ b/crates/matrix-sdk-base/src/response_processors/mod.rs @@ -20,6 +20,7 @@ pub mod ephemeral_events; #[cfg(feature = "e2e-encryption")] pub mod latest_event; pub mod profiles; +pub mod room; pub mod state_events; pub mod timeline; #[cfg(feature = "e2e-encryption")] diff --git a/crates/matrix-sdk-base/src/response_processors/room/mod.rs b/crates/matrix-sdk-base/src/response_processors/room/mod.rs new file mode 100644 index 000000000..681b43bd8 --- /dev/null +++ b/crates/matrix-sdk-base/src/response_processors/room/mod.rs @@ -0,0 +1,15 @@ +// Copyright 2025 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod sync_v2; diff --git a/crates/matrix-sdk-base/src/response_processors/room/sync_v2.rs b/crates/matrix-sdk-base/src/response_processors/room/sync_v2.rs new file mode 100644 index 000000000..54a05be7b --- /dev/null +++ b/crates/matrix-sdk-base/src/response_processors/room/sync_v2.rs @@ -0,0 +1,140 @@ +// Copyright 2025 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::{BTreeMap, BTreeSet}; + +use ruma::{api::client::sync::sync_events::v3::JoinedRoom, OwnedRoomId, OwnedUserId, RoomId}; +use tokio::sync::broadcast::Sender; + +#[cfg(feature = "e2e-encryption")] +use super::super::e2ee; +use super::super::{account_data, ephemeral_events, state_events, timeline, Context}; +use crate::{ + store::{ambiguity_map::AmbiguityCache, BaseStateStore}, + sync::JoinedRoomUpdate, + RequestedRequiredStates, Result, RoomInfoNotableUpdate, RoomState, +}; + +/// Process updates of a joined room. +#[allow(clippy::too_many_arguments)] +pub async fn update_joined_room( + context: &mut Context, + room_id: &RoomId, + joined_room: JoinedRoom, + requested_required_states: &RequestedRequiredStates, + state_store: &BaseStateStore, + room_info_notable_update_sender: Sender, + ambiguity_cache: &mut AmbiguityCache, + updated_members_in_room: &mut BTreeMap>, + notification: timeline::builder::Notification<'_>, + #[cfg(feature = "e2e-encryption")] e2ee: e2ee::E2EE<'_>, +) -> Result { + let room = + state_store.get_or_create_room(room_id, RoomState::Joined, room_info_notable_update_sender); + + let mut room_info = room.clone_info(); + + room_info.mark_as_joined(); + room_info.update_from_ruma_summary(&joined_room.summary); + room_info.set_prev_batch(joined_room.timeline.prev_batch.as_deref()); + room_info.mark_state_fully_synced(); + room_info.handle_encryption_state(requested_required_states.for_room(room_id)); + + let (raw_state_events, state_events) = + state_events::sync::collect(context, &joined_room.state.events); + + let mut new_user_ids = state_events::sync::dispatch_and_get_new_users( + context, + (&raw_state_events, &state_events), + &mut room_info, + ambiguity_cache, + ) + .await?; + + ephemeral_events::dispatch(context, &joined_room.ephemeral.events, room_id); + + if joined_room.timeline.limited { + room_info.mark_members_missing(); + } + + let (raw_state_events_from_timeline, state_events_from_timeline) = + state_events::sync::collect_from_timeline(context, &joined_room.timeline.events); + + let mut other_new_user_ids = state_events::sync::dispatch_and_get_new_users( + context, + (&raw_state_events_from_timeline, &state_events_from_timeline), + &mut room_info, + ambiguity_cache, + ) + .await?; + new_user_ids.append(&mut other_new_user_ids); + updated_members_in_room.insert(room_id.to_owned(), new_user_ids.clone()); + + #[cfg(feature = "e2e-encryption")] + let olm_machine = e2ee.olm_machine.clone(); + + let timeline = timeline::build( + context, + &room, + &mut room_info, + timeline::builder::Timeline::from(joined_room.timeline), + notification, + #[cfg(feature = "e2e-encryption")] + e2ee, + ) + .await?; + + // Save the new `RoomInfo`. + context.state_changes.add_room(room_info); + + account_data::for_room(context, room_id, &joined_room.account_data.events, state_store).await; + + // `processors::account_data::from_room` might have updated the `RoomInfo`. + // Let's fetch it again. + // + // SAFETY: `expect` is safe because the `RoomInfo` has been inserted 2 lines + // above. + let mut room_info = context + .state_changes + .room_infos + .get(room_id) + .expect("`RoomInfo` must exist in `StateChanges` at this point") + .clone(); + + #[cfg(feature = "e2e-encryption")] + e2ee::tracked_users::update_or_set_if_room_is_newly_encrypted( + context, + olm_machine, + &new_user_ids, + room_info.encryption_state(), + room.encryption_state(), + room_id, + state_store, + ) + .await?; + + let notification_count = joined_room.unread_notifications.into(); + room_info.update_notification_count(notification_count); + + context.state_changes.add_room(room_info); + + Ok(JoinedRoomUpdate::new( + timeline, + joined_room.state.events, + joined_room.account_data.events, + joined_room.ephemeral.events, + notification_count, + ambiguity_cache.changes.remove(room_id).unwrap_or_default(), + )) +}