From 720d4434528bb22b1ead75f2f361963bac49751a Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Tue, 28 Jan 2025 15:05:52 +0100 Subject: [PATCH] chore(ui): Move `TimelineStateTransaction` into its own module. This patch moves `TimelineStateTransaction` and its implementation into its own module. The idea is to reduce the size of the `state.rs` module. --- .../src/timeline/controller/mod.rs | 7 +- .../src/timeline/controller/state.rs | 535 +---------------- .../timeline/controller/state_transaction.rs | 554 ++++++++++++++++++ 3 files changed, 562 insertions(+), 534 deletions(-) create mode 100644 crates/matrix-sdk-ui/src/timeline/controller/state_transaction.rs diff --git a/crates/matrix-sdk-ui/src/timeline/controller/mod.rs b/crates/matrix-sdk-ui/src/timeline/controller/mod.rs index b941c8042..e426aa872 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/mod.rs @@ -61,10 +61,8 @@ pub(super) use self::{ AllRemoteEvents, ObservableItemsEntry, ObservableItemsTransaction, ObservableItemsTransactionEntry, }, - state::{ - FullEventMeta, PendingEdit, PendingEditKind, TimelineMetadata, TimelineState, - TimelineStateTransaction, - }, + state::{FullEventMeta, PendingEdit, PendingEditKind, TimelineMetadata, TimelineState}, + state_transaction::TimelineStateTransaction, }; use super::{ algorithms::{rfind_event_by_id, rfind_event_item}, @@ -91,6 +89,7 @@ use crate::{ mod observable_items; mod read_receipts; mod state; +mod state_transaction; /// Data associated to the current timeline focus. #[derive(Debug)] diff --git a/crates/matrix-sdk-ui/src/timeline/controller/state.rs b/crates/matrix-sdk-ui/src/timeline/controller/state.rs index df067a5fe..d44107078 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/state.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/state.rs @@ -20,7 +20,6 @@ use std::{ }; use eyeball_im::VectorDiff; -use itertools::Itertools as _; use matrix_sdk::{ deserialized_responses::TimelineEvent, ring_buffer::RingBuffer, send_queue::SendHandle, }; @@ -36,28 +35,24 @@ use ruma::{ room::message::RoomMessageEventContentWithoutRelation, AnySyncEphemeralRoomEvent, AnySyncTimelineEvent, }, - push::Action, serde::Raw, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId, RoomVersionId, UserId, }; -use tracing::{debug, instrument, trace, warn}; +use tracing::{instrument, trace, warn}; use super::{ - observable_items::{ - AllRemoteEvents, ObservableItems, ObservableItemsTransaction, - ObservableItemsTransactionEntry, - }, + observable_items::{AllRemoteEvents, ObservableItems, ObservableItemsTransaction}, read_receipts::ReadReceipts, DateDividerMode, RelativePosition, TimelineFocusKind, TimelineSettings, + TimelineStateTransaction, }; use crate::{ - events::SyncTimelineEventWithoutContent, timeline::{ algorithms::rfind_event_by_id, date_dividers::DateDividerAdjuster, event_handler::{ - Flow, HandleEventResult, TimelineEventContext, TimelineEventHandler, TimelineEventKind, + Flow, TimelineEventContext, TimelineEventHandler, TimelineEventKind, TimelineItemPosition, }, event_item::{PollState, RemoteEventOrigin, ResponseData}, @@ -314,526 +309,6 @@ impl TimelineState { } } -pub(in crate::timeline) struct TimelineStateTransaction<'a> { - /// A vector transaction over the items themselves. Holds temporary state - /// until committed. - pub items: ObservableItemsTransaction<'a>, - - /// A clone of the previous meta, that we're operating on during the - /// transaction, and that will be committed to the previous meta location in - /// [`Self::commit`]. - pub meta: TimelineMetadata, - - /// Pointer to the previous meta, only used during [`Self::commit`]. - previous_meta: &'a mut TimelineMetadata, - - /// The kind of focus of this timeline. - timeline_focus: TimelineFocusKind, -} - -impl TimelineStateTransaction<'_> { - /// Handle updates on events as [`VectorDiff`]s. - pub(super) async fn handle_remote_events_with_diffs( - &mut self, - diffs: Vec>, - origin: RemoteEventOrigin, - room_data_provider: &RoomData, - settings: &TimelineSettings, - ) where - RoomData: RoomDataProvider, - { - let mut date_divider_adjuster = - DateDividerAdjuster::new(settings.date_divider_mode.clone()); - - for diff in diffs { - match diff { - VectorDiff::Append { values: events } => { - for event in events { - self.handle_remote_event( - event, - TimelineItemPosition::End { origin }, - room_data_provider, - settings, - &mut date_divider_adjuster, - ) - .await; - } - } - - VectorDiff::PushFront { value: event } => { - self.handle_remote_event( - event, - TimelineItemPosition::Start { origin }, - room_data_provider, - settings, - &mut date_divider_adjuster, - ) - .await; - } - - VectorDiff::PushBack { value: event } => { - self.handle_remote_event( - event, - TimelineItemPosition::End { origin }, - room_data_provider, - settings, - &mut date_divider_adjuster, - ) - .await; - } - - VectorDiff::Insert { index: event_index, value: event } => { - self.handle_remote_event( - event, - TimelineItemPosition::At { event_index, origin }, - room_data_provider, - settings, - &mut date_divider_adjuster, - ) - .await; - } - - VectorDiff::Set { index: event_index, value: event } => { - if let Some(timeline_item_index) = self - .items - .all_remote_events() - .get(event_index) - .and_then(|meta| meta.timeline_item_index) - { - self.handle_remote_event( - event, - TimelineItemPosition::UpdateAt { timeline_item_index }, - room_data_provider, - settings, - &mut date_divider_adjuster, - ) - .await; - } else { - warn!(event_index, "Set update dropped because there wasn't any attached timeline item index."); - } - } - - VectorDiff::Remove { index: event_index } => { - self.remove_timeline_item(event_index, &mut date_divider_adjuster); - } - - VectorDiff::Clear => { - self.clear(); - } - - v => unimplemented!("{v:?}"), - } - } - - self.adjust_date_dividers(date_divider_adjuster); - self.check_no_unused_unique_ids(); - } - - fn check_no_unused_unique_ids(&self) { - let duplicates = self - .items - .iter() - .duplicates_by(|item| item.unique_id()) - .map(|item| item.unique_id()) - .collect::>(); - - if !duplicates.is_empty() { - #[cfg(any(debug_assertions, test))] - panic!("duplicate unique ids in this timeline:{:?}\n{:?}", duplicates, self.items); - - #[cfg(not(any(debug_assertions, test)))] - tracing::error!( - "duplicate unique ids in this timeline:{:?}\n{:?}", - duplicates, - self.items - ); - } - } - - /// Handle a remote event. - /// - /// Returns the number of timeline updates that were made. - async fn handle_remote_event( - &mut self, - event: TimelineEvent, - position: TimelineItemPosition, - room_data_provider: &P, - settings: &TimelineSettings, - date_divider_adjuster: &mut DateDividerAdjuster, - ) -> HandleEventResult { - let TimelineEvent { push_actions, kind } = event; - let encryption_info = kind.encryption_info().cloned(); - - let (raw, utd_info) = match kind { - matrix_sdk::deserialized_responses::TimelineEventKind::UnableToDecrypt { - utd_info, - event, - } => (event, Some(utd_info)), - _ => (kind.into_raw(), None), - }; - - let (event_id, sender, timestamp, txn_id, event_kind, should_add) = match raw.deserialize() - { - // Classical path: the event is valid, can be deserialized, everything is alright. - Ok(event) => { - let event_id = event.event_id().to_owned(); - let room_version = room_data_provider.room_version(); - - let mut should_add = (settings.event_filter)(&event, &room_version); - - if should_add { - // Retrieve the origin of the event. - let origin = match position { - TimelineItemPosition::End { origin } - | TimelineItemPosition::Start { origin } - | TimelineItemPosition::At { origin, .. } => origin, - - TimelineItemPosition::UpdateAt { timeline_item_index: idx } => self - .items - .get(idx) - .and_then(|item| item.as_event()) - .and_then(|item| item.as_remote()) - .map_or(RemoteEventOrigin::Unknown, |item| item.origin), - }; - - // If the event should be added according to the general event filter, use a - // second filter to decide whether it should be added depending on the timeline - // focus and events origin, if needed - match self.timeline_focus { - TimelineFocusKind::PinnedEvents => { - // Only add pinned events for the pinned events timeline - should_add = room_data_provider.is_pinned_event(&event_id); - } - TimelineFocusKind::Live => { - match origin { - RemoteEventOrigin::Sync | RemoteEventOrigin::Unknown => { - // Always add new items to a live timeline receiving items from - // sync. - should_add = true; - } - RemoteEventOrigin::Cache | RemoteEventOrigin::Pagination => { - // Forward the previous decision to add it. - } - } - } - TimelineFocusKind::Event => { - match origin { - RemoteEventOrigin::Sync | RemoteEventOrigin::Unknown => { - // Never add any item to a focused timeline when the item comes - // down from the sync. - should_add = false; - } - RemoteEventOrigin::Cache | RemoteEventOrigin::Pagination => { - // Forward the previous decision to add it. - } - } - } - } - } - - ( - event_id, - event.sender().to_owned(), - event.origin_server_ts(), - event.transaction_id().map(ToOwned::to_owned), - TimelineEventKind::from_event(event, &raw, room_data_provider, utd_info).await, - should_add, - ) - } - - // The event seems invalid… - Err(e) => match raw.deserialize_as::() { - // The event can be partially deserialized, and it is allowed to be added to the - // timeline. - Ok(event) if settings.add_failed_to_parse => ( - event.event_id().to_owned(), - event.sender().to_owned(), - event.origin_server_ts(), - event.transaction_id().map(ToOwned::to_owned), - TimelineEventKind::failed_to_parse(event, e), - true, - ), - - // The event can be partially deserialized, but it is NOT allowed to be added to - // the timeline. - Ok(event) => { - let event_type = event.event_type(); - let event_id = event.event_id(); - warn!(%event_type, %event_id, "Failed to deserialize timeline event: {e}"); - - let is_own_event = event.sender() == room_data_provider.own_user_id(); - let event_meta = FullEventMeta { - event_id, - sender: Some(event.sender()), - is_own_event, - timestamp: Some(event.origin_server_ts()), - visible: false, - }; - - // Remember the event before returning prematurely. - // See [`ObservableItems::all_remote_events`]. - self.add_or_update_remote_event( - event_meta, - position, - room_data_provider, - settings, - ) - .await; - - return HandleEventResult::default(); - } - - // The event can NOT be partially deserialized, it seems really broken. - Err(e) => { - let event_type: Option = raw.get_field("type").ok().flatten(); - let event_id: Option = raw.get_field("event_id").ok().flatten(); - warn!( - event_type, - event_id, "Failed to deserialize timeline event even without content: {e}" - ); - - let event_id = event_id.and_then(|s| EventId::parse(s).ok()); - - if let Some(event_id) = &event_id { - let sender: Option = raw.get_field("sender").ok().flatten(); - let is_own_event = - sender.as_ref().is_some_and(|s| s == room_data_provider.own_user_id()); - let timestamp: Option = - raw.get_field("origin_server_ts").ok().flatten(); - - let event_meta = FullEventMeta { - event_id, - sender: sender.as_deref(), - is_own_event, - timestamp, - visible: false, - }; - - // Remember the event before returning prematurely. - // See [`ObservableItems::all_remote_events`]. - self.add_or_update_remote_event( - event_meta, - position, - room_data_provider, - settings, - ) - .await; - } - - return HandleEventResult::default(); - } - }, - }; - - let is_own_event = sender == room_data_provider.own_user_id(); - - let event_meta = FullEventMeta { - event_id: &event_id, - sender: Some(&sender), - is_own_event, - timestamp: Some(timestamp), - visible: should_add, - }; - - // Remember the event. - // See [`ObservableItems::all_remote_events`]. - self.add_or_update_remote_event(event_meta, position, room_data_provider, settings).await; - - let sender_profile = room_data_provider.profile_from_user_id(&sender).await; - let ctx = TimelineEventContext { - sender, - sender_profile, - timestamp, - is_own_event, - read_receipts: if settings.track_read_receipts && should_add { - self.meta.read_receipts.compute_event_receipts( - &event_id, - self.items.all_remote_events(), - matches!(position, TimelineItemPosition::End { .. }), - ) - } else { - Default::default() - }, - is_highlighted: push_actions - .as_ref() - .is_some_and(|actions| actions.iter().any(Action::is_highlight)), - flow: Flow::Remote { - event_id: event_id.clone(), - raw_event: raw, - encryption_info, - txn_id, - position, - }, - should_add_new_items: should_add, - }; - - // Handle the event to create or update a timeline item. - TimelineEventHandler::new(self, ctx).handle_event(date_divider_adjuster, event_kind).await - } - - /// Remove one timeline item by its `event_index`. - fn remove_timeline_item( - &mut self, - event_index: usize, - day_divider_adjuster: &mut DateDividerAdjuster, - ) { - day_divider_adjuster.mark_used(); - - // We need to be careful here. - // - // We must first remove the timeline item, which will update the mapping between - // remote events and timeline items. Removing the timeline item will “unlink” - // this mapping as the remote event will be updated to map to nothing. Only - // after that, we can remove the remote event. Doing this in the other order - // will update the mapping twice, and will result in a corrupted state. - - // Remove the timeline item first. - if let Some(event_meta) = self.items.all_remote_events().get(event_index) { - // Fetch the `timeline_item_index` associated to the remote event. - if let Some(timeline_item_index) = event_meta.timeline_item_index { - let _removed_timeline_item = self.items.remove(timeline_item_index); - } - - // Now we can remove the remote event. - self.items.remove_remote_event(event_index); - } - } - - fn clear(&mut self) { - let has_local_echoes = self.items.iter().any(|item| item.is_local_echo()); - - // By first checking if there are any local echoes first, we do a bit - // more work in case some are found, but it should be worth it because - // there will often not be any, and only emitting a single - // `VectorDiff::Clear` should be much more efficient to process for - // subscribers. - if has_local_echoes { - // Remove all remote events and the read marker - self.items.for_each(|entry| { - if entry.is_remote_event() || entry.is_read_marker() { - ObservableItemsTransactionEntry::remove(entry); - } - }); - - // Remove stray date dividers - let mut idx = 0; - while idx < self.items.len() { - if self.items[idx].is_date_divider() - && self.items.get(idx + 1).is_none_or(|item| item.is_date_divider()) - { - self.items.remove(idx); - // don't increment idx because all elements have shifted - } else { - idx += 1; - } - } - } else { - self.items.clear(); - } - - self.meta.clear(); - - debug!(remaining_items = self.items.len(), "Timeline cleared"); - } - - #[instrument(skip_all)] - fn set_fully_read_event(&mut self, fully_read_event_id: OwnedEventId) { - // A similar event has been handled already. We can ignore it. - if self.meta.fully_read_event.as_ref().is_some_and(|id| *id == fully_read_event_id) { - return; - } - - self.meta.fully_read_event = Some(fully_read_event_id); - self.meta.update_read_marker(&mut self.items); - } - - pub(super) fn commit(self) { - let Self { items, previous_meta, meta, .. } = self; - - // Replace the pointer to the previous meta with the new one. - *previous_meta = meta; - - items.commit(); - } - - /// Add or update a remote event in the - /// [`ObservableItems::all_remote_events`] collection. - /// - /// This method also adjusts read receipt if needed. - async fn add_or_update_remote_event( - &mut self, - event_meta: FullEventMeta<'_>, - position: TimelineItemPosition, - room_data_provider: &P, - settings: &TimelineSettings, - ) { - match position { - TimelineItemPosition::Start { .. } => { - self.items.push_front_remote_event(event_meta.base_meta()) - } - - TimelineItemPosition::End { .. } => { - self.items.push_back_remote_event(event_meta.base_meta()); - } - - TimelineItemPosition::At { event_index, .. } => { - self.items.insert_remote_event(event_index, event_meta.base_meta()); - } - - TimelineItemPosition::UpdateAt { .. } => { - if let Some(event) = - self.items.get_remote_event_by_event_id_mut(event_meta.event_id) - { - if event.visible != event_meta.visible { - event.visible = event_meta.visible; - - if settings.track_read_receipts { - // Since the event's visibility changed, we need to update the read - // receipts of the previous visible event. - self.maybe_update_read_receipts_of_prev_event(event_meta.event_id); - } - } - } - } - } - - if settings.track_read_receipts - && matches!( - position, - TimelineItemPosition::Start { .. } - | TimelineItemPosition::End { .. } - | TimelineItemPosition::At { .. } - ) - { - self.load_read_receipts_for_event(event_meta.event_id, room_data_provider).await; - - self.maybe_add_implicit_read_receipt(event_meta); - } - } - - fn adjust_date_dividers(&mut self, mut adjuster: DateDividerAdjuster) { - adjuster.run(&mut self.items, &mut self.meta); - } - - /// This method replaces the `is_room_encrypted` value for all timeline - /// items to its updated version and creates a `VectorDiff::Set` operation - /// for each item which will be added to this transaction. - fn update_all_events_is_room_encrypted(&mut self, is_encrypted: Option) { - for idx in 0..self.items.len() { - let item = &self.items[idx]; - - if let Some(event) = item.as_event() { - let mut cloned_event = event.clone(); - cloned_event.is_room_encrypted = is_encrypted; - - // Replace the existing item with a new version with the right encryption flag - let item = item.with_kind(cloned_event); - self.items.replace(idx, item); - } - } - } -} - /// Cache holding poll response and end events handled before their poll start /// event has been handled. #[derive(Clone, Debug, Default)] @@ -1184,7 +659,7 @@ pub(crate) struct FullEventMeta<'a> { } impl FullEventMeta<'_> { - fn base_meta(&self) -> EventMeta { + pub(super) fn base_meta(&self) -> EventMeta { EventMeta { event_id: self.event_id.to_owned(), visible: self.visible, diff --git a/crates/matrix-sdk-ui/src/timeline/controller/state_transaction.rs b/crates/matrix-sdk-ui/src/timeline/controller/state_transaction.rs new file mode 100644 index 000000000..f54e0ce70 --- /dev/null +++ b/crates/matrix-sdk-ui/src/timeline/controller/state_transaction.rs @@ -0,0 +1,554 @@ +// Copyright 2023 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use eyeball_im::VectorDiff; +use itertools::Itertools as _; +use matrix_sdk::deserialized_responses::TimelineEvent; +use ruma::{push::Action, EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedUserId}; +use tracing::{debug, instrument, warn}; + +use super::{ + super::{ + controller::{FullEventMeta, ObservableItemsTransactionEntry}, + date_dividers::DateDividerAdjuster, + event_handler::{ + Flow, HandleEventResult, TimelineEventContext, TimelineEventHandler, TimelineEventKind, + TimelineItemPosition, + }, + event_item::RemoteEventOrigin, + traits::RoomDataProvider, + }, + ObservableItemsTransaction, TimelineFocusKind, TimelineMetadata, TimelineSettings, +}; +use crate::events::SyncTimelineEventWithoutContent; + +pub(in crate::timeline) struct TimelineStateTransaction<'a> { + /// A vector transaction over the items themselves. Holds temporary state + /// until committed. + pub items: ObservableItemsTransaction<'a>, + + /// A clone of the previous meta, that we're operating on during the + /// transaction, and that will be committed to the previous meta location in + /// [`Self::commit`]. + pub meta: TimelineMetadata, + + /// Pointer to the previous meta, only used during [`Self::commit`]. + pub(super) previous_meta: &'a mut TimelineMetadata, + + /// The kind of focus of this timeline. + pub(super) timeline_focus: TimelineFocusKind, +} + +impl TimelineStateTransaction<'_> { + /// Handle updates on events as [`VectorDiff`]s. + pub(super) async fn handle_remote_events_with_diffs( + &mut self, + diffs: Vec>, + origin: RemoteEventOrigin, + room_data_provider: &RoomData, + settings: &TimelineSettings, + ) where + RoomData: RoomDataProvider, + { + let mut date_divider_adjuster = + DateDividerAdjuster::new(settings.date_divider_mode.clone()); + + for diff in diffs { + match diff { + VectorDiff::Append { values: events } => { + for event in events { + self.handle_remote_event( + event, + TimelineItemPosition::End { origin }, + room_data_provider, + settings, + &mut date_divider_adjuster, + ) + .await; + } + } + + VectorDiff::PushFront { value: event } => { + self.handle_remote_event( + event, + TimelineItemPosition::Start { origin }, + room_data_provider, + settings, + &mut date_divider_adjuster, + ) + .await; + } + + VectorDiff::PushBack { value: event } => { + self.handle_remote_event( + event, + TimelineItemPosition::End { origin }, + room_data_provider, + settings, + &mut date_divider_adjuster, + ) + .await; + } + + VectorDiff::Insert { index: event_index, value: event } => { + self.handle_remote_event( + event, + TimelineItemPosition::At { event_index, origin }, + room_data_provider, + settings, + &mut date_divider_adjuster, + ) + .await; + } + + VectorDiff::Set { index: event_index, value: event } => { + if let Some(timeline_item_index) = self + .items + .all_remote_events() + .get(event_index) + .and_then(|meta| meta.timeline_item_index) + { + self.handle_remote_event( + event, + TimelineItemPosition::UpdateAt { timeline_item_index }, + room_data_provider, + settings, + &mut date_divider_adjuster, + ) + .await; + } else { + warn!(event_index, "Set update dropped because there wasn't any attached timeline item index."); + } + } + + VectorDiff::Remove { index: event_index } => { + self.remove_timeline_item(event_index, &mut date_divider_adjuster); + } + + VectorDiff::Clear => { + self.clear(); + } + + v => unimplemented!("{v:?}"), + } + } + + self.adjust_date_dividers(date_divider_adjuster); + self.check_no_unused_unique_ids(); + } + + fn check_no_unused_unique_ids(&self) { + let duplicates = self + .items + .iter() + .duplicates_by(|item| item.unique_id()) + .map(|item| item.unique_id()) + .collect::>(); + + if !duplicates.is_empty() { + #[cfg(any(debug_assertions, test))] + panic!("duplicate unique ids in this timeline:{:?}\n{:?}", duplicates, self.items); + + #[cfg(not(any(debug_assertions, test)))] + tracing::error!( + "duplicate unique ids in this timeline:{:?}\n{:?}", + duplicates, + self.items + ); + } + } + + /// Handle a remote event. + /// + /// Returns the number of timeline updates that were made. + pub(super) async fn handle_remote_event( + &mut self, + event: TimelineEvent, + position: TimelineItemPosition, + room_data_provider: &P, + settings: &TimelineSettings, + date_divider_adjuster: &mut DateDividerAdjuster, + ) -> HandleEventResult { + let TimelineEvent { push_actions, kind } = event; + let encryption_info = kind.encryption_info().cloned(); + + let (raw, utd_info) = match kind { + matrix_sdk::deserialized_responses::TimelineEventKind::UnableToDecrypt { + utd_info, + event, + } => (event, Some(utd_info)), + _ => (kind.into_raw(), None), + }; + + let (event_id, sender, timestamp, txn_id, event_kind, should_add) = match raw.deserialize() + { + // Classical path: the event is valid, can be deserialized, everything is alright. + Ok(event) => { + let event_id = event.event_id().to_owned(); + let room_version = room_data_provider.room_version(); + + let mut should_add = (settings.event_filter)(&event, &room_version); + + if should_add { + // Retrieve the origin of the event. + let origin = match position { + TimelineItemPosition::End { origin } + | TimelineItemPosition::Start { origin } + | TimelineItemPosition::At { origin, .. } => origin, + + TimelineItemPosition::UpdateAt { timeline_item_index: idx } => self + .items + .get(idx) + .and_then(|item| item.as_event()) + .and_then(|item| item.as_remote()) + .map_or(RemoteEventOrigin::Unknown, |item| item.origin), + }; + + // If the event should be added according to the general event filter, use a + // second filter to decide whether it should be added depending on the timeline + // focus and events origin, if needed + match self.timeline_focus { + TimelineFocusKind::PinnedEvents => { + // Only add pinned events for the pinned events timeline + should_add = room_data_provider.is_pinned_event(&event_id); + } + TimelineFocusKind::Live => { + match origin { + RemoteEventOrigin::Sync | RemoteEventOrigin::Unknown => { + // Always add new items to a live timeline receiving items from + // sync. + should_add = true; + } + RemoteEventOrigin::Cache | RemoteEventOrigin::Pagination => { + // Forward the previous decision to add it. + } + } + } + TimelineFocusKind::Event => { + match origin { + RemoteEventOrigin::Sync | RemoteEventOrigin::Unknown => { + // Never add any item to a focused timeline when the item comes + // down from the sync. + should_add = false; + } + RemoteEventOrigin::Cache | RemoteEventOrigin::Pagination => { + // Forward the previous decision to add it. + } + } + } + } + } + + ( + event_id, + event.sender().to_owned(), + event.origin_server_ts(), + event.transaction_id().map(ToOwned::to_owned), + TimelineEventKind::from_event(event, &raw, room_data_provider, utd_info).await, + should_add, + ) + } + + // The event seems invalid… + Err(e) => match raw.deserialize_as::() { + // The event can be partially deserialized, and it is allowed to be added to the + // timeline. + Ok(event) if settings.add_failed_to_parse => ( + event.event_id().to_owned(), + event.sender().to_owned(), + event.origin_server_ts(), + event.transaction_id().map(ToOwned::to_owned), + TimelineEventKind::failed_to_parse(event, e), + true, + ), + + // The event can be partially deserialized, but it is NOT allowed to be added to + // the timeline. + Ok(event) => { + let event_type = event.event_type(); + let event_id = event.event_id(); + warn!(%event_type, %event_id, "Failed to deserialize timeline event: {e}"); + + let is_own_event = event.sender() == room_data_provider.own_user_id(); + let event_meta = FullEventMeta { + event_id, + sender: Some(event.sender()), + is_own_event, + timestamp: Some(event.origin_server_ts()), + visible: false, + }; + + // Remember the event before returning prematurely. + // See [`ObservableItems::all_remote_events`]. + self.add_or_update_remote_event( + event_meta, + position, + room_data_provider, + settings, + ) + .await; + + return HandleEventResult::default(); + } + + // The event can NOT be partially deserialized, it seems really broken. + Err(e) => { + let event_type: Option = raw.get_field("type").ok().flatten(); + let event_id: Option = raw.get_field("event_id").ok().flatten(); + warn!( + event_type, + event_id, "Failed to deserialize timeline event even without content: {e}" + ); + + let event_id = event_id.and_then(|s| EventId::parse(s).ok()); + + if let Some(event_id) = &event_id { + let sender: Option = raw.get_field("sender").ok().flatten(); + let is_own_event = + sender.as_ref().is_some_and(|s| s == room_data_provider.own_user_id()); + let timestamp: Option = + raw.get_field("origin_server_ts").ok().flatten(); + + let event_meta = FullEventMeta { + event_id, + sender: sender.as_deref(), + is_own_event, + timestamp, + visible: false, + }; + + // Remember the event before returning prematurely. + // See [`ObservableItems::all_remote_events`]. + self.add_or_update_remote_event( + event_meta, + position, + room_data_provider, + settings, + ) + .await; + } + + return HandleEventResult::default(); + } + }, + }; + + let is_own_event = sender == room_data_provider.own_user_id(); + + let event_meta = FullEventMeta { + event_id: &event_id, + sender: Some(&sender), + is_own_event, + timestamp: Some(timestamp), + visible: should_add, + }; + + // Remember the event. + // See [`ObservableItems::all_remote_events`]. + self.add_or_update_remote_event(event_meta, position, room_data_provider, settings).await; + + let sender_profile = room_data_provider.profile_from_user_id(&sender).await; + let ctx = TimelineEventContext { + sender, + sender_profile, + timestamp, + is_own_event, + read_receipts: if settings.track_read_receipts && should_add { + self.meta.read_receipts.compute_event_receipts( + &event_id, + self.items.all_remote_events(), + matches!(position, TimelineItemPosition::End { .. }), + ) + } else { + Default::default() + }, + is_highlighted: push_actions + .as_ref() + .is_some_and(|actions| actions.iter().any(Action::is_highlight)), + flow: Flow::Remote { + event_id: event_id.clone(), + raw_event: raw, + encryption_info, + txn_id, + position, + }, + should_add_new_items: should_add, + }; + + // Handle the event to create or update a timeline item. + TimelineEventHandler::new(self, ctx).handle_event(date_divider_adjuster, event_kind).await + } + + /// Remove one timeline item by its `event_index`. + fn remove_timeline_item( + &mut self, + event_index: usize, + day_divider_adjuster: &mut DateDividerAdjuster, + ) { + day_divider_adjuster.mark_used(); + + // We need to be careful here. + // + // We must first remove the timeline item, which will update the mapping between + // remote events and timeline items. Removing the timeline item will “unlink” + // this mapping as the remote event will be updated to map to nothing. Only + // after that, we can remove the remote event. Doing this in the other order + // will update the mapping twice, and will result in a corrupted state. + + // Remove the timeline item first. + if let Some(event_meta) = self.items.all_remote_events().get(event_index) { + // Fetch the `timeline_item_index` associated to the remote event. + if let Some(timeline_item_index) = event_meta.timeline_item_index { + let _removed_timeline_item = self.items.remove(timeline_item_index); + } + + // Now we can remove the remote event. + self.items.remove_remote_event(event_index); + } + } + + pub(super) fn clear(&mut self) { + let has_local_echoes = self.items.iter().any(|item| item.is_local_echo()); + + // By first checking if there are any local echoes first, we do a bit + // more work in case some are found, but it should be worth it because + // there will often not be any, and only emitting a single + // `VectorDiff::Clear` should be much more efficient to process for + // subscribers. + if has_local_echoes { + // Remove all remote events and the read marker + self.items.for_each(|entry| { + if entry.is_remote_event() || entry.is_read_marker() { + ObservableItemsTransactionEntry::remove(entry); + } + }); + + // Remove stray date dividers + let mut idx = 0; + while idx < self.items.len() { + if self.items[idx].is_date_divider() + && self.items.get(idx + 1).is_none_or(|item| item.is_date_divider()) + { + self.items.remove(idx); + // don't increment idx because all elements have shifted + } else { + idx += 1; + } + } + } else { + self.items.clear(); + } + + self.meta.clear(); + + debug!(remaining_items = self.items.len(), "Timeline cleared"); + } + + #[instrument(skip_all)] + pub(super) fn set_fully_read_event(&mut self, fully_read_event_id: OwnedEventId) { + // A similar event has been handled already. We can ignore it. + if self.meta.fully_read_event.as_ref().is_some_and(|id| *id == fully_read_event_id) { + return; + } + + self.meta.fully_read_event = Some(fully_read_event_id); + self.meta.update_read_marker(&mut self.items); + } + + pub(super) fn commit(self) { + let Self { items, previous_meta, meta, .. } = self; + + // Replace the pointer to the previous meta with the new one. + *previous_meta = meta; + + items.commit(); + } + + /// Add or update a remote event in the + /// [`ObservableItems::all_remote_events`] collection. + /// + /// This method also adjusts read receipt if needed. + async fn add_or_update_remote_event( + &mut self, + event_meta: FullEventMeta<'_>, + position: TimelineItemPosition, + room_data_provider: &P, + settings: &TimelineSettings, + ) { + match position { + TimelineItemPosition::Start { .. } => { + self.items.push_front_remote_event(event_meta.base_meta()) + } + + TimelineItemPosition::End { .. } => { + self.items.push_back_remote_event(event_meta.base_meta()); + } + + TimelineItemPosition::At { event_index, .. } => { + self.items.insert_remote_event(event_index, event_meta.base_meta()); + } + + TimelineItemPosition::UpdateAt { .. } => { + if let Some(event) = + self.items.get_remote_event_by_event_id_mut(event_meta.event_id) + { + if event.visible != event_meta.visible { + event.visible = event_meta.visible; + + if settings.track_read_receipts { + // Since the event's visibility changed, we need to update the read + // receipts of the previous visible event. + self.maybe_update_read_receipts_of_prev_event(event_meta.event_id); + } + } + } + } + } + + if settings.track_read_receipts + && matches!( + position, + TimelineItemPosition::Start { .. } + | TimelineItemPosition::End { .. } + | TimelineItemPosition::At { .. } + ) + { + self.load_read_receipts_for_event(event_meta.event_id, room_data_provider).await; + + self.maybe_add_implicit_read_receipt(event_meta); + } + } + + pub(super) fn adjust_date_dividers(&mut self, mut adjuster: DateDividerAdjuster) { + adjuster.run(&mut self.items, &mut self.meta); + } + + /// This method replaces the `is_room_encrypted` value for all timeline + /// items to its updated version and creates a `VectorDiff::Set` operation + /// for each item which will be added to this transaction. + pub(super) fn update_all_events_is_room_encrypted(&mut self, is_encrypted: Option) { + for idx in 0..self.items.len() { + let item = &self.items[idx]; + + if let Some(event) = item.as_event() { + let mut cloned_event = event.clone(); + cloned_event.is_room_encrypted = is_encrypted; + + // Replace the existing item with a new version with the right encryption flag + let item = item.with_kind(cloned_event); + self.items.replace(idx, item); + } + } + } +}