mirror of
https://github.com/matrix-org/matrix-rust-sdk.git
synced 2026-05-19 06:04:31 -04:00
refactor(sdk): Hold locks for the full lifetime of TimelineEventHandler
Not really necessary now, but required for thread-safe bulk updates like we want for retrying decryption.
This commit is contained in:
committed by
Jonas Platte
parent
2bc0ac8fd7
commit
ac33ca5fa0
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::{Arc, MutexGuard};
|
||||
use std::{collections::HashMap, sync::Arc};
|
||||
|
||||
use futures_signals::signal_vec::MutableVecLockMut;
|
||||
use indexmap::map::Entry;
|
||||
@@ -20,7 +20,7 @@ use matrix_sdk_base::deserialized_responses::EncryptionInfo;
|
||||
use ruma::{
|
||||
events::{
|
||||
fully_read::FullyReadEvent,
|
||||
reaction::ReactionEventContent,
|
||||
reaction::{ReactionEventContent, Relation as AnnotationRelation},
|
||||
room::{
|
||||
encrypted::{self, RoomEncryptedEventContent},
|
||||
message::{self, MessageType, Replacement, RoomMessageEventContent},
|
||||
@@ -59,7 +59,7 @@ impl TimelineInner {
|
||||
content: AnyMessageLikeEventContent,
|
||||
own_user_id: &UserId,
|
||||
) {
|
||||
let meta = TimelineEventMetadata {
|
||||
let event_meta = TimelineEventMetadata {
|
||||
sender: own_user_id.to_owned(),
|
||||
is_own_event: true,
|
||||
relations: None,
|
||||
@@ -70,7 +70,10 @@ impl TimelineInner {
|
||||
let flow = Flow::Local { txn_id };
|
||||
let kind = TimelineEventKind::Message { content };
|
||||
|
||||
TimelineEventHandler::new(meta, flow, self).handle_event(kind)
|
||||
let timeline_items = self.items.lock_mut();
|
||||
let mut timeline_meta = self.metadata.lock().unwrap();
|
||||
TimelineEventHandler::new(event_meta, flow, timeline_items, &mut timeline_meta)
|
||||
.handle_event(kind);
|
||||
}
|
||||
|
||||
pub(super) fn handle_back_paginated_event(
|
||||
@@ -100,7 +103,7 @@ impl TimelineInner {
|
||||
let sender = event.sender().to_owned();
|
||||
let is_own_event = sender == own_user_id;
|
||||
|
||||
let meta = TimelineEventMetadata {
|
||||
let event_meta = TimelineEventMetadata {
|
||||
sender,
|
||||
is_own_event,
|
||||
relations: event.relations().cloned(),
|
||||
@@ -114,7 +117,10 @@ impl TimelineInner {
|
||||
position,
|
||||
};
|
||||
|
||||
TimelineEventHandler::new(meta, flow, self).handle_event(event.into())
|
||||
let timeline_items = self.items.lock_mut();
|
||||
let mut timeline_meta = self.metadata.lock().unwrap();
|
||||
TimelineEventHandler::new(event_meta, flow, timeline_items, &mut timeline_meta)
|
||||
.handle_event(event.into())
|
||||
}
|
||||
|
||||
pub(super) fn handle_fully_read(&self, raw: Raw<FullyReadEvent>) {
|
||||
@@ -138,29 +144,35 @@ impl TimelineInner {
|
||||
|
||||
metadata_lock.fully_read_event = Some(fully_read_event_id);
|
||||
|
||||
let items_lock = self.items.lock_mut();
|
||||
update_fully_read_item(metadata_lock, items_lock);
|
||||
let mut items_lock = self.items.lock_mut();
|
||||
let metadata = &mut *metadata_lock;
|
||||
update_fully_read_item(
|
||||
&mut items_lock,
|
||||
metadata.fully_read_event.as_deref(),
|
||||
&mut metadata.fully_read_event_in_timeline,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
fn update_fully_read_item(
|
||||
mut metadata_lock: MutexGuard<'_, TimelineInnerMetadata>,
|
||||
mut items_lock: MutableVecLockMut<'_, Arc<TimelineItem>>,
|
||||
items_lock: &mut MutableVecLockMut<'_, Arc<TimelineItem>>,
|
||||
fully_read_event: Option<&EventId>,
|
||||
fully_read_event_in_timeline: &mut bool,
|
||||
) {
|
||||
let Some(fully_read_event) = &metadata_lock.fully_read_event else { return };
|
||||
let old_idx = find_fully_read(&items_lock);
|
||||
let new_idx = find_event(&items_lock, fully_read_event).map(|(idx, _)| idx + 1);
|
||||
let Some(fully_read_event) = fully_read_event else { return };
|
||||
let old_idx = find_fully_read(items_lock);
|
||||
let new_idx = find_event(items_lock, fully_read_event).map(|(idx, _)| idx + 1);
|
||||
match (old_idx, new_idx) {
|
||||
(None, None) => {}
|
||||
(None, Some(idx)) => {
|
||||
metadata_lock.fully_read_event_in_timeline = true;
|
||||
*fully_read_event_in_timeline = true;
|
||||
let item = TimelineItem::Virtual(VirtualTimelineItem::ReadMarker);
|
||||
items_lock.insert_cloned(idx, item.into());
|
||||
}
|
||||
(Some(_), None) => {
|
||||
// Keep the current position of the read marker, hopefully we
|
||||
// should have a new position later.
|
||||
metadata_lock.fully_read_event_in_timeline = false;
|
||||
*fully_read_event_in_timeline = false;
|
||||
}
|
||||
(Some(from), Some(to)) => {
|
||||
items_lock.move_from_to(from, to);
|
||||
@@ -255,13 +267,29 @@ enum TimelineItemPosition {
|
||||
struct TimelineEventHandler<'a> {
|
||||
meta: TimelineEventMetadata,
|
||||
flow: Flow,
|
||||
timeline: &'a TimelineInner,
|
||||
timeline_items: MutableVecLockMut<'a, Arc<TimelineItem>>,
|
||||
reaction_map: &'a mut HashMap<TimelineKey, (OwnedUserId, AnnotationRelation)>,
|
||||
fully_read_event: &'a mut Option<OwnedEventId>,
|
||||
fully_read_event_in_timeline: &'a mut bool,
|
||||
event_added: bool,
|
||||
}
|
||||
|
||||
impl<'a> TimelineEventHandler<'a> {
|
||||
fn new(meta: TimelineEventMetadata, flow: Flow, timeline: &'a TimelineInner) -> Self {
|
||||
Self { meta, flow, timeline, event_added: false }
|
||||
fn new(
|
||||
event_meta: TimelineEventMetadata,
|
||||
flow: Flow,
|
||||
timeline_items: MutableVecLockMut<'a, Arc<TimelineItem>>,
|
||||
timeline_meta: &'a mut TimelineInnerMetadata,
|
||||
) -> Self {
|
||||
Self {
|
||||
meta: event_meta,
|
||||
flow,
|
||||
timeline_items,
|
||||
reaction_map: &mut timeline_meta.reaction_map,
|
||||
fully_read_event: &mut timeline_meta.fully_read_event,
|
||||
fully_read_event_in_timeline: &mut timeline_meta.fully_read_event_in_timeline,
|
||||
event_added: false,
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_event(mut self, event_kind: TimelineEventKind) {
|
||||
@@ -302,7 +330,7 @@ impl<'a> TimelineEventHandler<'a> {
|
||||
fn handle_room_message_edit(&mut self, replacement: Replacement<MessageType>) {
|
||||
let event_id = &replacement.event_id;
|
||||
|
||||
self.maybe_update_timeline_item(event_id, "edit", |item| {
|
||||
maybe_update_timeline_item(&mut self.timeline_items, event_id, "edit", |item| {
|
||||
if self.meta.sender != item.sender() {
|
||||
info!(
|
||||
%event_id, original_sender = %item.sender(), edit_sender = %self.meta.sender,
|
||||
@@ -343,13 +371,8 @@ impl<'a> TimelineEventHandler<'a> {
|
||||
fn handle_reaction(&mut self, c: ReactionEventContent) {
|
||||
let event_id: &EventId = &c.relates_to.event_id;
|
||||
|
||||
// This lock should never be contended, same as the timeline item lock.
|
||||
// If this is ever run in parallel for some reason though, make sure the
|
||||
// reaction lock is held for the entire time of the timeline items being
|
||||
// locked so these two things can't get out of sync.
|
||||
let mut lock = self.timeline.metadata.lock().unwrap();
|
||||
|
||||
let did_update = self.maybe_update_timeline_item(event_id, "reaction", |item| {
|
||||
let items = &mut self.timeline_items;
|
||||
let did_update = maybe_update_timeline_item(items, event_id, "reaction", |item| {
|
||||
// Handling of reactions on redacted events is an open question.
|
||||
// For now, ignore reactions on redacted events like Element does.
|
||||
if let TimelineItemContent::RedactedMessage = item.content {
|
||||
@@ -370,7 +393,7 @@ impl<'a> TimelineEventHandler<'a> {
|
||||
});
|
||||
|
||||
if did_update {
|
||||
lock.reaction_map.insert(self.flow.to_key(), (self.meta.sender.clone(), c.relates_to));
|
||||
self.reaction_map.insert(self.flow.to_key(), (self.meta.sender.clone(), c.relates_to));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -390,13 +413,11 @@ impl<'a> TimelineEventHandler<'a> {
|
||||
fn handle_redaction(&mut self, redacts: OwnedEventId, _content: RoomRedactionEventContent) {
|
||||
let mut did_update = false;
|
||||
|
||||
// Don't release this lock until after update_timeline_item.
|
||||
// See first comment in handle_reaction for why.
|
||||
let mut lock = self.timeline.metadata.lock().unwrap();
|
||||
if let Some((sender, rel)) =
|
||||
lock.reaction_map.remove(&TimelineKey::EventId(redacts.clone()))
|
||||
self.reaction_map.remove(&TimelineKey::EventId(redacts.clone()))
|
||||
{
|
||||
did_update = self.maybe_update_timeline_item(&rel.event_id, "redaction", |item| {
|
||||
let items = &mut self.timeline_items;
|
||||
did_update = maybe_update_timeline_item(items, &rel.event_id, "redaction", |item| {
|
||||
let mut reactions = item.reactions.clone();
|
||||
|
||||
let Entry::Occupied(mut details_entry) = reactions.bundled.entry(rel.key) else {
|
||||
@@ -447,7 +468,8 @@ impl<'a> TimelineEventHandler<'a> {
|
||||
// Even if the event being redacted is a reaction (found in
|
||||
// `reaction_map`), it can still be present in the timeline items
|
||||
// directly with the raw event timeline feature (not yet implemented).
|
||||
did_update |= self.update_timeline_item(&redacts, "redaction", |item| item.to_redacted());
|
||||
let items = &mut self.timeline_items;
|
||||
did_update |= update_timeline_item(items, &redacts, "redaction", |item| item.to_redacted());
|
||||
|
||||
if !did_update {
|
||||
// We will want to know this when debugging redaction issues.
|
||||
@@ -472,17 +494,16 @@ impl<'a> TimelineEventHandler<'a> {
|
||||
};
|
||||
|
||||
let item = Arc::new(TimelineItem::Event(item));
|
||||
let mut lock = self.timeline.items.lock_mut();
|
||||
match &self.flow {
|
||||
Flow::Local { .. } => {
|
||||
lock.push_cloned(item);
|
||||
self.timeline_items.push_cloned(item);
|
||||
}
|
||||
Flow::Remote { txn_id, event_id, position, raw_event, .. } => {
|
||||
if let Some(txn_id) = txn_id {
|
||||
if let Some((idx, _old_item)) = find_event(&lock, txn_id) {
|
||||
if let Some((idx, _old_item)) = find_event(&self.timeline_items, txn_id) {
|
||||
// TODO: Check whether anything is different about the
|
||||
// old and new item?
|
||||
lock.set_cloned(idx, item);
|
||||
self.timeline_items.set_cloned(idx, item);
|
||||
return;
|
||||
} else {
|
||||
warn!(
|
||||
@@ -493,7 +514,7 @@ impl<'a> TimelineEventHandler<'a> {
|
||||
}
|
||||
}
|
||||
|
||||
if let Some((idx, old_item)) = find_event(&lock, event_id) {
|
||||
if let Some((idx, old_item)) = find_event(&self.timeline_items, event_id) {
|
||||
warn!(
|
||||
?item,
|
||||
?old_item,
|
||||
@@ -504,62 +525,55 @@ impl<'a> TimelineEventHandler<'a> {
|
||||
// With /messages and /sync sometimes disagreeing on order
|
||||
// of messages, we might want to change the position in some
|
||||
// circumstances, but for now this should be good enough.
|
||||
lock.set_cloned(idx, item);
|
||||
self.timeline_items.set_cloned(idx, item);
|
||||
return;
|
||||
}
|
||||
|
||||
match position {
|
||||
TimelineItemPosition::Start => lock.insert_cloned(0, item),
|
||||
TimelineItemPosition::End => lock.push_cloned(item),
|
||||
TimelineItemPosition::Start => self.timeline_items.insert_cloned(0, item),
|
||||
TimelineItemPosition::End => self.timeline_items.push_cloned(item),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
drop(lock);
|
||||
|
||||
let metadata_lock = self.timeline.metadata.lock().unwrap();
|
||||
// See if we got the event corresponding to the fully read marker now.
|
||||
if !metadata_lock.fully_read_event_in_timeline {
|
||||
let items_lock = self.timeline.items.lock_mut();
|
||||
update_fully_read_item(metadata_lock, items_lock);
|
||||
if !*self.fully_read_event_in_timeline {
|
||||
update_fully_read_item(
|
||||
&mut self.timeline_items,
|
||||
self.fully_read_event.as_deref(),
|
||||
self.fully_read_event_in_timeline,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns whether an update happened
|
||||
fn maybe_update_timeline_item(
|
||||
&self,
|
||||
event_id: &EventId,
|
||||
action: &str,
|
||||
update: impl FnOnce(&EventTimelineItem) -> Option<EventTimelineItem>,
|
||||
) -> bool {
|
||||
// No point in trying to update items with relations when back-
|
||||
// paginating, the event the relation applies to can't be processed yet.
|
||||
if matches!(self.flow, Flow::Remote { position: TimelineItemPosition::Start, .. }) {
|
||||
return false;
|
||||
/// Returns whether an update happened
|
||||
fn maybe_update_timeline_item(
|
||||
timeline_items: &mut MutableVecLockMut<'_, Arc<TimelineItem>>,
|
||||
event_id: &EventId,
|
||||
action: &str,
|
||||
update: impl FnOnce(&EventTimelineItem) -> Option<EventTimelineItem>,
|
||||
) -> bool {
|
||||
if let Some((idx, item)) = find_event(timeline_items, event_id) {
|
||||
if let Some(new_item) = update(item) {
|
||||
timeline_items.set_cloned(idx, Arc::new(TimelineItem::Event(new_item)));
|
||||
return true;
|
||||
}
|
||||
|
||||
let mut lock = self.timeline.items.lock_mut();
|
||||
if let Some((idx, item)) = find_event(&lock, event_id) {
|
||||
if let Some(new_item) = update(item) {
|
||||
lock.set_cloned(idx, Arc::new(TimelineItem::Event(new_item)));
|
||||
return true;
|
||||
}
|
||||
} else {
|
||||
debug!(%event_id, "Timeline item not found, discarding {action}");
|
||||
}
|
||||
|
||||
false
|
||||
} else {
|
||||
debug!(%event_id, "Timeline item not found, discarding {action}");
|
||||
}
|
||||
|
||||
/// Returns whether an update happened
|
||||
fn update_timeline_item(
|
||||
&self,
|
||||
event_id: &EventId,
|
||||
action: &str,
|
||||
update: impl FnOnce(&EventTimelineItem) -> EventTimelineItem,
|
||||
) -> bool {
|
||||
self.maybe_update_timeline_item(event_id, action, move |item| Some(update(item)))
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
/// Returns whether an update happened
|
||||
fn update_timeline_item(
|
||||
timeline_items: &mut MutableVecLockMut<'_, Arc<TimelineItem>>,
|
||||
event_id: &EventId,
|
||||
action: &str,
|
||||
update: impl FnOnce(&EventTimelineItem) -> EventTimelineItem,
|
||||
) -> bool {
|
||||
maybe_update_timeline_item(timeline_items, event_id, action, move |item| Some(update(item)))
|
||||
}
|
||||
|
||||
struct NewEventTimelineItem {
|
||||
|
||||
Reference in New Issue
Block a user