From 4298648d05c6c4599c19b5a8cb678b35dbe2dc6f Mon Sep 17 00:00:00 2001 From: Michael Goldenberg Date: Sat, 18 Apr 2026 15:43:01 -0400 Subject: [PATCH] refactor: ensure all room info updates are atomic Signed-off-by: Michael Goldenberg --- crates/matrix-sdk-base/src/client.rs | 61 ++++++++----------- .../src/response_processors/changes.rs | 9 ++- crates/matrix-sdk-base/src/room/room_info.rs | 2 +- .../src/room_list_service/sorters/recency.rs | 44 +++++++------ .../src/event_cache/caches/room/state.rs | 29 +++------ .../src/latest_events/latest_event/mod.rs | 35 ++++------- crates/matrix-sdk/src/latest_events/mod.rs | 2 +- crates/matrix-sdk/src/room/mod.rs | 32 ++++------ .../matrix-sdk/tests/integration/room/left.rs | 8 ++- 9 files changed, 95 insertions(+), 127 deletions(-) diff --git a/crates/matrix-sdk-base/src/client.rs b/crates/matrix-sdk-base/src/client.rs index df0983aad..ded92d1d1 100644 --- a/crates/matrix-sdk-base/src/client.rs +++ b/crates/matrix-sdk-base/src/client.rs @@ -405,24 +405,22 @@ impl BaseClient { let room = self.state_store.get_or_create_room(room_id, RoomState::Knocked); if room.state() != RoomState::Knocked { - let _state_store_lock = self.state_store_lock().lock().await; - - let mut room_info = room.clone_info(); - room_info.mark_as_knocked(); - room_info.mark_state_partially_synced(); - room_info.mark_members_missing(); // the own member event changed + let store_guard = self.state_store.lock().lock().await; // We are no longer joined to the room, so the invite acceptance details are no // longer relevant. #[cfg(feature = "e2e-encryption")] if let Some(olm_machine) = self.olm_machine().await.as_ref() { - olm_machine.store().clear_room_pending_key_bundle(room_info.room_id()).await? + olm_machine.store().clear_room_pending_key_bundle(room_id).await? } - let mut changes = StateChanges::default(); - changes.add_room(room_info.clone()); - self.state_store.save_changes(&changes).await?; // Update the store - room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP); + room.update_and_save_room_info_with_store_guard(&store_guard, |mut info| { + info.mark_as_knocked(); + info.mark_state_partially_synced(); + info.mark_members_missing(); // the own member event changed + (info, RoomInfoNotableUpdateReasons::MEMBERSHIP) + }) + .await?; } Ok(room) @@ -481,13 +479,7 @@ impl BaseClient { // If the state isn't `RoomState::Joined` then this means that we knew about // this room before. Let's modify the existing state now. if room.state() != RoomState::Joined { - let _state_store_lock = self.state_store_lock().lock().await; - - let mut room_info = room.clone_info(); - - room_info.mark_as_joined(); - room_info.mark_state_partially_synced(); - room_info.mark_members_missing(); // the own member event changed + let store_guard = self.state_store_lock().lock().await; #[cfg(feature = "e2e-encryption")] { @@ -515,12 +507,13 @@ impl BaseClient { let _ = inviter; } - let mut changes = StateChanges::default(); - changes.add_room(room_info.clone()); - - self.state_store.save_changes(&changes).await?; // Update the store - - room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP); + room.update_and_save_room_info_with_store_guard(&store_guard, |mut info| { + info.mark_as_joined(); + info.mark_state_partially_synced(); + info.mark_members_missing(); // the own member event changed + (info, RoomInfoNotableUpdateReasons::MEMBERSHIP) + }) + .await?; } Ok(room) @@ -533,24 +526,22 @@ impl BaseClient { let room = self.state_store.get_or_create_room(room_id, RoomState::Left); if room.state() != RoomState::Left { - let _state_store_lock = self.state_store_lock().lock().await; - - let mut room_info = room.clone_info(); - room_info.mark_as_left(); - room_info.mark_state_partially_synced(); - room_info.mark_members_missing(); // the own member event changed + let store_guard = self.state_store.lock().lock().await; // We are no longer joined to the room, so the invite acceptance details are no // longer relevant. #[cfg(feature = "e2e-encryption")] if let Some(olm_machine) = self.olm_machine().await.as_ref() { - olm_machine.store().clear_room_pending_key_bundle(room_info.room_id()).await? + olm_machine.store().clear_room_pending_key_bundle(room_id).await? } - let mut changes = StateChanges::default(); - changes.add_room(room_info.clone()); - self.state_store.save_changes(&changes).await?; // Update the store - room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP); + room.update_and_save_room_info_with_store_guard(&store_guard, |mut info| { + info.mark_as_left(); + info.mark_state_partially_synced(); + info.mark_members_missing(); // the own member event changed + (info, RoomInfoNotableUpdateReasons::MEMBERSHIP) + }) + .await?; } Ok(()) diff --git a/crates/matrix-sdk-base/src/response_processors/changes.rs b/crates/matrix-sdk-base/src/response_processors/changes.rs index e70446bfd..af052b46e 100644 --- a/crates/matrix-sdk-base/src/response_processors/changes.rs +++ b/crates/matrix-sdk-base/src/response_processors/changes.rs @@ -33,7 +33,7 @@ pub async fn save_only(context: Context, state_store: &BaseStateStore) -> Result let _timer = timer!(tracing::Level::TRACE, "_method"); save_changes(&context, state_store, None).await?; - broadcast_room_info_notable_updates(&context, state_store); + broadcast_room_info_notable_updates(&context, state_store).await; Ok(()) } @@ -56,7 +56,7 @@ pub async fn save_and_apply( save_changes(&context, state_store, sync_token).await?; apply_changes(&context, ignore_user_list_changes, previous_ignored_user_list); - broadcast_room_info_notable_updates(&context, state_store); + broadcast_room_info_notable_updates(&context, state_store).await; trace!("applied changes"); @@ -118,13 +118,12 @@ fn apply_changes( } } -fn broadcast_room_info_notable_updates(context: &Context, state_store: &BaseStateStore) { +async fn broadcast_room_info_notable_updates(context: &Context, state_store: &BaseStateStore) { for (room_id, room_info) in &context.state_changes.room_infos { if let Some(room) = state_store.room(room_id) { let room_info_notable_update_reasons = context.room_info_notable_updates.get(room_id).copied().unwrap_or_default(); - - room.set_room_info(room_info.clone(), room_info_notable_update_reasons) + room.update_room_info(|_| (room_info.clone(), room_info_notable_update_reasons)).await; } } } diff --git a/crates/matrix-sdk-base/src/room/room_info.rs b/crates/matrix-sdk-base/src/room/room_info.rs index cdd1a4566..7bfeec22b 100644 --- a/crates/matrix-sdk-base/src/room/room_info.rs +++ b/crates/matrix-sdk-base/src/room/room_info.rs @@ -156,7 +156,7 @@ impl Room { } /// Update the summary with given RoomInfo. - pub fn set_room_info( + fn set_room_info( &self, room_info: RoomInfo, room_info_notable_update_reasons: RoomInfoNotableUpdateReasons, diff --git a/crates/matrix-sdk-ui/src/room_list_service/sorters/recency.rs b/crates/matrix-sdk-ui/src/room_list_service/sorters/recency.rs index 2db1003d9..388741525 100644 --- a/crates/matrix-sdk-ui/src/room_list_service/sorters/recency.rs +++ b/crates/matrix-sdk-ui/src/room_list_service/sorters/recency.rs @@ -163,17 +163,21 @@ mod tests { }) } - fn set_latest_event_value(room: &mut RoomListItem, latest_event_value: LatestEventValue) { - let mut room_info = room.clone_info(); - room_info.set_latest_event(latest_event_value); - room.set_room_info(room_info, RoomInfoNotableUpdateReasons::LATEST_EVENT); + async fn set_latest_event_value(room: &mut RoomListItem, latest_event_value: LatestEventValue) { + room.update_room_info(|mut info| { + info.set_latest_event(latest_event_value); + (info, RoomInfoNotableUpdateReasons::LATEST_EVENT) + }) + .await; room.refresh_cached_data(); } - fn set_recency_stamp(room: &mut RoomListItem, recency_stamp: RoomRecencyStamp) { - let mut room_info = room.clone_info(); - room_info.update_recency_stamp(recency_stamp); - room.set_room_info(room_info, RoomInfoNotableUpdateReasons::RECENCY_STAMP); + async fn set_recency_stamp(room: &mut RoomListItem, recency_stamp: RoomRecencyStamp) { + room.update_room_info(|mut info| { + info.update_recency_stamp(recency_stamp); + (info, RoomInfoNotableUpdateReasons::RECENCY_STAMP) + }) + .await; room.refresh_cached_data(); } @@ -184,15 +188,15 @@ mod tests { let [mut room_a, mut room_b] = new_rooms([room_id!("!a:b.c"), room_id!("!d:e.f")], &client, &server).await; - set_recency_stamp(&mut room_a, 1.into()); - set_recency_stamp(&mut room_b, 2.into()); + set_recency_stamp(&mut room_a, 1.into()).await; + set_recency_stamp(&mut room_b, 2.into()).await; // Both rooms have a `LatestEventValue::None`. // // Because there is no latest event, the recency stamp MUST BE USED. { - set_latest_event_value(&mut room_a, none()); - set_latest_event_value(&mut room_b, none()); + set_latest_event_value(&mut room_a, none()).await; + set_latest_event_value(&mut room_b, none()).await; assert_eq!(extract_scores(&room_a, &room_b), (Some(1), Some(2))); } @@ -201,8 +205,8 @@ mod tests { // // One of the room has a latest event, so the recency stamp MUST BE IGNORED. { - set_latest_event_value(&mut room_a, none()); - set_latest_event_value(&mut room_b, remote(3)); + set_latest_event_value(&mut room_a, none()).await; + set_latest_event_value(&mut room_b, remote(3)).await; assert_eq!(extract_scores(&room_a, &room_b), (None, Some(3))); } @@ -211,8 +215,8 @@ mod tests { // // One of the room has a latest event, so the recency stamp MUST BE IGNORED. { - set_latest_event_value(&mut room_a, remote(3)); - set_latest_event_value(&mut room_b, none()); + set_latest_event_value(&mut room_a, remote(3)).await; + set_latest_event_value(&mut room_b, none()).await; assert_eq!(extract_scores(&room_a, &room_b), (Some(3), None)); } @@ -225,8 +229,8 @@ mod tests { let [mut room_a, mut room_b] = new_rooms([room_id!("!a:b.c"), room_id!("!d:e.f")], &client, &server).await; - set_recency_stamp(&mut room_a, 1.into()); - set_recency_stamp(&mut room_b, 2.into()); + set_recency_stamp(&mut room_a, 1.into()).await; + set_recency_stamp(&mut room_b, 2.into()).await; // `room_a` and `room_b` has either `Remote` or `Local*`. // @@ -236,8 +240,8 @@ mod tests { for latest_event_value_b in [remote(4), local_is_sending(4), local_cannot_be_sent(4)] { - set_latest_event_value(&mut room_a, latest_event_value_a.clone()); - set_latest_event_value(&mut room_b, latest_event_value_b); + set_latest_event_value(&mut room_a, latest_event_value_a.clone()).await; + set_latest_event_value(&mut room_b, latest_event_value_b).await; assert_eq!(extract_scores(&room_a, &room_b), (Some(3), Some(4))); } diff --git a/crates/matrix-sdk/src/event_cache/caches/room/state.rs b/crates/matrix-sdk/src/event_cache/caches/room/state.rs index addcfe736..dee454995 100644 --- a/crates/matrix-sdk/src/event_cache/caches/room/state.rs +++ b/crates/matrix-sdk/src/event_cache/caches/room/state.rs @@ -23,8 +23,7 @@ use std::{ use eyeball::SharedObservable; use eyeball_im::VectorDiff; use matrix_sdk_base::{ - RoomInfoNotableUpdateReasons, StateChanges, apply_redaction, - check_validity_of_replacement_events, + RoomInfoNotableUpdateReasons, apply_redaction, check_validity_of_replacement_events, deserialized_responses::{ThreadSummary, ThreadSummaryStatus}, event_cache::{ Event, Gap, @@ -1059,27 +1058,15 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> { // The read receipt has changed! Do a little dance to update the `RoomInfo` in // the state store, and then in the room itself, so that observers // can be notified of the change. - let client = room.client(); - - // Take the state store lock. - let _state_store_lock = client.base_client().state_store_lock().lock().await; - - // Don't reuse the room info from above, as it might have changed in the - // meanwhile. This access is somewhat protected by the state store locking, even - // though other code may call `set_room_info` concurrently. - let mut room_info = room.clone_info(); - room_info.set_read_receipts(read_receipts); - - let mut state_changes = StateChanges::default(); - state_changes.add_room(room_info.clone()); - - // Update the `RoomInfo` in the state store. - if let Err(error) = client.state_store().save_changes(&state_changes).await { + let result = room + .update_and_save_room_info(|mut room_info| { + room_info.set_read_receipts(read_receipts); + (room_info, RoomInfoNotableUpdateReasons::READ_RECEIPT) + }) + .await; + if let Err(error) = result { error!(room_id = ?room.room_id(), ?error, "Failed to save the changes"); } - - // Update the `RoomInfo` of the room. - room.set_room_info(room_info, RoomInfoNotableUpdateReasons::READ_RECEIPT); } Ok(()) diff --git a/crates/matrix-sdk/src/latest_events/latest_event/mod.rs b/crates/matrix-sdk/src/latest_events/latest_event/mod.rs index 65b16382e..8d484ec56 100644 --- a/crates/matrix-sdk/src/latest_events/latest_event/mod.rs +++ b/crates/matrix-sdk/src/latest_events/latest_event/mod.rs @@ -21,7 +21,7 @@ use eyeball::{AsyncLock, ObservableWriteGuard, SharedObservable, Subscriber}; pub use matrix_sdk_base::latest_event::{ LatestEventValue, LocalLatestEventValue, RemoteLatestEventValue, }; -use matrix_sdk_base::{RoomInfoNotableUpdateReasons, RoomState, StateChanges}; +use matrix_sdk_base::{RoomInfoNotableUpdateReasons, RoomState}; use ruma::{EventId, OwnedEventId, UserId, events::room::power_levels::RoomPowerLevels}; use tracing::{error, info, instrument, trace, warn}; @@ -246,26 +246,15 @@ impl LatestEvent { warn!(room_id = ?self.weak_room.room_id(), "Cannot store the latest event value because the room cannot be accessed"); return; }; - - let client = room.client(); - - // Take the state store lock. - let _state_store_lock = client.base_client().state_store_lock().lock().await; - - // Compute a new `RoomInfo`. - let mut room_info = room.clone_info(); - room_info.set_latest_event(new_value); - - let mut state_changes = StateChanges::default(); - state_changes.add_room(room_info.clone()); - - // Update the `RoomInfo` in the state store. - if let Err(error) = client.state_store().save_changes(&state_changes).await { + let result = room + .update_and_save_room_info(|mut info| { + info.set_latest_event(new_value); + (info, RoomInfoNotableUpdateReasons::LATEST_EVENT) + }) + .await; + if let Err(error) = result { error!(room_id = ?room.room_id(), ?error, "Failed to save the changes"); } - - // Update the `RoomInfo` of the room. - room.set_room_info(room_info, RoomInfoNotableUpdateReasons::LATEST_EVENT); } } @@ -386,9 +375,11 @@ mod tests_latest_event { // Set the `RoomInfo`. { - let mut room_info = room.clone_info(); - room_info.set_latest_event(LatestEventValue::LocalIsSending(local_room_message("foo"))); - room.set_room_info(room_info, Default::default()); + room.update_room_info(|mut info| { + info.set_latest_event(LatestEventValue::LocalIsSending(local_room_message("foo"))); + (info, Default::default()) + }) + .await; } // Second time. We get `LocalIsSending` from `RoomInfo`. diff --git a/crates/matrix-sdk/src/latest_events/mod.rs b/crates/matrix-sdk/src/latest_events/mod.rs index 8831df913..3d7bcf246 100644 --- a/crates/matrix-sdk/src/latest_events/mod.rs +++ b/crates/matrix-sdk/src/latest_events/mod.rs @@ -1389,7 +1389,7 @@ mod tests { // `room_0` always has a `LatestEventValue::None` as its the default value. let mut room_info_1 = room_0.clone_info(); room_info_1.set_latest_event(LatestEventValue::LocalIsSending(local_room_message("foo"))); - room_1.set_room_info(room_info_1, Default::default()); + room_1.update_room_info(|_| (room_info_1, Default::default())).await; let weak_client = WeakClient::from_client(&client); diff --git a/crates/matrix-sdk/src/room/mod.rs b/crates/matrix-sdk/src/room/mod.rs index db4448557..17d2e9edc 100644 --- a/crates/matrix-sdk/src/room/mod.rs +++ b/crates/matrix-sdk/src/room/mod.rs @@ -41,7 +41,7 @@ use matrix_sdk_base::crypto::{ pub use matrix_sdk_base::store::StoredThreadSubscription; use matrix_sdk_base::{ ComposerDraft, EncryptionState, RoomInfoNotableUpdateReasons, RoomMemberships, SendOutsideWasm, - StateChanges, StateStoreDataKey, StateStoreDataValue, + StateStoreDataKey, StateStoreDataValue, deserialized_responses::{ RawAnySyncOrStrippedState, RawSyncOrStrippedState, SyncOrStrippedState, }, @@ -1101,18 +1101,14 @@ impl Room { Err(err) => return Err(err.into()), }; - let _state_store_lock = self.client.base_client().state_store_lock().lock().await; - // Persist the event and the fact that we requested it from the server in // `RoomInfo`. - let mut room_info = self.clone_info(); - room_info.mark_encryption_state_synced(); - room_info.set_encryption_event(response); - let mut changes = StateChanges::default(); - changes.add_room(room_info.clone()); - - self.client.state_store().save_changes(&changes).await?; - self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty()); + self.update_and_save_room_info(|mut room_info| { + room_info.mark_encryption_state_synced(); + room_info.set_encryption_event(response); + (room_info, RoomInfoNotableUpdateReasons::empty()) + }) + .await?; Ok(()) }) @@ -2266,7 +2262,7 @@ impl Room { ) .await; - let _state_store_lock = self.client.base_client().state_store_lock().lock().await; + let store_guard = self.client.base_client().state_store_lock().lock().await; // If encryption was enabled, return. #[cfg(not(feature = "experimental-encrypted-state-events"))] @@ -2294,13 +2290,11 @@ impl Room { // assuming it's sync'd and correct (and not encrypted). debug!("still not marked as encrypted, marking encryption state as missing"); - let mut room_info = self.clone_info(); - room_info.mark_encryption_state_missing(); - let mut changes = StateChanges::default(); - changes.add_room(room_info.clone()); - - self.client.state_store().save_changes(&changes).await?; - self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty()); + self.update_and_save_room_info_with_store_guard(&store_guard, |mut info| { + info.mark_encryption_state_missing(); + (info, RoomInfoNotableUpdateReasons::empty()) + }) + .await?; } Ok(()) diff --git a/crates/matrix-sdk/tests/integration/room/left.rs b/crates/matrix-sdk/tests/integration/room/left.rs index 02faa287d..a962dcfcf 100644 --- a/crates/matrix-sdk/tests/integration/room/left.rs +++ b/crates/matrix-sdk/tests/integration/room/left.rs @@ -144,9 +144,11 @@ async fn test_forget_banned_room() { // Make the room banned let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap(); - let mut room_info = room.clone_info(); - room_info.mark_as_banned(); - room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP); + room.update_room_info(|mut room_info| { + room_info.mark_as_banned(); + (room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP) + }) + .await; assert_eq!(room.state(), RoomState::Banned); room.forget().await.unwrap();