fix(sdk): Fix a race-condition in EventCache.

This patch ensures that operations on `RoomEvents` happen in one block,
by sharing the same lock.

2 new methods are created: `replace_all_events_by` and
`append_new_events`.
This commit is contained in:
Ivan Enderlin
2024-03-20 21:54:34 +01:00
parent fa5bbadf57
commit f61de718b8

View File

@@ -62,7 +62,7 @@ use ruma::{
use tokio::{
sync::{
broadcast::{error::RecvError, Receiver, Sender},
Mutex, Notify, RwLock, RwLockReadGuard,
Mutex, Notify, RwLock, RwLockReadGuard, RwLockWriteGuard,
},
time::timeout,
};
@@ -249,13 +249,14 @@ impl EventCache {
// We could have received events during a previous sync; remove them all, since
// we can't know where to insert the "initial events" with respect to
// them.
room_cache.inner.events.write().await.reset();
// let mut room_events = room_cache.inner.events.write().await;
// room_events.reset();
let _ = room_cache.inner.sender.send(RoomEventCacheUpdate::Clear);
// let _ = room_cache.inner.sender.send(RoomEventCacheUpdate::Clear);
room_cache
.inner
.append_events(
.replace_all_events_by(
events,
prev_batch,
Default::default(),
@@ -482,26 +483,30 @@ impl RoomEventCacheInner {
// Ideally we'd try to reconcile existing events against those received in the
// timeline, but we're not there yet. In the meanwhile, clear the
// items from the room. TODO: implement Smart Matching™.
trace!("limited timeline, clearing all previous events");
trace!("limited timeline, clearing all previous events and pushing new events");
// Clear internal state (events, pagination tokens, etc.).
self.events.write().await.reset();
self.replace_all_events_by(
timeline.events,
timeline.prev_batch,
account_data,
ephemeral,
ambiguity_changes,
)
.await?;
} else {
// Add all the events to the backend.
trace!("adding new events");
// Propagate to observers.
let _ = self.sender.send(RoomEventCacheUpdate::Clear);
self.append_new_events(
timeline.events,
timeline.prev_batch,
account_data,
ephemeral,
ambiguity_changes,
)
.await?;
}
// Add all the events to the backend.
trace!("adding new events");
self.append_events(
timeline.events,
timeline.prev_batch,
account_data,
ephemeral,
ambiguity_changes,
)
.await?;
Ok(())
}
@@ -511,15 +516,71 @@ impl RoomEventCacheInner {
Ok(())
}
// Remove existing events, and append a set of events to the room cache and
// storage, notifying observers.
async fn replace_all_events_by(
&self,
events: Vec<SyncTimelineEvent>,
prev_batch: Option<String>,
account_data: Vec<Raw<AnyRoomAccountDataEvent>>,
ephemeral: Vec<Raw<AnySyncEphemeralRoomEvent>>,
ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
) -> Result<()> {
// Acquire the lock.
let mut room_events = self.events.write().await;
// Reset the events.
room_events.reset();
// Propagate to observers.
let _ = self.sender.send(RoomEventCacheUpdate::Clear);
// Push the new events.
self.append_events_locked_impl(
room_events,
events,
prev_batch,
account_data,
ephemeral,
ambiguity_changes,
)
.await
}
/// Append a set of events to the room cache and storage, notifying
/// observers.
async fn append_events(
async fn append_new_events(
&self,
events: Vec<SyncTimelineEvent>,
prev_batch: Option<String>,
account_data: Vec<Raw<AnyRoomAccountDataEvent>>,
ephemeral: Vec<Raw<AnySyncEphemeralRoomEvent>>,
ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
) -> Result<()> {
self.append_events_locked_impl(
self.events.write().await,
events,
prev_batch,
account_data,
ephemeral,
ambiguity_changes,
)
.await
}
/// Append a set of events, with an attached lock.
///
/// If the lock `room_events` is `None`, one will be created.
///
/// This is a private implementation. It must not be exposed publicly.
async fn append_events_locked_impl(
&self,
mut room_events: RwLockWriteGuard<'_, RoomEvents>,
events: Vec<SyncTimelineEvent>,
prev_batch: Option<String>,
account_data: Vec<Raw<AnyRoomAccountDataEvent>>,
ephemeral: Vec<Raw<AnySyncEphemeralRoomEvent>>,
ambiguity_changes: BTreeMap<OwnedEventId, AmbiguityChange>,
) -> Result<()> {
if events.is_empty()
&& prev_batch.is_none()
@@ -533,8 +594,6 @@ impl RoomEventCacheInner {
// Add the previous back-pagination token (if present), followed by the timeline
// events themselves.
{
let mut room_events = self.events.write().await;
if let Some(prev_token) = &prev_batch {
room_events.push_gap(Gap { prev_token: PaginationToken(prev_token.clone()) });
}