From 4c24007cb82313e726fbd92d09ff3fd4d9cda826 Mon Sep 17 00:00:00 2001 From: Jonas Platte Date: Thu, 22 Jun 2023 13:24:41 +0200 Subject: [PATCH] ui: Add a queue to serialize message sending requests --- bindings/matrix-sdk-ffi/src/timeline.rs | 4 + crates/matrix-sdk-ui/src/timeline/builder.rs | 8 +- .../src/timeline/event_item/mod.rs | 3 + crates/matrix-sdk-ui/src/timeline/inner.rs | 18 ++ crates/matrix-sdk-ui/src/timeline/mod.rs | 30 +-- crates/matrix-sdk-ui/src/timeline/queue.rs | 244 ++++++++++++++++++ .../tests/integration/timeline/echo.rs | 23 +- .../tests/integration/timeline/queue.rs | 18 +- 8 files changed, 299 insertions(+), 49 deletions(-) create mode 100644 crates/matrix-sdk-ui/src/timeline/queue.rs diff --git a/bindings/matrix-sdk-ffi/src/timeline.rs b/bindings/matrix-sdk-ffi/src/timeline.rs index 356695db2..e5a81fe69 100644 --- a/bindings/matrix-sdk-ffi/src/timeline.rs +++ b/bindings/matrix-sdk-ffi/src/timeline.rs @@ -244,6 +244,9 @@ pub enum EventSendState { /// The local event has been sent to the server, but unsuccessfully: The /// sending has failed. SendingFailed { error: String }, + /// Sending has been cancelled because an earlier event in the + /// message-sending queue failed. + Cancelled, /// The local event has been sent successfully to the server. Sent { event_id: String }, } @@ -255,6 +258,7 @@ impl From<&matrix_sdk_ui::timeline::EventSendState> for EventSendState { match value { NotSentYet => Self::NotSentYet, SendingFailed { error } => Self::SendingFailed { error: error.to_string() }, + Cancelled => Self::Cancelled, Sent { event_id } => Self::Sent { event_id: event_id.to_string() }, } } diff --git a/crates/matrix-sdk-ui/src/timeline/builder.rs b/crates/matrix-sdk-ui/src/timeline/builder.rs index 5b7a624ef..eda70a368 100644 --- a/crates/matrix-sdk-ui/src/timeline/builder.rs +++ b/crates/matrix-sdk-ui/src/timeline/builder.rs @@ -20,12 +20,12 @@ use matrix_sdk::{ deserialized_responses::SyncTimelineEvent, executor::spawn, room, sync::RoomUpdate, }; use ruma::events::receipt::{ReceiptThread, ReceiptType}; -use tokio::sync::broadcast; +use tokio::sync::{broadcast, mpsc}; use tracing::{error, warn}; #[cfg(feature = "e2e-encryption")] use super::to_device::{handle_forwarded_room_key_event, handle_room_key_event}; -use super::{inner::TimelineInner, Timeline, TimelineDropHandle}; +use super::{inner::TimelineInner, queue::send_queued_messages, Timeline, TimelineDropHandle}; /// Builder that allows creating and configuring various parts of a /// [`Timeline`]. @@ -196,11 +196,15 @@ impl TimelineBuilder { forwarded_room_key_handle, ]; + let (msg_sender, msg_receiver) = mpsc::channel(1); + spawn(send_queued_messages(inner.clone(), room.clone(), msg_receiver)); + let timeline = Timeline { inner, start_token, start_token_condvar: Default::default(), _end_token: Mutex::new(None), + msg_sender, drop_handle: Arc::new(TimelineDropHandle { client, event_handler_handles: handles, diff --git a/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs b/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs index 910ebb740..f670e7102 100644 --- a/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/event_item/mod.rs @@ -301,6 +301,9 @@ pub enum EventSendState { /// Details about how sending the event failed. error: Arc, }, + /// Sending has been cancelled because an earlier event in the + /// message-sending queue failed. + Cancelled, /// The local event has been sent successfully to the server. Sent { /// The event ID assigned by the server. diff --git a/crates/matrix-sdk-ui/src/timeline/inner.rs b/crates/matrix-sdk-ui/src/timeline/inner.rs index 324134659..3bdec2d9f 100644 --- a/crates/matrix-sdk-ui/src/timeline/inner.rs +++ b/crates/matrix-sdk-ui/src/timeline/inner.rs @@ -333,8 +333,26 @@ impl TimelineInner

{ error!(?existing_event_id, ?new_event_id, "Local echo already marked as sent"); } + let is_error = matches!(send_state, EventSendState::SendingFailed { .. }); + let new_item = TimelineItem::Event(item.with_kind(local_item.with_send_state(send_state))); state.items.set(idx, Arc::new(new_item)); + + if is_error { + // When there is an error, sending further messages is paused. This + // should be reflected in the timeline, so we set all other pending + // events to cancelled. + let num_items = state.items.len(); + for idx in 0..num_items { + let Some(item) = state.items[idx].as_event() else { continue }; + let Some(local_item) = item.as_local() else { continue }; + if matches!(&local_item.send_state, EventSendState::NotSentYet) { + let new_item = + item.with_kind(local_item.with_send_state(EventSendState::Cancelled)); + state.items.set(idx, Arc::new(TimelineItem::Event(new_item))); + } + } + } } pub(super) async fn prepare_retry( diff --git a/crates/matrix-sdk-ui/src/timeline/mod.rs b/crates/matrix-sdk-ui/src/timeline/mod.rs index ea570792d..9900b5d91 100644 --- a/crates/matrix-sdk-ui/src/timeline/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/mod.rs @@ -42,6 +42,7 @@ use ruma::{ EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, TransactionId, UserId, }; use thiserror::Error; +use tokio::sync::mpsc::Sender; use tracing::{debug, error, info, instrument, warn}; mod builder; @@ -50,6 +51,7 @@ mod event_item; mod futures; mod inner; mod pagination; +mod queue; mod read_receipts; #[cfg(feature = "experimental-sliding-sync")] mod sliding_sync_ext; @@ -61,7 +63,6 @@ mod traits; mod virtual_item; pub(crate) use self::builder::TimelineBuilder; -use self::inner::{TimelineInner, TimelineInnerState}; #[cfg(feature = "experimental-sliding-sync")] pub use self::sliding_sync_ext::SlidingSyncRoomExt; pub use self::{ @@ -76,6 +77,10 @@ pub use self::{ traits::RoomExt, virtual_item::VirtualTimelineItem, }; +use self::{ + inner::{TimelineInner, TimelineInnerState}, + queue::LocalMessage, +}; /// The default sanitizer mode used when sanitizing HTML. const DEFAULT_SANITIZER_MODE: HtmlSanitizerMode = HtmlSanitizerMode::Compat; @@ -91,6 +96,7 @@ pub struct Timeline { start_token: Arc>>, start_token_condvar: Arc, _end_token: Mutex>, + msg_sender: Sender, drop_handle: Arc, } @@ -312,25 +318,9 @@ impl Timeline { pub async fn send(&self, content: AnyMessageLikeEventContent, txn_id: Option<&TransactionId>) { let txn_id = txn_id.map_or_else(TransactionId::new, ToOwned::to_owned); self.inner.handle_local_event(txn_id.clone(), content.clone()).await; - - let send_state = match Room::from(self.room().clone()) { - Room::Joined(room) => { - let response = room.send(content, Some(&txn_id)).await; - - match response { - Ok(response) => EventSendState::Sent { event_id: response.event_id }, - Err(error) => EventSendState::SendingFailed { error: Arc::new(error) }, - } - } - _ => { - EventSendState::SendingFailed { - // FIXME: Probably not exactly right - error: Arc::new(matrix_sdk::Error::InconsistentState), - } - } - }; - - self.inner.update_event_send_state(&txn_id, send_state).await; + if self.msg_sender.send(LocalMessage { content, txn_id }).await.is_err() { + error!("Internal error: timeline message receiver is closed"); + } } /// Sends an attachment to the room. It does not currently support local diff --git a/crates/matrix-sdk-ui/src/timeline/queue.rs b/crates/matrix-sdk-ui/src/timeline/queue.rs new file mode 100644 index 000000000..f210692ff --- /dev/null +++ b/crates/matrix-sdk-ui/src/timeline/queue.rs @@ -0,0 +1,244 @@ +// Copyright 2023 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{ + collections::VecDeque, + future::{pending, Future, Pending}, + mem, + pin::Pin, + sync::Arc, + task::{Context, Poll}, +}; + +use futures_util::future::Either; +use matrix_sdk::{ + executor::{spawn, JoinError, JoinHandle}, + room::{self, Room}, +}; +use ruma::{events::AnyMessageLikeEventContent, OwnedTransactionId}; +use tokio::{select, sync::mpsc::Receiver}; +use tracing::{debug, error, info, instrument, trace, warn}; + +use super::{inner::TimelineInner, EventSendState}; + +/// A locally-created message that is supposed to be sent. +pub(super) struct LocalMessage { + /// The transaction ID. + /// + /// Used for finding the corresponding local echo in the timeline. + pub txn_id: OwnedTransactionId, + /// The message contents. + pub content: AnyMessageLikeEventContent, +} + +#[instrument(skip_all, fields(room_id = ?room.room_id()))] +pub(super) async fn send_queued_messages( + timeline_inner: Arc, + room: room::Common, + mut msg_receiver: Receiver, +) { + let mut queue = VecDeque::new(); + let mut send_task: SendMessageTask = SendMessageTask::Idle; + let mut recv_fut: Either<_, Pending>> = + Either::Left(Box::pin(msg_receiver.recv())); + + loop { + select! { + result = &mut send_task => { + trace!("SendMessageTask finished"); + send_task.reset(); + handle_task_ready( + result, + &mut send_task, + &mut queue, + &timeline_inner, + ).await; + } + recv_res = &mut recv_fut => { + recv_fut = if let Some(msg) = recv_res { + trace!("Got a LocalMessage"); + handle_message( + msg, + room.clone(), + &mut send_task, + &mut queue, + &timeline_inner, + ).await; + + // appease the borrow checker + drop(recv_fut); + + Either::Left(Box::pin(msg_receiver.recv())) + } else { + info!("Message receiver closed"); + Either::Right(pending()) + }; + } + } + + if send_task.is_idle() && matches!(recv_fut, Either::Right(_)) { + break; + } + } + + info!("Stopped"); +} + +async fn handle_message( + msg: LocalMessage, + room: room::Common, + send_task: &mut SendMessageTask, + queue: &mut VecDeque, + timeline_inner: &Arc, +) { + if queue.is_empty() && send_task.is_idle() { + match Room::from(room) { + Room::Joined(room) => { + send_task.start(room, timeline_inner.clone(), msg); + } + _ => { + info!("Refusing to send message, room is not joined"); + timeline_inner + .update_event_send_state( + &msg.txn_id, + EventSendState::SendingFailed { + // FIXME: Probably not exactly right + error: Arc::new(matrix_sdk::Error::InconsistentState), + }, + ) + .await; + } + } + } else { + queue.push_back(msg); + } +} + +async fn handle_task_ready( + result: SendMessageResult, + send_task: &mut SendMessageTask, + queue: &mut VecDeque, + timeline_inner: &Arc, +) { + match result { + SendMessageResult::Success { room } => { + if let Some(msg) = queue.pop_front() { + send_task.start(room, timeline_inner.clone(), msg); + } + } + SendMessageResult::SendingFailed => { + // Timeline items are marked as failed / cancelled in this case. + // Clear the timeline and wait for the user to explicitly retry. + queue.clear(); + } + SendMessageResult::TaskError { join_error, txn_id } => { + error!("Message-sending task failed: {join_error}"); + queue.clear(); + + let send_state = EventSendState::SendingFailed { + // FIXME: Probably not exactly right + error: Arc::new(matrix_sdk::Error::InconsistentState), + }; + timeline_inner.update_event_send_state(&txn_id, send_state).await; + } + } +} + +/// Result of [`SendMessageTask`]. +enum SendMessageResult { + /// The message was sent successfully, and the local echo was updated to + /// indicate this. + Success { + /// The joined room object, used to start sending of the next message + /// in the queue, if it isn't empty. + room: room::Joined, + }, + /// Sending failed, and the local echo was updated to indicate this. + SendingFailed, + /// The [`SendMessageTask`] failed, likely due to a panic. + /// + /// This means that the timeline item was likely not updated yet, which thus + /// becomes the responsibility of the code observing this result. + TaskError { + /// The error with which the task failed. + join_error: JoinError, + /// The transaction ID of the message that was being sent by the task. + txn_id: OwnedTransactionId, + }, +} + +/// Future that tracks the process of an event-sending background task, if one +/// is currently active for a given message-sending queue. +enum SendMessageTask { + /// No background task is active right now. + Idle, + /// A background task has been spawned, we're waiting for its result. + Running { + /// The transaction ID of the message that is being sent. + txn_id: OwnedTransactionId, + /// Handle to the task itself. + join_handle: JoinHandle>, + }, +} + +impl SendMessageTask { + #[must_use] + fn is_idle(&self) -> bool { + matches!(self, Self::Idle) + } + + fn start(&mut self, room: room::Joined, timeline_inner: Arc, msg: LocalMessage) { + debug!("Spawning message-sending task"); + let txn_id = msg.txn_id.clone(); + let join_handle = spawn(async move { + let result = room.send(msg.content, Some(&msg.txn_id)).await; + let (room, send_state) = match result { + Ok(response) => (Some(room), EventSendState::Sent { event_id: response.event_id }), + Err(error) => (None, EventSendState::SendingFailed { error: Arc::new(error) }), + }; + + timeline_inner.update_event_send_state(&msg.txn_id, send_state).await; + room + }); + *self = Self::Running { txn_id, join_handle }; + } + + fn reset(&mut self) { + *self = Self::Idle; + } +} + +impl Future for SendMessageTask { + type Output = SendMessageResult; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match &mut *self { + SendMessageTask::Idle => Poll::Pending, + SendMessageTask::Running { txn_id, join_handle } => { + Pin::new(join_handle).poll(cx).map(|result| { + let txn_id = mem::replace(txn_id, OwnedTransactionId::from("")); + if txn_id.as_str().is_empty() { + warn!("SendMessageTask polled after returning Poll::Ready!"); + } + + match result { + Ok(Some(room)) => SendMessageResult::Success { room }, + Ok(None) => SendMessageResult::SendingFailed, + Err(join_error) => SendMessageResult::TaskError { join_error, txn_id }, + } + }) + } + } + } +} diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs b/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs index a6fbd4f69..046a1fd5c 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs @@ -162,7 +162,7 @@ async fn retry_failed() { }); // Sending fails, the mock server has no matching route yet - assert_next_matches!(timeline_stream, VectorDiff::Set { index: 0, value } => { + assert_matches!(timeline_stream.next().await, Some(VectorDiff::Set { index: 0, value }) => { assert_matches!(value.send_state(), Some(EventSendState::SendingFailed { .. })); }); @@ -182,7 +182,7 @@ async fn retry_failed() { }); // … before succeeding. - assert_next_matches!(timeline_stream, VectorDiff::Set { index: 0, value } => { + assert_matches!(timeline_stream.next().await, Some(VectorDiff::Set { index: 0, value }) => { assert_matches!(value.send_state(), Some(EventSendState::Sent { .. })); }); } @@ -222,11 +222,7 @@ async fn dedup_by_event_id_late() { .mount(&server) .await; - let send_hdl = spawn(async move { - timeline - .send(RoomMessageEventContent::text_plain("Hello, World!").into(), Some(txn_id)) - .await - }); + timeline.send(RoomMessageEventContent::text_plain("Hello, World!").into(), Some(txn_id)).await; assert_matches!(timeline_stream.next().await, Some(VectorDiff::PushBack { .. })); // day divider let local_echo = assert_matches!(timeline_stream.next().await, Some(VectorDiff::PushBack { value }) => value); @@ -256,12 +252,9 @@ async fn dedup_by_event_id_late() { let item = remote_echo.as_event().unwrap(); assert_eq!(item.event_id(), Some(event_id)); - // Wait for the sending to finish - send_hdl.await.unwrap(); - // Local echo and its day divider are removed. - assert_next_matches!(timeline_stream, VectorDiff::Remove { index: 1 }); - assert_next_matches!(timeline_stream, VectorDiff::Remove { index: 0 }); + assert_matches!(timeline_stream.next().await, Some(VectorDiff::Remove { index: 1 })); + assert_matches!(timeline_stream.next().await, Some(VectorDiff::Remove { index: 0 })); } #[async_test] @@ -286,13 +279,13 @@ async fn cancel_failed() { timeline.send(RoomMessageEventContent::text_plain("Hello, World!").into(), Some(txn_id)).await; - // Local echo is added + // Local echo is added (immediately) assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => { assert_matches!(value.send_state(), Some(EventSendState::NotSentYet)); }); // Sending fails, the mock server has no matching route - assert_next_matches!(timeline_stream, VectorDiff::Set { index: 0, value } => { + assert_matches!(timeline_stream.next().await, Some(VectorDiff::Set { index: 0, value }) => { assert_matches!(value.send_state(), Some(EventSendState::SendingFailed { .. })); }); @@ -300,5 +293,5 @@ async fn cancel_failed() { assert!(timeline.cancel_send(txn_id).await); // Observable local echo being removed - assert_next_matches!(timeline_stream, VectorDiff::Remove { index: 0 }); + assert_matches!(timeline_stream.next().await, Some(VectorDiff::Remove { index: 0 })); } diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs b/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs index 8118329b3..ad6d27da4 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs @@ -74,18 +74,10 @@ async fn message_order() { .mount(&server) .await; - tokio::spawn({ - let timeline = timeline.clone(); - async move { - timeline.send(RoomMessageEventContent::text_plain("First!").into(), None).await; - } - }); - tokio::spawn(async move { - timeline.send(RoomMessageEventContent::text_plain("Second.").into(), None).await; - }); - - sleep(Duration::from_millis(50)).await; + timeline.send(RoomMessageEventContent::text_plain("First!").into(), None).await; + timeline.send(RoomMessageEventContent::text_plain("Second.").into(), None).await; + // Local echoes are available as soon as `timeline.send` returns assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => { assert_eq!(value.content().as_message().unwrap().body(), "First!"); }); @@ -93,13 +85,15 @@ async fn message_order() { assert_eq!(value.content().as_message().unwrap().body(), "Second."); }); - // 200ms for the first msg, 100ms for the second, 100ms for overhead + // Wait 200ms for the first msg, 100ms for the second, 100ms for overhead sleep(Duration::from_millis(400)).await; + // The first item should be updated first assert_next_matches!(timeline_stream, VectorDiff::Set { index: 0, value } => { assert_eq!(value.content().as_message().unwrap().body(), "First!"); assert_eq!(value.event_id().unwrap(), "$PyHxV5mYzjetBUT3qZq7V95GOzxb02EP"); }); + // Then the second one assert_next_matches!(timeline_stream, VectorDiff::Set { index: 1, value } => { assert_eq!(value.content().as_message().unwrap().body(), "Second."); assert_eq!(value.event_id().unwrap(), "$5E2kLK/Sg342bgBU9ceEIEPYpbFaqJpZ");