From e8877fd9871c43cdaf2f0a73da57d1082abc7445 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Tue, 1 Jul 2025 11:02:53 +0200 Subject: [PATCH] refactor(timeline): move long-lived tasks to a new `tasks` mod Only code motion. --- crates/matrix-sdk-ui/src/timeline/builder.rs | 168 +---------------- crates/matrix-sdk-ui/src/timeline/mod.rs | 1 + crates/matrix-sdk-ui/src/timeline/tasks.rs | 181 +++++++++++++++++++ 3 files changed, 191 insertions(+), 159 deletions(-) create mode 100644 crates/matrix-sdk-ui/src/timeline/tasks.rs diff --git a/crates/matrix-sdk-ui/src/timeline/builder.rs b/crates/matrix-sdk-ui/src/timeline/builder.rs index e1d286f17..5638291f8 100644 --- a/crates/matrix-sdk-ui/src/timeline/builder.rs +++ b/crates/matrix-sdk-ui/src/timeline/builder.rs @@ -20,25 +20,24 @@ use std::{ use futures_core::Stream; use futures_util::{pin_mut, StreamExt}; use matrix_sdk::{ - crypto::store::types::RoomKeyInfo, - encryption::backups::BackupState, - event_cache::{EventsOrigin, RoomEventCache, RoomEventCacheSubscriber, RoomEventCacheUpdate}, - executor::spawn, - send_queue::RoomSendQueueUpdate, - Room, + crypto::store::types::RoomKeyInfo, encryption::backups::BackupState, executor::spawn, Room, }; use matrix_sdk_base::{SendOutsideWasm, SyncOutsideWasm}; -use ruma::{events::AnySyncTimelineEvent, OwnedEventId, RoomVersionId}; -use tokio::sync::broadcast::{error::RecvError, Receiver}; +use ruma::{events::AnySyncTimelineEvent, RoomVersionId}; use tokio_stream::wrappers::errors::BroadcastStreamRecvError; -use tracing::{info_span, instrument, trace, warn, Instrument, Span}; +use tracing::{info_span, warn, Instrument, Span}; use super::{ controller::{TimelineController, TimelineSettings}, to_device::{handle_forwarded_room_key_event, handle_room_key_event}, DateDividerMode, Error, Timeline, TimelineDropHandle, TimelineFocus, }; -use crate::{timeline::event_item::RemoteEventOrigin, unable_to_decrypt_hook::UtdHookManager}; +use crate::{ + timeline::tasks::{ + pinned_events_task, room_event_cache_updates_task, room_send_queue_update_task, + }, + unable_to_decrypt_hook::UtdHookManager, +}; /// Builder that allows creating and configuring various parts of a /// [`Timeline`]. @@ -313,155 +312,6 @@ impl TimelineBuilder { } } -/// The task that handles the pinned event IDs updates. -#[instrument( - skip_all, - fields( - room_id = %timeline_controller.room().room_id(), - ) -)] -async fn pinned_events_task(pinned_event_ids_stream: S, timeline_controller: TimelineController) -where - S: Stream>, -{ - pin_mut!(pinned_event_ids_stream); - - while pinned_event_ids_stream.next().await.is_some() { - trace!("received a pinned events update"); - - match timeline_controller.reload_pinned_events().await { - Ok(Some(events)) => { - trace!("successfully reloaded pinned events"); - timeline_controller - .replace_with_initial_remote_events( - events.into_iter(), - RemoteEventOrigin::Pagination, - ) - .await; - } - - Ok(None) => { - // The list of pinned events hasn't changed since the previous - // time. - } - - Err(err) => { - warn!("Failed to reload pinned events: {err}"); - } - } - } -} - -/// The task that handles the [`RoomEventCacheUpdate`]s. -async fn room_event_cache_updates_task( - room_event_cache: RoomEventCache, - timeline_controller: TimelineController, - mut room_event_cache_subscriber: RoomEventCacheSubscriber, - timeline_focus: TimelineFocus, -) { - trace!("Spawned the event subscriber task."); - - loop { - trace!("Waiting for an event."); - - let update = match room_event_cache_subscriber.recv().await { - Ok(up) => up, - Err(RecvError::Closed) => break, - Err(RecvError::Lagged(num_skipped)) => { - warn!(num_skipped, "Lagged behind event cache updates, resetting timeline"); - - // The updates might have lagged, but the room event cache might have - // events, so retrieve them and add them back again to the timeline, - // after clearing it. - let initial_events = room_event_cache.events().await; - - timeline_controller - .replace_with_initial_remote_events( - initial_events.into_iter(), - RemoteEventOrigin::Cache, - ) - .await; - - continue; - } - }; - - match update { - RoomEventCacheUpdate::MoveReadMarkerTo { event_id } => { - trace!(target = %event_id, "Handling fully read marker."); - timeline_controller.handle_fully_read_marker(event_id).await; - } - - RoomEventCacheUpdate::UpdateTimelineEvents { diffs, origin } => { - trace!("Received new timeline events diffs"); - let origin = match origin { - EventsOrigin::Sync => RemoteEventOrigin::Sync, - EventsOrigin::Pagination => RemoteEventOrigin::Pagination, - EventsOrigin::Cache => RemoteEventOrigin::Cache, - }; - - let has_diffs = !diffs.is_empty(); - - if matches!( - timeline_focus, - TimelineFocus::Live { .. } | TimelineFocus::Thread { .. } - ) { - timeline_controller.handle_remote_events_with_diffs(diffs, origin).await; - } else { - // Only handle the remote aggregation for a non-live timeline. - timeline_controller.handle_remote_aggregations(diffs, origin).await; - } - - if has_diffs && matches!(origin, RemoteEventOrigin::Cache) { - timeline_controller.retry_event_decryption(None).await; - } - } - - RoomEventCacheUpdate::AddEphemeralEvents { events } => { - trace!("Received new ephemeral events from sync."); - - // TODO: (bnjbvr) ephemeral should be handled by the event cache. - timeline_controller.handle_ephemeral_events(events).await; - } - - RoomEventCacheUpdate::UpdateMembers { ambiguity_changes } => { - if !ambiguity_changes.is_empty() { - let member_ambiguity_changes = ambiguity_changes - .values() - .flat_map(|change| change.user_ids()) - .collect::>(); - timeline_controller - .force_update_sender_profiles(&member_ambiguity_changes) - .await; - } - } - } - } -} - -/// The task that handles the [`RoomSendQueueUpdate`]s. -async fn room_send_queue_update_task( - mut send_queue_stream: Receiver, - timeline_controller: TimelineController, -) { - trace!("spawned the local echo task!"); - - loop { - match send_queue_stream.recv().await { - Ok(update) => timeline_controller.handle_room_send_queue_update(update).await, - - Err(RecvError::Lagged(num_missed)) => { - warn!("missed {num_missed} local echoes, ignoring those missed"); - } - - Err(RecvError::Closed) => { - trace!("channel closed, exiting the local echo handler"); - break; - } - } - } -} - /// The task that handles the room keys from backups. async fn room_keys_from_backups_task(stream: S, timeline_controller: TimelineController) where diff --git a/crates/matrix-sdk-ui/src/timeline/mod.rs b/crates/matrix-sdk-ui/src/timeline/mod.rs index 3f5df11d1..33ea1ebcc 100644 --- a/crates/matrix-sdk-ui/src/timeline/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/mod.rs @@ -78,6 +78,7 @@ mod item; mod pagination; mod pinned_events_loader; mod subscriber; +mod tasks; #[cfg(test)] mod tests; mod to_device; diff --git a/crates/matrix-sdk-ui/src/timeline/tasks.rs b/crates/matrix-sdk-ui/src/timeline/tasks.rs new file mode 100644 index 000000000..8c5320424 --- /dev/null +++ b/crates/matrix-sdk-ui/src/timeline/tasks.rs @@ -0,0 +1,181 @@ +// Copyright 2025 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Long-lived tasks for the timeline. + +use std::collections::BTreeSet; + +use futures_core::Stream; +use futures_util::pin_mut; +use matrix_sdk::{ + event_cache::{EventsOrigin, RoomEventCache, RoomEventCacheSubscriber, RoomEventCacheUpdate}, + send_queue::RoomSendQueueUpdate, +}; +use ruma::OwnedEventId; +use tokio::sync::broadcast::{error::RecvError, Receiver}; +use tokio_stream::StreamExt as _; +use tracing::{instrument, trace, warn}; + +use crate::timeline::{event_item::RemoteEventOrigin, TimelineController, TimelineFocus}; + +/// The task that handles the pinned event IDs updates. +#[instrument( + skip_all, + fields( + room_id = %timeline_controller.room().room_id(), + ) +)] +pub(in crate::timeline) async fn pinned_events_task( + pinned_event_ids_stream: S, + timeline_controller: TimelineController, +) where + S: Stream>, +{ + pin_mut!(pinned_event_ids_stream); + + while pinned_event_ids_stream.next().await.is_some() { + trace!("received a pinned events update"); + + match timeline_controller.reload_pinned_events().await { + Ok(Some(events)) => { + trace!("successfully reloaded pinned events"); + timeline_controller + .replace_with_initial_remote_events( + events.into_iter(), + RemoteEventOrigin::Pagination, + ) + .await; + } + + Ok(None) => { + // The list of pinned events hasn't changed since the previous + // time. + } + + Err(err) => { + warn!("Failed to reload pinned events: {err}"); + } + } + } +} + +/// The task that handles the [`RoomEventCacheUpdate`]s. +pub(in crate::timeline) async fn room_event_cache_updates_task( + room_event_cache: RoomEventCache, + timeline_controller: TimelineController, + mut room_event_cache_subscriber: RoomEventCacheSubscriber, + timeline_focus: TimelineFocus, +) { + trace!("Spawned the event subscriber task."); + + loop { + trace!("Waiting for an event."); + + let update = match room_event_cache_subscriber.recv().await { + Ok(up) => up, + Err(RecvError::Closed) => break, + Err(RecvError::Lagged(num_skipped)) => { + warn!(num_skipped, "Lagged behind event cache updates, resetting timeline"); + + // The updates might have lagged, but the room event cache might have + // events, so retrieve them and add them back again to the timeline, + // after clearing it. + let initial_events = room_event_cache.events().await; + + timeline_controller + .replace_with_initial_remote_events( + initial_events.into_iter(), + RemoteEventOrigin::Cache, + ) + .await; + + continue; + } + }; + + match update { + RoomEventCacheUpdate::MoveReadMarkerTo { event_id } => { + trace!(target = %event_id, "Handling fully read marker."); + timeline_controller.handle_fully_read_marker(event_id).await; + } + + RoomEventCacheUpdate::UpdateTimelineEvents { diffs, origin } => { + trace!("Received new timeline events diffs"); + let origin = match origin { + EventsOrigin::Sync => RemoteEventOrigin::Sync, + EventsOrigin::Pagination => RemoteEventOrigin::Pagination, + EventsOrigin::Cache => RemoteEventOrigin::Cache, + }; + + let has_diffs = !diffs.is_empty(); + + if matches!( + timeline_focus, + TimelineFocus::Live { .. } | TimelineFocus::Thread { .. } + ) { + timeline_controller.handle_remote_events_with_diffs(diffs, origin).await; + } else { + // Only handle the remote aggregation for a non-live timeline. + timeline_controller.handle_remote_aggregations(diffs, origin).await; + } + + if has_diffs && matches!(origin, RemoteEventOrigin::Cache) { + timeline_controller.retry_event_decryption(None).await; + } + } + + RoomEventCacheUpdate::AddEphemeralEvents { events } => { + trace!("Received new ephemeral events from sync."); + + // TODO: (bnjbvr) ephemeral should be handled by the event cache. + timeline_controller.handle_ephemeral_events(events).await; + } + + RoomEventCacheUpdate::UpdateMembers { ambiguity_changes } => { + if !ambiguity_changes.is_empty() { + let member_ambiguity_changes = ambiguity_changes + .values() + .flat_map(|change| change.user_ids()) + .collect::>(); + timeline_controller + .force_update_sender_profiles(&member_ambiguity_changes) + .await; + } + } + } + } +} + +/// The task that handles the [`RoomSendQueueUpdate`]s. +pub(in crate::timeline) async fn room_send_queue_update_task( + mut send_queue_stream: Receiver, + timeline_controller: TimelineController, +) { + trace!("spawned the local echo task!"); + + loop { + match send_queue_stream.recv().await { + Ok(update) => timeline_controller.handle_room_send_queue_update(update).await, + + Err(RecvError::Lagged(num_missed)) => { + warn!("missed {num_missed} local echoes, ignoring those missed"); + } + + Err(RecvError::Closed) => { + trace!("channel closed, exiting the local echo handler"); + break; + } + } + } +}