optimize(timeline): don't eagerly clone an EventTimelineItem before applying an aggregation onto it

This commit is contained in:
Benjamin Bouvier
2025-05-15 16:29:06 +02:00
parent 16fe53c40d
commit 9cbc674cf2
3 changed files with 88 additions and 58 deletions

View File

@@ -37,8 +37,9 @@
//! to cater for the first use case, and to never lose any aggregations in the
//! second use case.
use std::collections::HashMap;
use std::{borrow::Cow, collections::HashMap};
use as_variant::as_variant;
use ruma::{
MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId, RoomVersionId,
};
@@ -107,18 +108,22 @@ pub(crate) struct Aggregation {
}
/// Get the poll state from a given [`TimelineItemContent`].
fn poll_state_from_item(
content: &mut TimelineItemContent,
) -> Result<&mut PollState, AggregationError> {
match content {
TimelineItemContent::MsgLike(MsgLikeContent {
kind: MsgLikeKind::Poll(poll_state),
..
}) => Ok(poll_state),
_ => Err(AggregationError::InvalidType {
fn poll_state_from_item<'a>(
event: &'a mut Cow<'_, EventTimelineItem>,
) -> Result<&'a mut PollState, AggregationError> {
if event.content().is_poll() {
// It was a poll! Now return the state as mutable.
let state = as_variant!(
event.to_mut().content_mut(),
TimelineItemContent::MsgLike(MsgLikeContent { kind: MsgLikeKind::Poll(s), ..}) => s
)
.expect("it was a poll just above");
Ok(state)
} else {
Err(AggregationError::InvalidType {
expected: "a poll".to_owned(),
actual: content.debug_string().to_owned(),
}),
actual: event.content().debug_string().to_owned(),
})
}
}
@@ -138,12 +143,12 @@ impl Aggregation {
/// couldn't be applied.
pub fn apply(
&self,
event: &mut EventTimelineItem,
event: &mut Cow<'_, EventTimelineItem>,
room_version: &RoomVersionId,
) -> ApplyAggregationResult {
match &self.kind {
AggregationKind::PollResponse { sender, timestamp, answers } => {
match poll_state_from_item(event.content_mut()) {
match poll_state_from_item(event) {
Ok(state) => {
state.add_response(sender.clone(), *timestamp, answers.clone());
ApplyAggregationResult::UpdatedItem
@@ -156,31 +161,29 @@ impl Aggregation {
if event.content().is_redacted() {
ApplyAggregationResult::LeftItemIntact
} else {
*event = event.redact(room_version);
let new_item = event.redact(room_version);
*event = Cow::Owned(new_item);
ApplyAggregationResult::UpdatedItem
}
}
AggregationKind::PollEnd { end_date } => {
match poll_state_from_item(event.content_mut()) {
Ok(state) => {
if !state.end(*end_date) {
return ApplyAggregationResult::Error(
AggregationError::PollAlreadyEnded,
);
}
ApplyAggregationResult::UpdatedItem
AggregationKind::PollEnd { end_date } => match poll_state_from_item(event) {
Ok(state) => {
if !state.end(*end_date) {
return ApplyAggregationResult::Error(AggregationError::PollAlreadyEnded);
}
Err(err) => ApplyAggregationResult::Error(err),
ApplyAggregationResult::UpdatedItem
}
}
Err(err) => ApplyAggregationResult::Error(err),
},
AggregationKind::Reaction { key, sender, timestamp, reaction_status } => {
let Some(reactions) = event.content_mut().reactions_mut() else {
let Some(reactions) = event.content().reactions() else {
// These items don't hold reactions.
return ApplyAggregationResult::LeftItemIntact;
};
let mut reactions = reactions.clone();
let previous_reaction = reactions.entry(key.clone()).or_default().insert(
sender.clone(),
ReactionInfo { timestamp: *timestamp, status: reaction_status.clone() },
@@ -205,6 +208,9 @@ impl Aggregation {
if is_same {
ApplyAggregationResult::LeftItemIntact
} else {
if let Some(reactions_mut) = event.to_mut().content_mut().reactions_mut() {
*reactions_mut = reactions;
}
ApplyAggregationResult::UpdatedItem
}
}
@@ -219,10 +225,10 @@ impl Aggregation {
///
/// In case of error, returns an error detailing why the aggregation
/// couldn't be unapplied.
pub fn unapply(&self, content: &mut TimelineItemContent) -> ApplyAggregationResult {
pub fn unapply(&self, event: &mut Cow<'_, EventTimelineItem>) -> ApplyAggregationResult {
match &self.kind {
AggregationKind::PollResponse { sender, timestamp, .. } => {
let state = match poll_state_from_item(content) {
let state = match poll_state_from_item(event) {
Ok(state) => state,
Err(err) => return ApplyAggregationResult::Error(err),
};
@@ -241,11 +247,12 @@ impl Aggregation {
}
AggregationKind::Reaction { key, sender, .. } => {
let Some(reactions) = content.reactions_mut() else {
let Some(reactions) = event.content().reactions() else {
// An item that doesn't hold any reactions.
return ApplyAggregationResult::LeftItemIntact;
};
let mut reactions = reactions.clone();
let by_user = reactions.get_mut(key);
let previous_entry = if let Some(by_user) = by_user {
let prev = by_user.swap_remove(sender);
@@ -259,6 +266,9 @@ impl Aggregation {
};
if previous_entry.is_some() {
if let Some(reactions_mut) = event.to_mut().content_mut().reactions_mut() {
*reactions_mut = reactions;
}
ApplyAggregationResult::UpdatedItem
} else {
ApplyAggregationResult::LeftItemIntact
@@ -353,21 +363,29 @@ impl Aggregations {
/// Will return an error at the first aggregation that couldn't be applied;
/// see [`Aggregation::apply`] which explains under which conditions it can
/// happen.
///
/// Returns a boolean indicating whether at least one aggregation was
/// applied.
pub fn apply_all(
&self,
item_id: &TimelineEventItemId,
event: &mut EventTimelineItem,
event: &mut Cow<'_, EventTimelineItem>,
room_version: &RoomVersionId,
) -> Result<(), AggregationError> {
) -> Result<bool, AggregationError> {
let Some(aggregations) = self.related_events.get(item_id) else {
return Ok(());
return Ok(false);
};
let mut changed = false;
for a in aggregations {
if let ApplyAggregationResult::Error(err) = a.apply(event, room_version) {
return Err(err);
match a.apply(event, room_version) {
ApplyAggregationResult::UpdatedItem => {
changed = true;
}
ApplyAggregationResult::LeftItemIntact => {}
ApplyAggregationResult::Error(err) => return Err(err),
}
}
Ok(())
Ok(changed)
}
/// Mark a target event as being sent (i.e. it transitions from an local
@@ -457,11 +475,11 @@ pub(crate) fn find_item_and_apply_aggregation(
return None;
};
let mut new_event_item = event_item.clone();
match aggregation.apply(&mut new_event_item, room_version) {
let mut cowed = Cow::Borrowed(&*event_item);
match aggregation.apply(&mut cowed, room_version) {
ApplyAggregationResult::UpdatedItem => {
trace!("applied aggregation");
let new_event_item = cowed.into_owned();
let new_item =
TimelineItem::new(new_event_item.clone(), event_item.internal_id.to_owned());
items.replace(idx, new_item);
@@ -491,7 +509,7 @@ pub(crate) enum MarkAggregationSentResult {
/// The result of applying (or unapplying) an aggregation onto a timeline item.
pub(crate) enum ApplyAggregationResult {
/// The item has been updated after applying the aggregation.
/// The passed `Cow<EventTimelineItem>` has been cloned and updated.
UpdatedItem,
/// The item hasn't been modified after applying the aggregation, because it

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{collections::BTreeSet, fmt, sync::Arc};
use std::{borrow::Cow, collections::BTreeSet, fmt, sync::Arc};
use as_variant::as_variant;
use decryption_retry_task::DecryptionRetryTask;
@@ -909,14 +909,14 @@ impl<P: RoomDataProvider, D: Decryptor> TimelineController<P, D> {
if let Some((item_pos, item)) =
rfind_event_by_item_id(&txn.items, &target)
{
let mut event_item = item.clone();
match aggregation.apply(&mut event_item, &txn.meta.room_version) {
let mut cowed = Cow::Borrowed(&*item);
match aggregation.apply(&mut cowed, &txn.meta.room_version) {
ApplyAggregationResult::UpdatedItem => {
trace!("reapplied aggregation in the event");
let internal_id = item.internal_id.to_owned();
txn.items.replace(
item_pos,
TimelineItem::new(event_item, internal_id),
TimelineItem::new(cowed.into_owned(), internal_id),
);
txn.commit();
}
@@ -1002,13 +1002,14 @@ impl<P: RoomDataProvider, D: Decryptor> TimelineController<P, D> {
return false;
};
let mut content = item.content().clone();
match aggregation.unapply(&mut content) {
let mut cowed = Cow::Borrowed(&*item);
match aggregation.unapply(&mut cowed) {
ApplyAggregationResult::UpdatedItem => {
trace!("removed local reaction to local echo");
let internal_id = item.internal_id.clone();
let new_item = item.with_content(content);
state.items.replace(item_pos, TimelineItem::new(new_item, internal_id));
state.items.replace(
item_pos,
TimelineItem::new(cowed.into_owned(), item.internal_id.clone()),
);
}
ApplyAggregationResult::LeftItemIntact => {}
ApplyAggregationResult::Error(err) => {

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::sync::Arc;
use std::{borrow::Cow, sync::Arc};
use as_variant::as_variant;
use imbl::Vector;
@@ -1047,13 +1047,14 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> {
};
if let Some((item_pos, item)) = rfind_event_by_item_id(self.items, target) {
let mut content = item.content().clone();
match aggregation.unapply(&mut content) {
let mut cowed = Cow::Borrowed(&*item);
match aggregation.unapply(&mut cowed) {
ApplyAggregationResult::UpdatedItem => {
trace!("removed aggregation");
let internal_id = item.internal_id.to_owned();
let new_item = item.with_content(content);
self.items.replace(item_pos, TimelineItem::new(new_item, internal_id));
self.items.replace(
item_pos,
TimelineItem::new(cowed.into_owned(), item.internal_id.to_owned()),
);
}
ApplyAggregationResult::LeftItemIntact => {}
ApplyAggregationResult::Error(err) => {
@@ -1140,12 +1141,22 @@ impl<'a, 'o> TimelineEventHandler<'a, 'o> {
);
// Apply any pending or stashed aggregations.
if let Err(err) = self.meta.aggregations.apply_all(
let mut cowed = Cow::Borrowed(&item);
match self.meta.aggregations.apply_all(
&self.ctx.flow.timeline_item_id(),
&mut item,
&mut cowed,
&self.meta.room_version,
) {
warn!("discarding aggregations: {err}");
Ok(true) => {
// At least one aggregation has been applied.
item = cowed.into_owned();
}
Ok(false) => {
// No aggregations have been applied.
}
Err(err) => {
warn!("discarding aggregations: {err}");
}
}
match &self.ctx.flow {