From e49d62988bdd77966e34c1f366dd5a383ebcc016 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Fri, 17 May 2024 18:00:46 +0200 Subject: [PATCH] timeline queue: tiny refactorings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A few renamings here and there, making use of `as_variant!` a bit more, adding a few comments,… --- .../src/timeline/event_item/mod.rs | 34 ++---- .../matrix-sdk-ui/src/timeline/inner/mod.rs | 15 ++- crates/matrix-sdk-ui/src/timeline/mod.rs | 25 +++-- crates/matrix-sdk-ui/src/timeline/queue.rs | 100 +++++++++++------- 4 files changed, 98 insertions(+), 76 deletions(-) diff --git a/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs b/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs index d0162d46a..e2d826a0c 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs @@ -14,6 +14,7 @@ use std::sync::Arc; +use as_variant::as_variant; use indexmap::IndexMap; use matrix_sdk::{deserialized_responses::EncryptionInfo, Client, Error}; use matrix_sdk_base::{deserialized_responses::SyncTimelineEvent, latest_event::LatestEvent}; @@ -180,44 +181,31 @@ impl EventTimelineItem { /// Get the `LocalEventTimelineItem` if `self` is `Local`. pub(super) fn as_local(&self) -> Option<&LocalEventTimelineItem> { - match &self.kind { - EventTimelineItemKind::Local(local_event_item) => Some(local_event_item), - EventTimelineItemKind::Remote(_) => None, - } + as_variant!(&self.kind, EventTimelineItemKind::Local(local_event_item) => local_event_item) } - /// Get the `RemoteEventTimelineItem` if `self` is `Remote`. + /// Get a reference to a [`RemoteEventTimelineItem`] if it's a remote echo. pub(super) fn as_remote(&self) -> Option<&RemoteEventTimelineItem> { - match &self.kind { - EventTimelineItemKind::Local(_) => None, - EventTimelineItemKind::Remote(remote_event_item) => Some(remote_event_item), - } + as_variant!(&self.kind, EventTimelineItemKind::Remote(remote_event_item) => remote_event_item) } + /// Get a mutable reference to a [`RemoteEventTimelineItem`] if it's a + /// remote echo. pub(super) fn as_remote_mut(&mut self) -> Option<&mut RemoteEventTimelineItem> { - match &mut self.kind { - EventTimelineItemKind::Local(_) => None, - EventTimelineItemKind::Remote(remote_event_item) => Some(remote_event_item), - } + as_variant!(&mut self.kind, EventTimelineItemKind::Remote(remote_event_item) => remote_event_item) } - /// Get the event's send state, if it is a local echo. + /// Get the event's send state of a local echo. pub fn send_state(&self) -> Option<&EventSendState> { - match &self.kind { - EventTimelineItemKind::Local(local) => Some(&local.send_state), - EventTimelineItemKind::Remote(_) => None, - } + as_variant!(&self.kind, EventTimelineItemKind::Local(local) => &local.send_state) } - /// Get the transaction ID of this item. + /// Get the transaction ID of a local echo item. /// /// The transaction ID is currently only kept until the remote echo for a /// local event is received. pub fn transaction_id(&self) -> Option<&TransactionId> { - match &self.kind { - EventTimelineItemKind::Local(local) => Some(&local.transaction_id), - EventTimelineItemKind::Remote(_) => None, - } + as_variant!(&self.kind, EventTimelineItemKind::Local(local) => &local.transaction_id) } /// Get the event ID of this item. diff --git a/crates/matrix-sdk-ui/src/timeline/inner/mod.rs b/crates/matrix-sdk-ui/src/timeline/inner/mod.rs index e4a3494ec..56031535d 100644 --- a/crates/matrix-sdk-ui/src/timeline/inner/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/inner/mod.rs @@ -645,7 +645,8 @@ impl TimelineInner

{ /// Update the send state of a local event represented by a transaction ID. /// - /// If no local event is found, a warning is raised. + /// If the corresponding local timeline item is missing, a warning is + /// raised. #[instrument(skip_all, fields(txn_id))] pub(super) async fn update_event_send_state( &self, @@ -718,9 +719,11 @@ impl TimelineInner

{ txn.items.set(idx, new_item); if is_error { - // When there is an error, sending further messages is paused. This - // should be reflected in the timeline, so we set all other pending - // events to cancelled. + // When there is an error, sending further messages is paused. This should be + // reflected in the timeline, so we set all other pending events to + // cancelled. + // + // TODO(bnjbvr): spooky action at a distance here^ let items = &mut txn.items; let num_items = items.len(); for idx in 0..num_items { @@ -797,6 +800,10 @@ impl TimelineInner

{ Ok(follow_up_action) } + /// Attempts to find a local echo item for some event to be sent. + /// + /// If it's found, it's moved back to the end of the timeline, then its + /// content is returned by this function. pub(super) async fn prepare_retry( &self, txn_id: &TransactionId, diff --git a/crates/matrix-sdk-ui/src/timeline/mod.rs b/crates/matrix-sdk-ui/src/timeline/mod.rs index 101e81331..109010960 100644 --- a/crates/matrix-sdk-ui/src/timeline/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/mod.rs @@ -571,33 +571,32 @@ impl Timeline { } let item = self.inner.prepare_retry(txn_id).await.ok_or(Error::RetryEventNotInTimeline)?; + let content = match item { TimelineItemContent::Message(msg) => { AnyMessageLikeEventContent::RoomMessage(msg.into()) } - TimelineItemContent::RedactedMessage => { - error_return!("Invalid state: attempting to retry a redacted message"); - } TimelineItemContent::Sticker(sticker) => { AnyMessageLikeEventContent::Sticker(sticker.content) } - TimelineItemContent::UnableToDecrypt(_) => { - error_return!("Invalid state: attempting to retry a UTD item"); - } + TimelineItemContent::Poll(poll_state) => AnyMessageLikeEventContent::UnstablePollStart( + UnstablePollStartEventContent::New(poll_state.into()), + ), TimelineItemContent::MembershipChange(_) | TimelineItemContent::ProfileChange(_) - | TimelineItemContent::OtherState(_) => { - error_return!("Retrying state events is not currently supported"); + | TimelineItemContent::OtherState(_) + | TimelineItemContent::CallInvite => { + error_return!("Retrying state events/call invite is not currently supported"); } TimelineItemContent::FailedToParseMessageLike { .. } | TimelineItemContent::FailedToParseState { .. } => { error_return!("Invalid state: attempting to retry a failed-to-parse item"); } - TimelineItemContent::Poll(poll_state) => AnyMessageLikeEventContent::UnstablePollStart( - UnstablePollStartEventContent::New(poll_state.into()), - ), - TimelineItemContent::CallInvite => { - error_return!("Retrying call events is not currently supported"); + TimelineItemContent::RedactedMessage => { + error_return!("Invalid state: attempting to retry a redacted message"); + } + TimelineItemContent::UnableToDecrypt(_) => { + error_return!("Invalid state: attempting to retry a UTD item"); } }; diff --git a/crates/matrix-sdk-ui/src/timeline/queue.rs b/crates/matrix-sdk-ui/src/timeline/queue.rs index c3c70f3cd..6bd9b0545 100644 --- a/crates/matrix-sdk-ui/src/timeline/queue.rs +++ b/crates/matrix-sdk-ui/src/timeline/queue.rs @@ -39,13 +39,14 @@ pub(super) struct LocalMessage { /// /// Used for finding the corresponding local echo in the timeline. pub txn_id: OwnedTransactionId, + /// The message contents. pub content: AnyMessageLikeEventContent, } #[instrument(skip_all, fields(room_id = ?room.room_id()))] pub(super) async fn send_queued_messages( - timeline_inner: TimelineInner, + timeline: TimelineInner, room: Room, mut msg_receiver: Receiver, ) { @@ -58,23 +59,27 @@ pub(super) async fn send_queued_messages( select! { result = &mut send_task => { trace!("SendMessageTask finished"); + send_task.reset(); - handle_task_ready( + + handle_send_result( result, &mut send_task, &mut queue, - &timeline_inner, + &timeline, ).await; } + recv_res = &mut recv_fut => { recv_fut = if let Some(msg) = recv_res { trace!("Got a LocalMessage"); - handle_message( + + send_or_queue_msg( msg, room.clone(), &mut send_task, &mut queue, - &timeline_inner, + &timeline, ).await; // appease the borrow checker @@ -96,59 +101,71 @@ pub(super) async fn send_queued_messages( info!("Stopped"); } -async fn handle_message( +async fn send_or_queue_msg( msg: LocalMessage, room: Room, send_task: &mut SendMessageTask, queue: &mut VecDeque, - timeline_inner: &TimelineInner, + timeline: &TimelineInner, ) { - if queue.is_empty() && send_task.is_idle() { - if room.state() == RoomState::Joined { - send_task.start(room, timeline_inner.clone(), msg); - } else { - info!("Refusing to send message, room is not joined"); - timeline_inner - .update_event_send_state( - &msg.txn_id, - EventSendState::SendingFailed { - // FIXME: Probably not exactly right - error: Arc::new(matrix_sdk::Error::InconsistentState), - }, - ) - .await; - } - } else { + // Only send the message immediately if there aren't other messages to be sent + // first, and we're not currently sending a message. + if !queue.is_empty() || !send_task.is_idle() { queue.push_back(msg); + return; } + + if room.state() != RoomState::Joined { + info!("Refusing to send message, room is not joined"); + timeline + .update_event_send_state( + &msg.txn_id, + EventSendState::SendingFailed { + // FIXME: Probably not exactly right + error: Arc::new(matrix_sdk::Error::InconsistentState), + }, + ) + .await; + return; + } + + send_task.start(room, timeline.clone(), msg); } -async fn handle_task_ready( - result: SendMessageResult, +async fn handle_send_result( + send_result: SendMessageResult, send_task: &mut SendMessageTask, queue: &mut VecDeque, - timeline_inner: &TimelineInner, + timeline: &TimelineInner, ) { - match result { + match send_result { SendMessageResult::Success { room } => { + // Event was successfully sent, move on to the next queued event. if let Some(msg) = queue.pop_front() { - send_task.start(room, timeline_inner.clone(), msg); + send_task.start(room, timeline.clone(), msg); } } + SendMessageResult::SendingFailed => { // Timeline items are marked as failed / cancelled in this case. - // Clear the timeline and wait for the user to explicitly retry. + // + // Clear the queue and wait for the user to explicitly retry (which will + // re-append the to-be-sent events in the queue). queue.clear(); } + SendMessageResult::TaskError { join_error, txn_id } => { error!("Message-sending task failed: {join_error}"); + + // See above comment in the `SendingFailed` arm. queue.clear(); let send_state = EventSendState::SendingFailed { // FIXME: Probably not exactly right error: Arc::new(matrix_sdk::Error::InconsistentState), }; - timeline_inner.update_event_send_state(&txn_id, send_state).await; + + timeline.update_event_send_state(&txn_id, send_state).await; } } } @@ -162,8 +179,10 @@ enum SendMessageResult { /// in the queue, if it isn't empty. room: Room, }, + /// Sending failed, and the local echo was updated to indicate this. SendingFailed, + /// The [`SendMessageTask`] failed, likely due to a panic. /// /// This means that the timeline item was likely not updated yet, which thus @@ -181,12 +200,13 @@ enum SendMessageResult { enum SendMessageTask { /// No background task is active right now. Idle, + /// A background task has been spawned, we're waiting for its result. Running { /// The transaction ID of the message that is being sent. txn_id: OwnedTransactionId, /// Handle to the task itself. - join_handle: JoinHandle>, + task: JoinHandle>, }, } @@ -196,20 +216,27 @@ impl SendMessageTask { matches!(self, Self::Idle) } - fn start(&mut self, room: Room, timeline_inner: TimelineInner, msg: LocalMessage) { + /// Spawns a task sending the message to the room, and updating the timeline + /// once the result has been processed. + fn start(&mut self, room: Room, timeline: TimelineInner, msg: LocalMessage) { debug!("Spawning message-sending task"); + let txn_id = msg.txn_id.clone(); - let join_handle = spawn(async move { + + let task = spawn(async move { let result = room.send(msg.content).with_transaction_id(&msg.txn_id).await; + let (room, send_state) = match result { Ok(response) => (Some(room), EventSendState::Sent { event_id: response.event_id }), Err(error) => (None, EventSendState::SendingFailed { error: Arc::new(error) }), }; - timeline_inner.update_event_send_state(&msg.txn_id, send_state).await; + timeline.update_event_send_state(&msg.txn_id, send_state).await; + room }); - *self = Self::Running { txn_id, join_handle }; + + *self = Self::Running { txn_id, task }; } fn reset(&mut self) { @@ -223,7 +250,8 @@ impl Future for SendMessageTask { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match &mut *self { SendMessageTask::Idle => Poll::Pending, - SendMessageTask::Running { txn_id, join_handle } => { + + SendMessageTask::Running { txn_id, task: join_handle } => { Pin::new(join_handle).poll(cx).map(|result| { let txn_id = mem::replace(txn_id, OwnedTransactionId::from("")); if txn_id.as_str().is_empty() {