timeline: prevent deadlock in replace_with_initial_events

The `state` lock was taken at the top level of this function, and
indirectly implicitly in the `set_fully_read_event` function. This fixes
it, and adds a regression test.
This commit is contained in:
Benjamin Bouvier
2024-04-22 19:47:21 +02:00
parent c471ee42ab
commit 13cc7962e7
5 changed files with 118 additions and 75 deletions

View File

@@ -32,7 +32,6 @@ use ruma::RoomId;
use ruma::{
api::client::receipt::create_receipt::v3::ReceiptType as SendReceiptType,
events::{
fully_read::FullyReadEvent,
poll::unstable_start::UnstablePollStartEventContent,
reaction::ReactionEventContent,
receipt::{Receipt, ReceiptThread, ReceiptType},
@@ -433,6 +432,44 @@ impl<P: RoomDataProvider> TimelineInner<P> {
self.state.write().await.clear();
}
/// Replaces the content of the current timeline with initial events.
///
/// Also sets up read receipts and the read marker for a live timeline of a
/// room.
///
/// This is all done with a single lock guard, since we don't want the state
/// to be modified between the clear and re-insertion of new events.
pub(super) async fn replace_with_initial_events(&self, events: Vec<SyncTimelineEvent>) {
let mut state = self.state.write().await;
state.clear();
let track_read_markers = self.settings.track_read_receipts;
if track_read_markers {
self.populate_initial_user_receipt(ReceiptType::Read).await;
self.populate_initial_user_receipt(ReceiptType::ReadPrivate).await;
}
if !events.is_empty() {
state
.add_events_at(
events,
TimelineEnd::Back { from_cache: true },
&self.room_data_provider,
&self.settings,
)
.await;
}
if track_read_markers {
if let Some(fully_read_event_id) =
self.room_data_provider.load_fully_read_marker().await
{
state.set_fully_read_event(fully_read_event_id);
}
}
}
pub(super) async fn handle_fully_read_marker(&self, fully_read_event_id: OwnedEventId) {
self.state.write().await.handle_fully_read_marker(fully_read_event_id);
}
@@ -674,6 +711,7 @@ impl<P: RoomDataProvider> TimelineInner<P> {
}
}
#[cfg(any(test, feature = "testing"))]
pub(super) async fn set_fully_read_event(&self, fully_read_event_id: OwnedEventId) {
self.state.write().await.set_fully_read_event(fully_read_event_id);
}
@@ -950,65 +988,6 @@ impl TimelineInner {
&self.room_data_provider
}
/// Replaces the content of the current timeline with initial events.
///
/// Also sets up read receipts and the read marker for a live timeline of a
/// room.
///
/// This is all done with a single lock guard, since we don't want the state
/// to be modified between the clear and re-insertion of new events.
pub(super) async fn replace_with_initial_events(&self, events: Vec<SyncTimelineEvent>) {
let mut state = self.state.write().await;
state.clear();
let track_read_markers = self.settings.track_read_receipts;
if track_read_markers {
self.populate_initial_user_receipt(ReceiptType::Read).await;
self.populate_initial_user_receipt(ReceiptType::ReadPrivate).await;
}
if !events.is_empty() {
state
.add_events_at(
events,
TimelineEnd::Back { from_cache: true },
&self.room_data_provider,
&self.settings,
)
.await;
}
if track_read_markers {
self.load_fully_read_event().await;
}
}
/// Get the current fully-read event, from storage.
pub(super) async fn fully_read_event(&self) -> Option<FullyReadEvent> {
match self.room().account_data_static().await {
Ok(Some(fully_read)) => match fully_read.deserialize() {
Ok(fully_read) => Some(fully_read),
Err(e) => {
error!("Failed to deserialize fully-read account data: {e}");
None
}
},
Err(e) => {
error!("Failed to get fully-read account data from the store: {e}");
None
}
_ => None,
}
}
/// Load the current fully-read event in this inner timeline from storage.
pub(super) async fn load_fully_read_event(&self) {
if let Some(fully_read) = self.fully_read_event().await {
self.set_fully_read_event(fully_read.content.event_id).await;
}
}
#[instrument(skip(self))]
pub(super) async fn fetch_in_reply_to_details(
&self,
@@ -1126,10 +1105,10 @@ impl TimelineInner {
}
}
SendReceiptType::FullyRead => {
if let Some(old_fully_read) = self.fully_read_event().await {
if let Some(relative_pos) = state
.meta
.compare_events_positions(&old_fully_read.content.event_id, event_id)
if let Some(prev_event_id) = self.room_data_provider.load_fully_read_marker().await
{
if let Some(relative_pos) =
state.meta.compare_events_positions(&prev_event_id, event_id)
{
return relative_pos == RelativePosition::After;
}

View File

@@ -15,6 +15,7 @@
use assert_matches::assert_matches;
use assert_matches2::assert_let;
use eyeball_im::VectorDiff;
use matrix_sdk::test_utils::events::EventFactory;
use matrix_sdk_base::deserialized_responses::SyncTimelineEvent;
use matrix_sdk_test::{async_test, sync_timeline_event, ALICE, BOB, CAROL};
use ruma::{
@@ -29,13 +30,16 @@ use ruma::{
},
FullStateEventContent,
},
owned_event_id,
};
use stream_assert::assert_next_matches;
use super::TestTimeline;
use crate::timeline::{
event_item::AnyOtherFullStateEventContent, inner::TimelineEnd, MembershipChange,
TimelineDetails, TimelineItemContent, TimelineItemKind, VirtualTimelineItem,
event_item::AnyOtherFullStateEventContent,
inner::{TimelineEnd, TimelineInnerSettings},
tests::TestRoomDataProvider,
MembershipChange, TimelineDetails, TimelineItemContent, TimelineItemKind, VirtualTimelineItem,
};
#[async_test]
@@ -72,6 +76,33 @@ async fn test_initial_events() {
assert_matches!(&item.kind, TimelineItemKind::Virtual(VirtualTimelineItem::DayDivider(_)));
}
#[async_test]
async fn test_replace_with_initial_events_and_read_marker() {
let event_id = owned_event_id!("$1");
let timeline = TestTimeline::with_room_data_provider(
TestRoomDataProvider::default().with_fully_read_marker(event_id),
)
.with_settings(TimelineInnerSettings { track_read_receipts: true, ..Default::default() });
let factory = EventFactory::new();
let ev = factory.text_msg("hey").sender(*ALICE).into_sync();
timeline.inner.add_events_at(vec![ev], TimelineEnd::Back { from_cache: false }).await;
let items = timeline.inner.items().await;
assert_eq!(items.len(), 2);
assert!(items[0].is_day_divider());
assert_eq!(items[1].as_event().unwrap().content().as_message().unwrap().body(), "hey");
let ev = factory.text_msg("yo").sender(*BOB).into_sync();
timeline.inner.replace_with_initial_events(vec![ev]).await;
let items = timeline.inner.items().await;
assert_eq!(items.len(), 2);
assert!(items[0].is_day_divider());
assert_eq!(items[1].as_event().unwrap().content().as_message().unwrap().body(), "yo");
}
#[async_test]
async fn test_sticker() {
let timeline = TestTimeline::new();

View File

@@ -281,11 +281,17 @@ type ReadReceiptMap =
#[derive(Clone, Default)]
struct TestRoomDataProvider {
initial_user_receipts: ReadReceiptMap,
fully_read_marker: Option<OwnedEventId>,
}
impl TestRoomDataProvider {
fn with_initial_user_receipts(initial_user_receipts: ReadReceiptMap) -> Self {
Self { initial_user_receipts }
fn with_initial_user_receipts(mut self, initial_user_receipts: ReadReceiptMap) -> Self {
self.initial_user_receipts = initial_user_receipts;
self
}
fn with_fully_read_marker(mut self, event_id: OwnedEventId) -> Self {
self.fully_read_marker = Some(event_id);
self
}
}
@@ -345,6 +351,10 @@ impl RoomDataProvider for TestRoomDataProvider {
Some((push_rules, push_context))
}
async fn load_fully_read_marker(&self) -> Option<OwnedEventId> {
self.fully_read_marker.clone()
}
}
pub(super) async fn assert_event_is_updated(

View File

@@ -490,7 +490,7 @@ async fn test_initial_public_unthreaded_receipt() {
);
let timeline = TestTimeline::with_room_data_provider(
TestRoomDataProvider::with_initial_user_receipts(initial_user_receipts),
TestRoomDataProvider::default().with_initial_user_receipts(initial_user_receipts),
)
.with_settings(TimelineInnerSettings { track_read_receipts: true, ..Default::default() });
@@ -515,7 +515,7 @@ async fn test_initial_public_main_thread_receipt() {
);
let timeline = TestTimeline::with_room_data_provider(
TestRoomDataProvider::with_initial_user_receipts(initial_user_receipts),
TestRoomDataProvider::default().with_initial_user_receipts(initial_user_receipts),
)
.with_settings(TimelineInnerSettings { track_read_receipts: true, ..Default::default() });
@@ -540,7 +540,7 @@ async fn test_initial_private_unthreaded_receipt() {
);
let timeline = TestTimeline::with_room_data_provider(
TestRoomDataProvider::with_initial_user_receipts(initial_user_receipts),
TestRoomDataProvider::default().with_initial_user_receipts(initial_user_receipts),
)
.with_settings(TimelineInnerSettings { track_read_receipts: true, ..Default::default() });
@@ -565,7 +565,7 @@ async fn test_initial_private_main_thread_receipt() {
);
let timeline = TestTimeline::with_room_data_provider(
TestRoomDataProvider::with_initial_user_receipts(initial_user_receipts),
TestRoomDataProvider::default().with_initial_user_receipts(initial_user_receipts),
)
.with_settings(TimelineInnerSettings { track_read_receipts: true, ..Default::default() });

View File

@@ -18,13 +18,16 @@ use indexmap::IndexMap;
use matrix_sdk::{deserialized_responses::TimelineEvent, Result};
use matrix_sdk::{event_cache, Room};
use matrix_sdk_base::latest_event::LatestEvent;
#[cfg(feature = "e2e-encryption")]
use ruma::{events::AnySyncTimelineEvent, serde::Raw};
use ruma::{
events::receipt::{Receipt, ReceiptThread, ReceiptType},
events::{
fully_read::FullyReadEventContent,
receipt::{Receipt, ReceiptThread, ReceiptType},
},
push::{PushConditionRoomCtx, Ruleset},
EventId, OwnedEventId, OwnedUserId, RoomVersionId, UserId,
};
#[cfg(feature = "e2e-encryption")]
use ruma::{events::AnySyncTimelineEvent, serde::Raw};
use tracing::{debug, error};
use super::{Profile, TimelineBuilder};
@@ -81,6 +84,9 @@ pub(super) trait RoomDataProvider: Clone + Send + Sync + 'static {
/// Loads read receipts for an event from the storage backend.
async fn load_event_receipts(&self, event_id: &EventId) -> IndexMap<OwnedUserId, Receipt>;
/// Load the current fully-read event id, from storage.
async fn load_fully_read_marker(&self) -> Option<OwnedEventId>;
async fn push_rules_and_context(&self) -> Option<(Ruleset, PushConditionRoomCtx)>;
}
@@ -188,6 +194,23 @@ impl RoomDataProvider for Room {
}
}
}
async fn load_fully_read_marker(&self) -> Option<OwnedEventId> {
match self.account_data_static::<FullyReadEventContent>().await {
Ok(Some(fully_read)) => match fully_read.deserialize() {
Ok(fully_read) => Some(fully_read.content.event_id),
Err(e) => {
error!("Failed to deserialize fully-read account data: {e}");
None
}
},
Err(e) => {
error!("Failed to get fully-read account data from the store: {e}");
None
}
_ => None,
}
}
}
// Internal helper to make most of retry_event_decryption independent of a room