From faa2fa2ef02649afefd12b9e1c8c5e78cba11f80 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Wed, 12 Mar 2025 16:04:08 +0100 Subject: [PATCH] chore(ui): Move the pinned event IDs stream task inside its own function. This patch moves the task responsibles to handle the pinned event IDs stream into its own function. The goal is to get simpler code. --- crates/matrix-sdk-ui/src/timeline/builder.rs | 39 +++++++++++--------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/crates/matrix-sdk-ui/src/timeline/builder.rs b/crates/matrix-sdk-ui/src/timeline/builder.rs index 18768a26a..4ff8dcd4b 100644 --- a/crates/matrix-sdk-ui/src/timeline/builder.rs +++ b/crates/matrix-sdk-ui/src/timeline/builder.rs @@ -14,6 +14,7 @@ use std::{collections::BTreeSet, sync::Arc}; +use futures_core::Stream; use futures_util::{pin_mut, StreamExt}; use matrix_sdk::{ encryption::backups::BackupState, @@ -21,7 +22,7 @@ use matrix_sdk::{ executor::spawn, Room, }; -use ruma::{events::AnySyncTimelineEvent, RoomVersionId}; +use ruma::{events::AnySyncTimelineEvent, OwnedEventId, RoomVersionId}; use tokio::sync::broadcast::error::RecvError; use tokio_stream::wrappers::errors::BroadcastStreamRecvError; use tracing::{info, info_span, trace, warn, Instrument, Span}; @@ -186,22 +187,7 @@ impl TimelineBuilder { let client = room.client(); let pinned_events_join_handle = if is_pinned_events { - let mut pinned_event_ids_stream = room.pinned_event_ids_stream(); - Some(spawn({ - let inner = controller.clone(); - async move { - while pinned_event_ids_stream.next().await.is_some() { - if let Ok(events) = inner.reload_pinned_events().await { - inner - .replace_with_initial_remote_events( - events.into_iter(), - RemoteEventOrigin::Pagination, - ) - .await; - } - } - } - })) + Some(spawn(pinned_events_task(room.pinned_event_ids_stream(), controller.clone()))) } else { None }; @@ -425,6 +411,25 @@ impl TimelineBuilder { } } +/// The task that handles the pinned event IDs updates. +async fn pinned_events_task(pinned_event_ids_stream: S, timeline_controller: TimelineController) +where + S: Stream>, +{ + pin_mut!(pinned_event_ids_stream); + + while pinned_event_ids_stream.next().await.is_some() { + if let Ok(events) = timeline_controller.reload_pinned_events().await { + timeline_controller + .replace_with_initial_remote_events( + events.into_iter(), + RemoteEventOrigin::Pagination, + ) + .await; + } + } +} + /// The task that handles the [`RoomEventCacheUpdate`]s. async fn room_event_cache_updates_task( room_event_cache: RoomEventCache,