mirror of
https://github.com/matrix-org/matrix-rust-sdk.git
synced 2026-05-09 08:27:32 -04:00
timeline: prevent deadlock in replace_with_initial_events
The `state` lock was taken at the top level of this function, and indirectly implicitly in the `set_fully_read_event` function. This fixes it, and adds a regression test.
This commit is contained in:
@@ -32,7 +32,6 @@ use ruma::RoomId;
|
||||
use ruma::{
|
||||
api::client::receipt::create_receipt::v3::ReceiptType as SendReceiptType,
|
||||
events::{
|
||||
fully_read::FullyReadEvent,
|
||||
poll::unstable_start::UnstablePollStartEventContent,
|
||||
reaction::ReactionEventContent,
|
||||
receipt::{Receipt, ReceiptThread, ReceiptType},
|
||||
@@ -433,6 +432,44 @@ impl<P: RoomDataProvider> TimelineInner<P> {
|
||||
self.state.write().await.clear();
|
||||
}
|
||||
|
||||
/// Replaces the content of the current timeline with initial events.
|
||||
///
|
||||
/// Also sets up read receipts and the read marker for a live timeline of a
|
||||
/// room.
|
||||
///
|
||||
/// This is all done with a single lock guard, since we don't want the state
|
||||
/// to be modified between the clear and re-insertion of new events.
|
||||
pub(super) async fn replace_with_initial_events(&self, events: Vec<SyncTimelineEvent>) {
|
||||
let mut state = self.state.write().await;
|
||||
|
||||
state.clear();
|
||||
|
||||
let track_read_markers = self.settings.track_read_receipts;
|
||||
if track_read_markers {
|
||||
self.populate_initial_user_receipt(ReceiptType::Read).await;
|
||||
self.populate_initial_user_receipt(ReceiptType::ReadPrivate).await;
|
||||
}
|
||||
|
||||
if !events.is_empty() {
|
||||
state
|
||||
.add_events_at(
|
||||
events,
|
||||
TimelineEnd::Back { from_cache: true },
|
||||
&self.room_data_provider,
|
||||
&self.settings,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
if track_read_markers {
|
||||
if let Some(fully_read_event_id) =
|
||||
self.room_data_provider.load_fully_read_marker().await
|
||||
{
|
||||
state.set_fully_read_event(fully_read_event_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) async fn handle_fully_read_marker(&self, fully_read_event_id: OwnedEventId) {
|
||||
self.state.write().await.handle_fully_read_marker(fully_read_event_id);
|
||||
}
|
||||
@@ -674,6 +711,7 @@ impl<P: RoomDataProvider> TimelineInner<P> {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(any(test, feature = "testing"))]
|
||||
pub(super) async fn set_fully_read_event(&self, fully_read_event_id: OwnedEventId) {
|
||||
self.state.write().await.set_fully_read_event(fully_read_event_id);
|
||||
}
|
||||
@@ -950,65 +988,6 @@ impl TimelineInner {
|
||||
&self.room_data_provider
|
||||
}
|
||||
|
||||
/// Replaces the content of the current timeline with initial events.
|
||||
///
|
||||
/// Also sets up read receipts and the read marker for a live timeline of a
|
||||
/// room.
|
||||
///
|
||||
/// This is all done with a single lock guard, since we don't want the state
|
||||
/// to be modified between the clear and re-insertion of new events.
|
||||
pub(super) async fn replace_with_initial_events(&self, events: Vec<SyncTimelineEvent>) {
|
||||
let mut state = self.state.write().await;
|
||||
|
||||
state.clear();
|
||||
|
||||
let track_read_markers = self.settings.track_read_receipts;
|
||||
if track_read_markers {
|
||||
self.populate_initial_user_receipt(ReceiptType::Read).await;
|
||||
self.populate_initial_user_receipt(ReceiptType::ReadPrivate).await;
|
||||
}
|
||||
|
||||
if !events.is_empty() {
|
||||
state
|
||||
.add_events_at(
|
||||
events,
|
||||
TimelineEnd::Back { from_cache: true },
|
||||
&self.room_data_provider,
|
||||
&self.settings,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
if track_read_markers {
|
||||
self.load_fully_read_event().await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the current fully-read event, from storage.
|
||||
pub(super) async fn fully_read_event(&self) -> Option<FullyReadEvent> {
|
||||
match self.room().account_data_static().await {
|
||||
Ok(Some(fully_read)) => match fully_read.deserialize() {
|
||||
Ok(fully_read) => Some(fully_read),
|
||||
Err(e) => {
|
||||
error!("Failed to deserialize fully-read account data: {e}");
|
||||
None
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
error!("Failed to get fully-read account data from the store: {e}");
|
||||
None
|
||||
}
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Load the current fully-read event in this inner timeline from storage.
|
||||
pub(super) async fn load_fully_read_event(&self) {
|
||||
if let Some(fully_read) = self.fully_read_event().await {
|
||||
self.set_fully_read_event(fully_read.content.event_id).await;
|
||||
}
|
||||
}
|
||||
|
||||
#[instrument(skip(self))]
|
||||
pub(super) async fn fetch_in_reply_to_details(
|
||||
&self,
|
||||
@@ -1126,10 +1105,10 @@ impl TimelineInner {
|
||||
}
|
||||
}
|
||||
SendReceiptType::FullyRead => {
|
||||
if let Some(old_fully_read) = self.fully_read_event().await {
|
||||
if let Some(relative_pos) = state
|
||||
.meta
|
||||
.compare_events_positions(&old_fully_read.content.event_id, event_id)
|
||||
if let Some(prev_event_id) = self.room_data_provider.load_fully_read_marker().await
|
||||
{
|
||||
if let Some(relative_pos) =
|
||||
state.meta.compare_events_positions(&prev_event_id, event_id)
|
||||
{
|
||||
return relative_pos == RelativePosition::After;
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
use assert_matches::assert_matches;
|
||||
use assert_matches2::assert_let;
|
||||
use eyeball_im::VectorDiff;
|
||||
use matrix_sdk::test_utils::events::EventFactory;
|
||||
use matrix_sdk_base::deserialized_responses::SyncTimelineEvent;
|
||||
use matrix_sdk_test::{async_test, sync_timeline_event, ALICE, BOB, CAROL};
|
||||
use ruma::{
|
||||
@@ -29,13 +30,16 @@ use ruma::{
|
||||
},
|
||||
FullStateEventContent,
|
||||
},
|
||||
owned_event_id,
|
||||
};
|
||||
use stream_assert::assert_next_matches;
|
||||
|
||||
use super::TestTimeline;
|
||||
use crate::timeline::{
|
||||
event_item::AnyOtherFullStateEventContent, inner::TimelineEnd, MembershipChange,
|
||||
TimelineDetails, TimelineItemContent, TimelineItemKind, VirtualTimelineItem,
|
||||
event_item::AnyOtherFullStateEventContent,
|
||||
inner::{TimelineEnd, TimelineInnerSettings},
|
||||
tests::TestRoomDataProvider,
|
||||
MembershipChange, TimelineDetails, TimelineItemContent, TimelineItemKind, VirtualTimelineItem,
|
||||
};
|
||||
|
||||
#[async_test]
|
||||
@@ -72,6 +76,33 @@ async fn test_initial_events() {
|
||||
assert_matches!(&item.kind, TimelineItemKind::Virtual(VirtualTimelineItem::DayDivider(_)));
|
||||
}
|
||||
|
||||
#[async_test]
|
||||
async fn test_replace_with_initial_events_and_read_marker() {
|
||||
let event_id = owned_event_id!("$1");
|
||||
let timeline = TestTimeline::with_room_data_provider(
|
||||
TestRoomDataProvider::default().with_fully_read_marker(event_id),
|
||||
)
|
||||
.with_settings(TimelineInnerSettings { track_read_receipts: true, ..Default::default() });
|
||||
|
||||
let factory = EventFactory::new();
|
||||
let ev = factory.text_msg("hey").sender(*ALICE).into_sync();
|
||||
|
||||
timeline.inner.add_events_at(vec![ev], TimelineEnd::Back { from_cache: false }).await;
|
||||
|
||||
let items = timeline.inner.items().await;
|
||||
assert_eq!(items.len(), 2);
|
||||
assert!(items[0].is_day_divider());
|
||||
assert_eq!(items[1].as_event().unwrap().content().as_message().unwrap().body(), "hey");
|
||||
|
||||
let ev = factory.text_msg("yo").sender(*BOB).into_sync();
|
||||
timeline.inner.replace_with_initial_events(vec![ev]).await;
|
||||
|
||||
let items = timeline.inner.items().await;
|
||||
assert_eq!(items.len(), 2);
|
||||
assert!(items[0].is_day_divider());
|
||||
assert_eq!(items[1].as_event().unwrap().content().as_message().unwrap().body(), "yo");
|
||||
}
|
||||
|
||||
#[async_test]
|
||||
async fn test_sticker() {
|
||||
let timeline = TestTimeline::new();
|
||||
|
||||
@@ -281,11 +281,17 @@ type ReadReceiptMap =
|
||||
#[derive(Clone, Default)]
|
||||
struct TestRoomDataProvider {
|
||||
initial_user_receipts: ReadReceiptMap,
|
||||
fully_read_marker: Option<OwnedEventId>,
|
||||
}
|
||||
|
||||
impl TestRoomDataProvider {
|
||||
fn with_initial_user_receipts(initial_user_receipts: ReadReceiptMap) -> Self {
|
||||
Self { initial_user_receipts }
|
||||
fn with_initial_user_receipts(mut self, initial_user_receipts: ReadReceiptMap) -> Self {
|
||||
self.initial_user_receipts = initial_user_receipts;
|
||||
self
|
||||
}
|
||||
fn with_fully_read_marker(mut self, event_id: OwnedEventId) -> Self {
|
||||
self.fully_read_marker = Some(event_id);
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
@@ -345,6 +351,10 @@ impl RoomDataProvider for TestRoomDataProvider {
|
||||
|
||||
Some((push_rules, push_context))
|
||||
}
|
||||
|
||||
async fn load_fully_read_marker(&self) -> Option<OwnedEventId> {
|
||||
self.fully_read_marker.clone()
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) async fn assert_event_is_updated(
|
||||
|
||||
@@ -490,7 +490,7 @@ async fn test_initial_public_unthreaded_receipt() {
|
||||
);
|
||||
|
||||
let timeline = TestTimeline::with_room_data_provider(
|
||||
TestRoomDataProvider::with_initial_user_receipts(initial_user_receipts),
|
||||
TestRoomDataProvider::default().with_initial_user_receipts(initial_user_receipts),
|
||||
)
|
||||
.with_settings(TimelineInnerSettings { track_read_receipts: true, ..Default::default() });
|
||||
|
||||
@@ -515,7 +515,7 @@ async fn test_initial_public_main_thread_receipt() {
|
||||
);
|
||||
|
||||
let timeline = TestTimeline::with_room_data_provider(
|
||||
TestRoomDataProvider::with_initial_user_receipts(initial_user_receipts),
|
||||
TestRoomDataProvider::default().with_initial_user_receipts(initial_user_receipts),
|
||||
)
|
||||
.with_settings(TimelineInnerSettings { track_read_receipts: true, ..Default::default() });
|
||||
|
||||
@@ -540,7 +540,7 @@ async fn test_initial_private_unthreaded_receipt() {
|
||||
);
|
||||
|
||||
let timeline = TestTimeline::with_room_data_provider(
|
||||
TestRoomDataProvider::with_initial_user_receipts(initial_user_receipts),
|
||||
TestRoomDataProvider::default().with_initial_user_receipts(initial_user_receipts),
|
||||
)
|
||||
.with_settings(TimelineInnerSettings { track_read_receipts: true, ..Default::default() });
|
||||
|
||||
@@ -565,7 +565,7 @@ async fn test_initial_private_main_thread_receipt() {
|
||||
);
|
||||
|
||||
let timeline = TestTimeline::with_room_data_provider(
|
||||
TestRoomDataProvider::with_initial_user_receipts(initial_user_receipts),
|
||||
TestRoomDataProvider::default().with_initial_user_receipts(initial_user_receipts),
|
||||
)
|
||||
.with_settings(TimelineInnerSettings { track_read_receipts: true, ..Default::default() });
|
||||
|
||||
|
||||
@@ -18,13 +18,16 @@ use indexmap::IndexMap;
|
||||
use matrix_sdk::{deserialized_responses::TimelineEvent, Result};
|
||||
use matrix_sdk::{event_cache, Room};
|
||||
use matrix_sdk_base::latest_event::LatestEvent;
|
||||
#[cfg(feature = "e2e-encryption")]
|
||||
use ruma::{events::AnySyncTimelineEvent, serde::Raw};
|
||||
use ruma::{
|
||||
events::receipt::{Receipt, ReceiptThread, ReceiptType},
|
||||
events::{
|
||||
fully_read::FullyReadEventContent,
|
||||
receipt::{Receipt, ReceiptThread, ReceiptType},
|
||||
},
|
||||
push::{PushConditionRoomCtx, Ruleset},
|
||||
EventId, OwnedEventId, OwnedUserId, RoomVersionId, UserId,
|
||||
};
|
||||
#[cfg(feature = "e2e-encryption")]
|
||||
use ruma::{events::AnySyncTimelineEvent, serde::Raw};
|
||||
use tracing::{debug, error};
|
||||
|
||||
use super::{Profile, TimelineBuilder};
|
||||
@@ -81,6 +84,9 @@ pub(super) trait RoomDataProvider: Clone + Send + Sync + 'static {
|
||||
/// Loads read receipts for an event from the storage backend.
|
||||
async fn load_event_receipts(&self, event_id: &EventId) -> IndexMap<OwnedUserId, Receipt>;
|
||||
|
||||
/// Load the current fully-read event id, from storage.
|
||||
async fn load_fully_read_marker(&self) -> Option<OwnedEventId>;
|
||||
|
||||
async fn push_rules_and_context(&self) -> Option<(Ruleset, PushConditionRoomCtx)>;
|
||||
}
|
||||
|
||||
@@ -188,6 +194,23 @@ impl RoomDataProvider for Room {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn load_fully_read_marker(&self) -> Option<OwnedEventId> {
|
||||
match self.account_data_static::<FullyReadEventContent>().await {
|
||||
Ok(Some(fully_read)) => match fully_read.deserialize() {
|
||||
Ok(fully_read) => Some(fully_read.content.event_id),
|
||||
Err(e) => {
|
||||
error!("Failed to deserialize fully-read account data: {e}");
|
||||
None
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
error!("Failed to get fully-read account data from the store: {e}");
|
||||
None
|
||||
}
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Internal helper to make most of retry_event_decryption independent of a room
|
||||
|
||||
Reference in New Issue
Block a user