From ac33ca5fa0623144750ac8b5bae76d58b82818ea Mon Sep 17 00:00:00 2001 From: Jonas Platte Date: Thu, 17 Nov 2022 13:18:52 +0100 Subject: [PATCH] refactor(sdk): Hold locks for the full lifetime of TimelineEventHandler Not really necessary now, but required for thread-safe bulk updates like we want for retrying decryption. --- .../src/room/timeline/event_handler.rs | 172 ++++++++++-------- 1 file changed, 93 insertions(+), 79 deletions(-) diff --git a/crates/matrix-sdk/src/room/timeline/event_handler.rs b/crates/matrix-sdk/src/room/timeline/event_handler.rs index 3ac047736..47c2a4b46 100644 --- a/crates/matrix-sdk/src/room/timeline/event_handler.rs +++ b/crates/matrix-sdk/src/room/timeline/event_handler.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::sync::{Arc, MutexGuard}; +use std::{collections::HashMap, sync::Arc}; use futures_signals::signal_vec::MutableVecLockMut; use indexmap::map::Entry; @@ -20,7 +20,7 @@ use matrix_sdk_base::deserialized_responses::EncryptionInfo; use ruma::{ events::{ fully_read::FullyReadEvent, - reaction::ReactionEventContent, + reaction::{ReactionEventContent, Relation as AnnotationRelation}, room::{ encrypted::{self, RoomEncryptedEventContent}, message::{self, MessageType, Replacement, RoomMessageEventContent}, @@ -59,7 +59,7 @@ impl TimelineInner { content: AnyMessageLikeEventContent, own_user_id: &UserId, ) { - let meta = TimelineEventMetadata { + let event_meta = TimelineEventMetadata { sender: own_user_id.to_owned(), is_own_event: true, relations: None, @@ -70,7 +70,10 @@ impl TimelineInner { let flow = Flow::Local { txn_id }; let kind = TimelineEventKind::Message { content }; - TimelineEventHandler::new(meta, flow, self).handle_event(kind) + let timeline_items = self.items.lock_mut(); + let mut timeline_meta = self.metadata.lock().unwrap(); + TimelineEventHandler::new(event_meta, flow, timeline_items, &mut timeline_meta) + .handle_event(kind); } pub(super) fn handle_back_paginated_event( @@ -100,7 +103,7 @@ impl TimelineInner { let sender = event.sender().to_owned(); let is_own_event = sender == own_user_id; - let meta = TimelineEventMetadata { + let event_meta = TimelineEventMetadata { sender, is_own_event, relations: event.relations().cloned(), @@ -114,7 +117,10 @@ impl TimelineInner { position, }; - TimelineEventHandler::new(meta, flow, self).handle_event(event.into()) + let timeline_items = self.items.lock_mut(); + let mut timeline_meta = self.metadata.lock().unwrap(); + TimelineEventHandler::new(event_meta, flow, timeline_items, &mut timeline_meta) + .handle_event(event.into()) } pub(super) fn handle_fully_read(&self, raw: Raw) { @@ -138,29 +144,35 @@ impl TimelineInner { metadata_lock.fully_read_event = Some(fully_read_event_id); - let items_lock = self.items.lock_mut(); - update_fully_read_item(metadata_lock, items_lock); + let mut items_lock = self.items.lock_mut(); + let metadata = &mut *metadata_lock; + update_fully_read_item( + &mut items_lock, + metadata.fully_read_event.as_deref(), + &mut metadata.fully_read_event_in_timeline, + ); } } fn update_fully_read_item( - mut metadata_lock: MutexGuard<'_, TimelineInnerMetadata>, - mut items_lock: MutableVecLockMut<'_, Arc>, + items_lock: &mut MutableVecLockMut<'_, Arc>, + fully_read_event: Option<&EventId>, + fully_read_event_in_timeline: &mut bool, ) { - let Some(fully_read_event) = &metadata_lock.fully_read_event else { return }; - let old_idx = find_fully_read(&items_lock); - let new_idx = find_event(&items_lock, fully_read_event).map(|(idx, _)| idx + 1); + let Some(fully_read_event) = fully_read_event else { return }; + let old_idx = find_fully_read(items_lock); + let new_idx = find_event(items_lock, fully_read_event).map(|(idx, _)| idx + 1); match (old_idx, new_idx) { (None, None) => {} (None, Some(idx)) => { - metadata_lock.fully_read_event_in_timeline = true; + *fully_read_event_in_timeline = true; let item = TimelineItem::Virtual(VirtualTimelineItem::ReadMarker); items_lock.insert_cloned(idx, item.into()); } (Some(_), None) => { // Keep the current position of the read marker, hopefully we // should have a new position later. - metadata_lock.fully_read_event_in_timeline = false; + *fully_read_event_in_timeline = false; } (Some(from), Some(to)) => { items_lock.move_from_to(from, to); @@ -255,13 +267,29 @@ enum TimelineItemPosition { struct TimelineEventHandler<'a> { meta: TimelineEventMetadata, flow: Flow, - timeline: &'a TimelineInner, + timeline_items: MutableVecLockMut<'a, Arc>, + reaction_map: &'a mut HashMap, + fully_read_event: &'a mut Option, + fully_read_event_in_timeline: &'a mut bool, event_added: bool, } impl<'a> TimelineEventHandler<'a> { - fn new(meta: TimelineEventMetadata, flow: Flow, timeline: &'a TimelineInner) -> Self { - Self { meta, flow, timeline, event_added: false } + fn new( + event_meta: TimelineEventMetadata, + flow: Flow, + timeline_items: MutableVecLockMut<'a, Arc>, + timeline_meta: &'a mut TimelineInnerMetadata, + ) -> Self { + Self { + meta: event_meta, + flow, + timeline_items, + reaction_map: &mut timeline_meta.reaction_map, + fully_read_event: &mut timeline_meta.fully_read_event, + fully_read_event_in_timeline: &mut timeline_meta.fully_read_event_in_timeline, + event_added: false, + } } fn handle_event(mut self, event_kind: TimelineEventKind) { @@ -302,7 +330,7 @@ impl<'a> TimelineEventHandler<'a> { fn handle_room_message_edit(&mut self, replacement: Replacement) { let event_id = &replacement.event_id; - self.maybe_update_timeline_item(event_id, "edit", |item| { + maybe_update_timeline_item(&mut self.timeline_items, event_id, "edit", |item| { if self.meta.sender != item.sender() { info!( %event_id, original_sender = %item.sender(), edit_sender = %self.meta.sender, @@ -343,13 +371,8 @@ impl<'a> TimelineEventHandler<'a> { fn handle_reaction(&mut self, c: ReactionEventContent) { let event_id: &EventId = &c.relates_to.event_id; - // This lock should never be contended, same as the timeline item lock. - // If this is ever run in parallel for some reason though, make sure the - // reaction lock is held for the entire time of the timeline items being - // locked so these two things can't get out of sync. - let mut lock = self.timeline.metadata.lock().unwrap(); - - let did_update = self.maybe_update_timeline_item(event_id, "reaction", |item| { + let items = &mut self.timeline_items; + let did_update = maybe_update_timeline_item(items, event_id, "reaction", |item| { // Handling of reactions on redacted events is an open question. // For now, ignore reactions on redacted events like Element does. if let TimelineItemContent::RedactedMessage = item.content { @@ -370,7 +393,7 @@ impl<'a> TimelineEventHandler<'a> { }); if did_update { - lock.reaction_map.insert(self.flow.to_key(), (self.meta.sender.clone(), c.relates_to)); + self.reaction_map.insert(self.flow.to_key(), (self.meta.sender.clone(), c.relates_to)); } } @@ -390,13 +413,11 @@ impl<'a> TimelineEventHandler<'a> { fn handle_redaction(&mut self, redacts: OwnedEventId, _content: RoomRedactionEventContent) { let mut did_update = false; - // Don't release this lock until after update_timeline_item. - // See first comment in handle_reaction for why. - let mut lock = self.timeline.metadata.lock().unwrap(); if let Some((sender, rel)) = - lock.reaction_map.remove(&TimelineKey::EventId(redacts.clone())) + self.reaction_map.remove(&TimelineKey::EventId(redacts.clone())) { - did_update = self.maybe_update_timeline_item(&rel.event_id, "redaction", |item| { + let items = &mut self.timeline_items; + did_update = maybe_update_timeline_item(items, &rel.event_id, "redaction", |item| { let mut reactions = item.reactions.clone(); let Entry::Occupied(mut details_entry) = reactions.bundled.entry(rel.key) else { @@ -447,7 +468,8 @@ impl<'a> TimelineEventHandler<'a> { // Even if the event being redacted is a reaction (found in // `reaction_map`), it can still be present in the timeline items // directly with the raw event timeline feature (not yet implemented). - did_update |= self.update_timeline_item(&redacts, "redaction", |item| item.to_redacted()); + let items = &mut self.timeline_items; + did_update |= update_timeline_item(items, &redacts, "redaction", |item| item.to_redacted()); if !did_update { // We will want to know this when debugging redaction issues. @@ -472,17 +494,16 @@ impl<'a> TimelineEventHandler<'a> { }; let item = Arc::new(TimelineItem::Event(item)); - let mut lock = self.timeline.items.lock_mut(); match &self.flow { Flow::Local { .. } => { - lock.push_cloned(item); + self.timeline_items.push_cloned(item); } Flow::Remote { txn_id, event_id, position, raw_event, .. } => { if let Some(txn_id) = txn_id { - if let Some((idx, _old_item)) = find_event(&lock, txn_id) { + if let Some((idx, _old_item)) = find_event(&self.timeline_items, txn_id) { // TODO: Check whether anything is different about the // old and new item? - lock.set_cloned(idx, item); + self.timeline_items.set_cloned(idx, item); return; } else { warn!( @@ -493,7 +514,7 @@ impl<'a> TimelineEventHandler<'a> { } } - if let Some((idx, old_item)) = find_event(&lock, event_id) { + if let Some((idx, old_item)) = find_event(&self.timeline_items, event_id) { warn!( ?item, ?old_item, @@ -504,62 +525,55 @@ impl<'a> TimelineEventHandler<'a> { // With /messages and /sync sometimes disagreeing on order // of messages, we might want to change the position in some // circumstances, but for now this should be good enough. - lock.set_cloned(idx, item); + self.timeline_items.set_cloned(idx, item); return; } match position { - TimelineItemPosition::Start => lock.insert_cloned(0, item), - TimelineItemPosition::End => lock.push_cloned(item), + TimelineItemPosition::Start => self.timeline_items.insert_cloned(0, item), + TimelineItemPosition::End => self.timeline_items.push_cloned(item), } } } - drop(lock); - - let metadata_lock = self.timeline.metadata.lock().unwrap(); // See if we got the event corresponding to the fully read marker now. - if !metadata_lock.fully_read_event_in_timeline { - let items_lock = self.timeline.items.lock_mut(); - update_fully_read_item(metadata_lock, items_lock); + if !*self.fully_read_event_in_timeline { + update_fully_read_item( + &mut self.timeline_items, + self.fully_read_event.as_deref(), + self.fully_read_event_in_timeline, + ); } } +} - /// Returns whether an update happened - fn maybe_update_timeline_item( - &self, - event_id: &EventId, - action: &str, - update: impl FnOnce(&EventTimelineItem) -> Option, - ) -> bool { - // No point in trying to update items with relations when back- - // paginating, the event the relation applies to can't be processed yet. - if matches!(self.flow, Flow::Remote { position: TimelineItemPosition::Start, .. }) { - return false; +/// Returns whether an update happened +fn maybe_update_timeline_item( + timeline_items: &mut MutableVecLockMut<'_, Arc>, + event_id: &EventId, + action: &str, + update: impl FnOnce(&EventTimelineItem) -> Option, +) -> bool { + if let Some((idx, item)) = find_event(timeline_items, event_id) { + if let Some(new_item) = update(item) { + timeline_items.set_cloned(idx, Arc::new(TimelineItem::Event(new_item))); + return true; } - - let mut lock = self.timeline.items.lock_mut(); - if let Some((idx, item)) = find_event(&lock, event_id) { - if let Some(new_item) = update(item) { - lock.set_cloned(idx, Arc::new(TimelineItem::Event(new_item))); - return true; - } - } else { - debug!(%event_id, "Timeline item not found, discarding {action}"); - } - - false + } else { + debug!(%event_id, "Timeline item not found, discarding {action}"); } - /// Returns whether an update happened - fn update_timeline_item( - &self, - event_id: &EventId, - action: &str, - update: impl FnOnce(&EventTimelineItem) -> EventTimelineItem, - ) -> bool { - self.maybe_update_timeline_item(event_id, action, move |item| Some(update(item))) - } + false +} + +/// Returns whether an update happened +fn update_timeline_item( + timeline_items: &mut MutableVecLockMut<'_, Arc>, + event_id: &EventId, + action: &str, + update: impl FnOnce(&EventTimelineItem) -> EventTimelineItem, +) -> bool { + maybe_update_timeline_item(timeline_items, event_id, action, move |item| Some(update(item))) } struct NewEventTimelineItem {