mirror of
https://github.com/matrix-org/matrix-rust-sdk.git
synced 2026-06-10 17:34:20 -04:00
refactor: ensure all room info updates are atomic
Signed-off-by: Michael Goldenberg <m@mgoldenberg.net>
This commit is contained in:
committed by
Damir Jelić
parent
efd96d9299
commit
4298648d05
@@ -405,24 +405,22 @@ impl BaseClient {
|
||||
let room = self.state_store.get_or_create_room(room_id, RoomState::Knocked);
|
||||
|
||||
if room.state() != RoomState::Knocked {
|
||||
let _state_store_lock = self.state_store_lock().lock().await;
|
||||
|
||||
let mut room_info = room.clone_info();
|
||||
room_info.mark_as_knocked();
|
||||
room_info.mark_state_partially_synced();
|
||||
room_info.mark_members_missing(); // the own member event changed
|
||||
let store_guard = self.state_store.lock().lock().await;
|
||||
|
||||
// We are no longer joined to the room, so the invite acceptance details are no
|
||||
// longer relevant.
|
||||
#[cfg(feature = "e2e-encryption")]
|
||||
if let Some(olm_machine) = self.olm_machine().await.as_ref() {
|
||||
olm_machine.store().clear_room_pending_key_bundle(room_info.room_id()).await?
|
||||
olm_machine.store().clear_room_pending_key_bundle(room_id).await?
|
||||
}
|
||||
|
||||
let mut changes = StateChanges::default();
|
||||
changes.add_room(room_info.clone());
|
||||
self.state_store.save_changes(&changes).await?; // Update the store
|
||||
room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
|
||||
room.update_and_save_room_info_with_store_guard(&store_guard, |mut info| {
|
||||
info.mark_as_knocked();
|
||||
info.mark_state_partially_synced();
|
||||
info.mark_members_missing(); // the own member event changed
|
||||
(info, RoomInfoNotableUpdateReasons::MEMBERSHIP)
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(room)
|
||||
@@ -481,13 +479,7 @@ impl BaseClient {
|
||||
// If the state isn't `RoomState::Joined` then this means that we knew about
|
||||
// this room before. Let's modify the existing state now.
|
||||
if room.state() != RoomState::Joined {
|
||||
let _state_store_lock = self.state_store_lock().lock().await;
|
||||
|
||||
let mut room_info = room.clone_info();
|
||||
|
||||
room_info.mark_as_joined();
|
||||
room_info.mark_state_partially_synced();
|
||||
room_info.mark_members_missing(); // the own member event changed
|
||||
let store_guard = self.state_store_lock().lock().await;
|
||||
|
||||
#[cfg(feature = "e2e-encryption")]
|
||||
{
|
||||
@@ -515,12 +507,13 @@ impl BaseClient {
|
||||
let _ = inviter;
|
||||
}
|
||||
|
||||
let mut changes = StateChanges::default();
|
||||
changes.add_room(room_info.clone());
|
||||
|
||||
self.state_store.save_changes(&changes).await?; // Update the store
|
||||
|
||||
room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
|
||||
room.update_and_save_room_info_with_store_guard(&store_guard, |mut info| {
|
||||
info.mark_as_joined();
|
||||
info.mark_state_partially_synced();
|
||||
info.mark_members_missing(); // the own member event changed
|
||||
(info, RoomInfoNotableUpdateReasons::MEMBERSHIP)
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(room)
|
||||
@@ -533,24 +526,22 @@ impl BaseClient {
|
||||
let room = self.state_store.get_or_create_room(room_id, RoomState::Left);
|
||||
|
||||
if room.state() != RoomState::Left {
|
||||
let _state_store_lock = self.state_store_lock().lock().await;
|
||||
|
||||
let mut room_info = room.clone_info();
|
||||
room_info.mark_as_left();
|
||||
room_info.mark_state_partially_synced();
|
||||
room_info.mark_members_missing(); // the own member event changed
|
||||
let store_guard = self.state_store.lock().lock().await;
|
||||
|
||||
// We are no longer joined to the room, so the invite acceptance details are no
|
||||
// longer relevant.
|
||||
#[cfg(feature = "e2e-encryption")]
|
||||
if let Some(olm_machine) = self.olm_machine().await.as_ref() {
|
||||
olm_machine.store().clear_room_pending_key_bundle(room_info.room_id()).await?
|
||||
olm_machine.store().clear_room_pending_key_bundle(room_id).await?
|
||||
}
|
||||
|
||||
let mut changes = StateChanges::default();
|
||||
changes.add_room(room_info.clone());
|
||||
self.state_store.save_changes(&changes).await?; // Update the store
|
||||
room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
|
||||
room.update_and_save_room_info_with_store_guard(&store_guard, |mut info| {
|
||||
info.mark_as_left();
|
||||
info.mark_state_partially_synced();
|
||||
info.mark_members_missing(); // the own member event changed
|
||||
(info, RoomInfoNotableUpdateReasons::MEMBERSHIP)
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -33,7 +33,7 @@ pub async fn save_only(context: Context, state_store: &BaseStateStore) -> Result
|
||||
let _timer = timer!(tracing::Level::TRACE, "_method");
|
||||
|
||||
save_changes(&context, state_store, None).await?;
|
||||
broadcast_room_info_notable_updates(&context, state_store);
|
||||
broadcast_room_info_notable_updates(&context, state_store).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -56,7 +56,7 @@ pub async fn save_and_apply(
|
||||
|
||||
save_changes(&context, state_store, sync_token).await?;
|
||||
apply_changes(&context, ignore_user_list_changes, previous_ignored_user_list);
|
||||
broadcast_room_info_notable_updates(&context, state_store);
|
||||
broadcast_room_info_notable_updates(&context, state_store).await;
|
||||
|
||||
trace!("applied changes");
|
||||
|
||||
@@ -118,13 +118,12 @@ fn apply_changes(
|
||||
}
|
||||
}
|
||||
|
||||
fn broadcast_room_info_notable_updates(context: &Context, state_store: &BaseStateStore) {
|
||||
async fn broadcast_room_info_notable_updates(context: &Context, state_store: &BaseStateStore) {
|
||||
for (room_id, room_info) in &context.state_changes.room_infos {
|
||||
if let Some(room) = state_store.room(room_id) {
|
||||
let room_info_notable_update_reasons =
|
||||
context.room_info_notable_updates.get(room_id).copied().unwrap_or_default();
|
||||
|
||||
room.set_room_info(room_info.clone(), room_info_notable_update_reasons)
|
||||
room.update_room_info(|_| (room_info.clone(), room_info_notable_update_reasons)).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -156,7 +156,7 @@ impl Room {
|
||||
}
|
||||
|
||||
/// Update the summary with given RoomInfo.
|
||||
pub fn set_room_info(
|
||||
fn set_room_info(
|
||||
&self,
|
||||
room_info: RoomInfo,
|
||||
room_info_notable_update_reasons: RoomInfoNotableUpdateReasons,
|
||||
|
||||
@@ -163,17 +163,21 @@ mod tests {
|
||||
})
|
||||
}
|
||||
|
||||
fn set_latest_event_value(room: &mut RoomListItem, latest_event_value: LatestEventValue) {
|
||||
let mut room_info = room.clone_info();
|
||||
room_info.set_latest_event(latest_event_value);
|
||||
room.set_room_info(room_info, RoomInfoNotableUpdateReasons::LATEST_EVENT);
|
||||
async fn set_latest_event_value(room: &mut RoomListItem, latest_event_value: LatestEventValue) {
|
||||
room.update_room_info(|mut info| {
|
||||
info.set_latest_event(latest_event_value);
|
||||
(info, RoomInfoNotableUpdateReasons::LATEST_EVENT)
|
||||
})
|
||||
.await;
|
||||
room.refresh_cached_data();
|
||||
}
|
||||
|
||||
fn set_recency_stamp(room: &mut RoomListItem, recency_stamp: RoomRecencyStamp) {
|
||||
let mut room_info = room.clone_info();
|
||||
room_info.update_recency_stamp(recency_stamp);
|
||||
room.set_room_info(room_info, RoomInfoNotableUpdateReasons::RECENCY_STAMP);
|
||||
async fn set_recency_stamp(room: &mut RoomListItem, recency_stamp: RoomRecencyStamp) {
|
||||
room.update_room_info(|mut info| {
|
||||
info.update_recency_stamp(recency_stamp);
|
||||
(info, RoomInfoNotableUpdateReasons::RECENCY_STAMP)
|
||||
})
|
||||
.await;
|
||||
room.refresh_cached_data();
|
||||
}
|
||||
|
||||
@@ -184,15 +188,15 @@ mod tests {
|
||||
let [mut room_a, mut room_b] =
|
||||
new_rooms([room_id!("!a:b.c"), room_id!("!d:e.f")], &client, &server).await;
|
||||
|
||||
set_recency_stamp(&mut room_a, 1.into());
|
||||
set_recency_stamp(&mut room_b, 2.into());
|
||||
set_recency_stamp(&mut room_a, 1.into()).await;
|
||||
set_recency_stamp(&mut room_b, 2.into()).await;
|
||||
|
||||
// Both rooms have a `LatestEventValue::None`.
|
||||
//
|
||||
// Because there is no latest event, the recency stamp MUST BE USED.
|
||||
{
|
||||
set_latest_event_value(&mut room_a, none());
|
||||
set_latest_event_value(&mut room_b, none());
|
||||
set_latest_event_value(&mut room_a, none()).await;
|
||||
set_latest_event_value(&mut room_b, none()).await;
|
||||
|
||||
assert_eq!(extract_scores(&room_a, &room_b), (Some(1), Some(2)));
|
||||
}
|
||||
@@ -201,8 +205,8 @@ mod tests {
|
||||
//
|
||||
// One of the room has a latest event, so the recency stamp MUST BE IGNORED.
|
||||
{
|
||||
set_latest_event_value(&mut room_a, none());
|
||||
set_latest_event_value(&mut room_b, remote(3));
|
||||
set_latest_event_value(&mut room_a, none()).await;
|
||||
set_latest_event_value(&mut room_b, remote(3)).await;
|
||||
|
||||
assert_eq!(extract_scores(&room_a, &room_b), (None, Some(3)));
|
||||
}
|
||||
@@ -211,8 +215,8 @@ mod tests {
|
||||
//
|
||||
// One of the room has a latest event, so the recency stamp MUST BE IGNORED.
|
||||
{
|
||||
set_latest_event_value(&mut room_a, remote(3));
|
||||
set_latest_event_value(&mut room_b, none());
|
||||
set_latest_event_value(&mut room_a, remote(3)).await;
|
||||
set_latest_event_value(&mut room_b, none()).await;
|
||||
|
||||
assert_eq!(extract_scores(&room_a, &room_b), (Some(3), None));
|
||||
}
|
||||
@@ -225,8 +229,8 @@ mod tests {
|
||||
let [mut room_a, mut room_b] =
|
||||
new_rooms([room_id!("!a:b.c"), room_id!("!d:e.f")], &client, &server).await;
|
||||
|
||||
set_recency_stamp(&mut room_a, 1.into());
|
||||
set_recency_stamp(&mut room_b, 2.into());
|
||||
set_recency_stamp(&mut room_a, 1.into()).await;
|
||||
set_recency_stamp(&mut room_b, 2.into()).await;
|
||||
|
||||
// `room_a` and `room_b` has either `Remote` or `Local*`.
|
||||
//
|
||||
@@ -236,8 +240,8 @@ mod tests {
|
||||
for latest_event_value_b in
|
||||
[remote(4), local_is_sending(4), local_cannot_be_sent(4)]
|
||||
{
|
||||
set_latest_event_value(&mut room_a, latest_event_value_a.clone());
|
||||
set_latest_event_value(&mut room_b, latest_event_value_b);
|
||||
set_latest_event_value(&mut room_a, latest_event_value_a.clone()).await;
|
||||
set_latest_event_value(&mut room_b, latest_event_value_b).await;
|
||||
|
||||
assert_eq!(extract_scores(&room_a, &room_b), (Some(3), Some(4)));
|
||||
}
|
||||
|
||||
@@ -23,8 +23,7 @@ use std::{
|
||||
use eyeball::SharedObservable;
|
||||
use eyeball_im::VectorDiff;
|
||||
use matrix_sdk_base::{
|
||||
RoomInfoNotableUpdateReasons, StateChanges, apply_redaction,
|
||||
check_validity_of_replacement_events,
|
||||
RoomInfoNotableUpdateReasons, apply_redaction, check_validity_of_replacement_events,
|
||||
deserialized_responses::{ThreadSummary, ThreadSummaryStatus},
|
||||
event_cache::{
|
||||
Event, Gap,
|
||||
@@ -1059,27 +1058,15 @@ impl<'a> RoomEventCacheStateLockWriteGuard<'a> {
|
||||
// The read receipt has changed! Do a little dance to update the `RoomInfo` in
|
||||
// the state store, and then in the room itself, so that observers
|
||||
// can be notified of the change.
|
||||
let client = room.client();
|
||||
|
||||
// Take the state store lock.
|
||||
let _state_store_lock = client.base_client().state_store_lock().lock().await;
|
||||
|
||||
// Don't reuse the room info from above, as it might have changed in the
|
||||
// meanwhile. This access is somewhat protected by the state store locking, even
|
||||
// though other code may call `set_room_info` concurrently.
|
||||
let mut room_info = room.clone_info();
|
||||
room_info.set_read_receipts(read_receipts);
|
||||
|
||||
let mut state_changes = StateChanges::default();
|
||||
state_changes.add_room(room_info.clone());
|
||||
|
||||
// Update the `RoomInfo` in the state store.
|
||||
if let Err(error) = client.state_store().save_changes(&state_changes).await {
|
||||
let result = room
|
||||
.update_and_save_room_info(|mut room_info| {
|
||||
room_info.set_read_receipts(read_receipts);
|
||||
(room_info, RoomInfoNotableUpdateReasons::READ_RECEIPT)
|
||||
})
|
||||
.await;
|
||||
if let Err(error) = result {
|
||||
error!(room_id = ?room.room_id(), ?error, "Failed to save the changes");
|
||||
}
|
||||
|
||||
// Update the `RoomInfo` of the room.
|
||||
room.set_room_info(room_info, RoomInfoNotableUpdateReasons::READ_RECEIPT);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -21,7 +21,7 @@ use eyeball::{AsyncLock, ObservableWriteGuard, SharedObservable, Subscriber};
|
||||
pub use matrix_sdk_base::latest_event::{
|
||||
LatestEventValue, LocalLatestEventValue, RemoteLatestEventValue,
|
||||
};
|
||||
use matrix_sdk_base::{RoomInfoNotableUpdateReasons, RoomState, StateChanges};
|
||||
use matrix_sdk_base::{RoomInfoNotableUpdateReasons, RoomState};
|
||||
use ruma::{EventId, OwnedEventId, UserId, events::room::power_levels::RoomPowerLevels};
|
||||
use tracing::{error, info, instrument, trace, warn};
|
||||
|
||||
@@ -246,26 +246,15 @@ impl LatestEvent {
|
||||
warn!(room_id = ?self.weak_room.room_id(), "Cannot store the latest event value because the room cannot be accessed");
|
||||
return;
|
||||
};
|
||||
|
||||
let client = room.client();
|
||||
|
||||
// Take the state store lock.
|
||||
let _state_store_lock = client.base_client().state_store_lock().lock().await;
|
||||
|
||||
// Compute a new `RoomInfo`.
|
||||
let mut room_info = room.clone_info();
|
||||
room_info.set_latest_event(new_value);
|
||||
|
||||
let mut state_changes = StateChanges::default();
|
||||
state_changes.add_room(room_info.clone());
|
||||
|
||||
// Update the `RoomInfo` in the state store.
|
||||
if let Err(error) = client.state_store().save_changes(&state_changes).await {
|
||||
let result = room
|
||||
.update_and_save_room_info(|mut info| {
|
||||
info.set_latest_event(new_value);
|
||||
(info, RoomInfoNotableUpdateReasons::LATEST_EVENT)
|
||||
})
|
||||
.await;
|
||||
if let Err(error) = result {
|
||||
error!(room_id = ?room.room_id(), ?error, "Failed to save the changes");
|
||||
}
|
||||
|
||||
// Update the `RoomInfo` of the room.
|
||||
room.set_room_info(room_info, RoomInfoNotableUpdateReasons::LATEST_EVENT);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -386,9 +375,11 @@ mod tests_latest_event {
|
||||
|
||||
// Set the `RoomInfo`.
|
||||
{
|
||||
let mut room_info = room.clone_info();
|
||||
room_info.set_latest_event(LatestEventValue::LocalIsSending(local_room_message("foo")));
|
||||
room.set_room_info(room_info, Default::default());
|
||||
room.update_room_info(|mut info| {
|
||||
info.set_latest_event(LatestEventValue::LocalIsSending(local_room_message("foo")));
|
||||
(info, Default::default())
|
||||
})
|
||||
.await;
|
||||
}
|
||||
|
||||
// Second time. We get `LocalIsSending` from `RoomInfo`.
|
||||
|
||||
@@ -1389,7 +1389,7 @@ mod tests {
|
||||
// `room_0` always has a `LatestEventValue::None` as its the default value.
|
||||
let mut room_info_1 = room_0.clone_info();
|
||||
room_info_1.set_latest_event(LatestEventValue::LocalIsSending(local_room_message("foo")));
|
||||
room_1.set_room_info(room_info_1, Default::default());
|
||||
room_1.update_room_info(|_| (room_info_1, Default::default())).await;
|
||||
|
||||
let weak_client = WeakClient::from_client(&client);
|
||||
|
||||
|
||||
@@ -41,7 +41,7 @@ use matrix_sdk_base::crypto::{
|
||||
pub use matrix_sdk_base::store::StoredThreadSubscription;
|
||||
use matrix_sdk_base::{
|
||||
ComposerDraft, EncryptionState, RoomInfoNotableUpdateReasons, RoomMemberships, SendOutsideWasm,
|
||||
StateChanges, StateStoreDataKey, StateStoreDataValue,
|
||||
StateStoreDataKey, StateStoreDataValue,
|
||||
deserialized_responses::{
|
||||
RawAnySyncOrStrippedState, RawSyncOrStrippedState, SyncOrStrippedState,
|
||||
},
|
||||
@@ -1101,18 +1101,14 @@ impl Room {
|
||||
Err(err) => return Err(err.into()),
|
||||
};
|
||||
|
||||
let _state_store_lock = self.client.base_client().state_store_lock().lock().await;
|
||||
|
||||
// Persist the event and the fact that we requested it from the server in
|
||||
// `RoomInfo`.
|
||||
let mut room_info = self.clone_info();
|
||||
room_info.mark_encryption_state_synced();
|
||||
room_info.set_encryption_event(response);
|
||||
let mut changes = StateChanges::default();
|
||||
changes.add_room(room_info.clone());
|
||||
|
||||
self.client.state_store().save_changes(&changes).await?;
|
||||
self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());
|
||||
self.update_and_save_room_info(|mut room_info| {
|
||||
room_info.mark_encryption_state_synced();
|
||||
room_info.set_encryption_event(response);
|
||||
(room_info, RoomInfoNotableUpdateReasons::empty())
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
})
|
||||
@@ -2266,7 +2262,7 @@ impl Room {
|
||||
)
|
||||
.await;
|
||||
|
||||
let _state_store_lock = self.client.base_client().state_store_lock().lock().await;
|
||||
let store_guard = self.client.base_client().state_store_lock().lock().await;
|
||||
|
||||
// If encryption was enabled, return.
|
||||
#[cfg(not(feature = "experimental-encrypted-state-events"))]
|
||||
@@ -2294,13 +2290,11 @@ impl Room {
|
||||
// assuming it's sync'd and correct (and not encrypted).
|
||||
debug!("still not marked as encrypted, marking encryption state as missing");
|
||||
|
||||
let mut room_info = self.clone_info();
|
||||
room_info.mark_encryption_state_missing();
|
||||
let mut changes = StateChanges::default();
|
||||
changes.add_room(room_info.clone());
|
||||
|
||||
self.client.state_store().save_changes(&changes).await?;
|
||||
self.set_room_info(room_info, RoomInfoNotableUpdateReasons::empty());
|
||||
self.update_and_save_room_info_with_store_guard(&store_guard, |mut info| {
|
||||
info.mark_encryption_state_missing();
|
||||
(info, RoomInfoNotableUpdateReasons::empty())
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -144,9 +144,11 @@ async fn test_forget_banned_room() {
|
||||
|
||||
// Make the room banned
|
||||
let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
|
||||
let mut room_info = room.clone_info();
|
||||
room_info.mark_as_banned();
|
||||
room.set_room_info(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP);
|
||||
room.update_room_info(|mut room_info| {
|
||||
room_info.mark_as_banned();
|
||||
(room_info, RoomInfoNotableUpdateReasons::MEMBERSHIP)
|
||||
})
|
||||
.await;
|
||||
assert_eq!(room.state(), RoomState::Banned);
|
||||
|
||||
room.forget().await.unwrap();
|
||||
|
||||
Reference in New Issue
Block a user