ui: Add a queue to serialize message sending requests

This commit is contained in:
Jonas Platte
2023-06-22 13:24:41 +02:00
committed by Jonas Platte
parent cb6d3c3c47
commit 4c24007cb8
8 changed files with 299 additions and 49 deletions

View File

@@ -244,6 +244,9 @@ pub enum EventSendState {
/// The local event has been sent to the server, but unsuccessfully: The
/// sending has failed.
SendingFailed { error: String },
/// Sending has been cancelled because an earlier event in the
/// message-sending queue failed.
Cancelled,
/// The local event has been sent successfully to the server.
Sent { event_id: String },
}
@@ -255,6 +258,7 @@ impl From<&matrix_sdk_ui::timeline::EventSendState> for EventSendState {
match value {
NotSentYet => Self::NotSentYet,
SendingFailed { error } => Self::SendingFailed { error: error.to_string() },
Cancelled => Self::Cancelled,
Sent { event_id } => Self::Sent { event_id: event_id.to_string() },
}
}

View File

@@ -20,12 +20,12 @@ use matrix_sdk::{
deserialized_responses::SyncTimelineEvent, executor::spawn, room, sync::RoomUpdate,
};
use ruma::events::receipt::{ReceiptThread, ReceiptType};
use tokio::sync::broadcast;
use tokio::sync::{broadcast, mpsc};
use tracing::{error, warn};
#[cfg(feature = "e2e-encryption")]
use super::to_device::{handle_forwarded_room_key_event, handle_room_key_event};
use super::{inner::TimelineInner, Timeline, TimelineDropHandle};
use super::{inner::TimelineInner, queue::send_queued_messages, Timeline, TimelineDropHandle};
/// Builder that allows creating and configuring various parts of a
/// [`Timeline`].
@@ -196,11 +196,15 @@ impl TimelineBuilder {
forwarded_room_key_handle,
];
let (msg_sender, msg_receiver) = mpsc::channel(1);
spawn(send_queued_messages(inner.clone(), room.clone(), msg_receiver));
let timeline = Timeline {
inner,
start_token,
start_token_condvar: Default::default(),
_end_token: Mutex::new(None),
msg_sender,
drop_handle: Arc::new(TimelineDropHandle {
client,
event_handler_handles: handles,

View File

@@ -301,6 +301,9 @@ pub enum EventSendState {
/// Details about how sending the event failed.
error: Arc<Error>,
},
/// Sending has been cancelled because an earlier event in the
/// message-sending queue failed.
Cancelled,
/// The local event has been sent successfully to the server.
Sent {
/// The event ID assigned by the server.

View File

@@ -333,8 +333,26 @@ impl<P: RoomDataProvider> TimelineInner<P> {
error!(?existing_event_id, ?new_event_id, "Local echo already marked as sent");
}
let is_error = matches!(send_state, EventSendState::SendingFailed { .. });
let new_item = TimelineItem::Event(item.with_kind(local_item.with_send_state(send_state)));
state.items.set(idx, Arc::new(new_item));
if is_error {
// When there is an error, sending further messages is paused. This
// should be reflected in the timeline, so we set all other pending
// events to cancelled.
let num_items = state.items.len();
for idx in 0..num_items {
let Some(item) = state.items[idx].as_event() else { continue };
let Some(local_item) = item.as_local() else { continue };
if matches!(&local_item.send_state, EventSendState::NotSentYet) {
let new_item =
item.with_kind(local_item.with_send_state(EventSendState::Cancelled));
state.items.set(idx, Arc::new(TimelineItem::Event(new_item)));
}
}
}
}
pub(super) async fn prepare_retry(

View File

@@ -42,6 +42,7 @@ use ruma::{
EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, TransactionId, UserId,
};
use thiserror::Error;
use tokio::sync::mpsc::Sender;
use tracing::{debug, error, info, instrument, warn};
mod builder;
@@ -50,6 +51,7 @@ mod event_item;
mod futures;
mod inner;
mod pagination;
mod queue;
mod read_receipts;
#[cfg(feature = "experimental-sliding-sync")]
mod sliding_sync_ext;
@@ -61,7 +63,6 @@ mod traits;
mod virtual_item;
pub(crate) use self::builder::TimelineBuilder;
use self::inner::{TimelineInner, TimelineInnerState};
#[cfg(feature = "experimental-sliding-sync")]
pub use self::sliding_sync_ext::SlidingSyncRoomExt;
pub use self::{
@@ -76,6 +77,10 @@ pub use self::{
traits::RoomExt,
virtual_item::VirtualTimelineItem,
};
use self::{
inner::{TimelineInner, TimelineInnerState},
queue::LocalMessage,
};
/// The default sanitizer mode used when sanitizing HTML.
const DEFAULT_SANITIZER_MODE: HtmlSanitizerMode = HtmlSanitizerMode::Compat;
@@ -91,6 +96,7 @@ pub struct Timeline {
start_token: Arc<Mutex<Option<String>>>,
start_token_condvar: Arc<Condvar>,
_end_token: Mutex<Option<String>>,
msg_sender: Sender<LocalMessage>,
drop_handle: Arc<TimelineDropHandle>,
}
@@ -312,25 +318,9 @@ impl Timeline {
pub async fn send(&self, content: AnyMessageLikeEventContent, txn_id: Option<&TransactionId>) {
let txn_id = txn_id.map_or_else(TransactionId::new, ToOwned::to_owned);
self.inner.handle_local_event(txn_id.clone(), content.clone()).await;
let send_state = match Room::from(self.room().clone()) {
Room::Joined(room) => {
let response = room.send(content, Some(&txn_id)).await;
match response {
Ok(response) => EventSendState::Sent { event_id: response.event_id },
Err(error) => EventSendState::SendingFailed { error: Arc::new(error) },
}
}
_ => {
EventSendState::SendingFailed {
// FIXME: Probably not exactly right
error: Arc::new(matrix_sdk::Error::InconsistentState),
}
}
};
self.inner.update_event_send_state(&txn_id, send_state).await;
if self.msg_sender.send(LocalMessage { content, txn_id }).await.is_err() {
error!("Internal error: timeline message receiver is closed");
}
}
/// Sends an attachment to the room. It does not currently support local

View File

@@ -0,0 +1,244 @@
// Copyright 2023 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{
collections::VecDeque,
future::{pending, Future, Pending},
mem,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use futures_util::future::Either;
use matrix_sdk::{
executor::{spawn, JoinError, JoinHandle},
room::{self, Room},
};
use ruma::{events::AnyMessageLikeEventContent, OwnedTransactionId};
use tokio::{select, sync::mpsc::Receiver};
use tracing::{debug, error, info, instrument, trace, warn};
use super::{inner::TimelineInner, EventSendState};
/// A locally-created message that is supposed to be sent.
///
/// Queued by [`Timeline::send`] and consumed by the sending task spawned in
/// the timeline builder — presumably one task per room; confirm against the
/// builder. (NOTE(review): this type only carries data, ordering is enforced
/// by the queue itself.)
pub(super) struct LocalMessage {
    /// The transaction ID.
    ///
    /// Used for finding the corresponding local echo in the timeline.
    pub txn_id: OwnedTransactionId,

    /// The message contents.
    pub content: AnyMessageLikeEventContent,
}
/// Long-running background task that serializes message sending for a room.
///
/// Messages submitted via `msg_receiver` are sent strictly one at a time:
/// a new send only starts after the previous one has completed. The loop
/// terminates once the channel is closed (i.e. the sender side was dropped)
/// and no send task is in flight anymore.
#[instrument(skip_all, fields(room_id = ?room.room_id()))]
pub(super) async fn send_queued_messages(
    timeline_inner: Arc<TimelineInner>,
    room: room::Common,
    mut msg_receiver: Receiver<LocalMessage>,
) {
    // Messages that arrived while another send was already in progress.
    let mut queue = VecDeque::new();
    // The currently-running send, if any. Polls as `Pending` while `Idle`,
    // so the corresponding `select!` arm stays dormant until a task starts.
    let mut send_task: SendMessageTask = SendMessageTask::Idle;
    // Future for receiving the next message. Swapped for a never-completing
    // `Pending` once the channel is closed, so `select!` then only waits on
    // the send task.
    let mut recv_fut: Either<_, Pending<Option<LocalMessage>>> =
        Either::Left(Box::pin(msg_receiver.recv()));

    loop {
        select! {
            result = &mut send_task => {
                trace!("SendMessageTask finished");
                // Reset to `Idle` before handling the result, so
                // `handle_task_ready` may immediately start the next send.
                send_task.reset();
                handle_task_ready(
                    result,
                    &mut send_task,
                    &mut queue,
                    &timeline_inner,
                ).await;
            }
            recv_res = &mut recv_fut => {
                recv_fut = if let Some(msg) = recv_res {
                    trace!("Got a LocalMessage");
                    handle_message(
                        msg,
                        room.clone(),
                        &mut send_task,
                        &mut queue,
                        &timeline_inner,
                    ).await;

                    // appease the borrow checker
                    // (move the old future out before creating the new one
                    // that re-borrows `msg_receiver`)
                    drop(recv_fut);
                    Either::Left(Box::pin(msg_receiver.recv()))
                } else {
                    // `recv` returned `None`: all senders were dropped and
                    // the channel is drained. Stop receiving permanently.
                    info!("Message receiver closed");
                    Either::Right(pending())
                };
            }
        }

        // Shut down once the channel is closed and no send is in flight.
        if send_task.is_idle() && matches!(recv_fut, Either::Right(_)) {
            break;
        }
    }

    info!("Stopped");
}
/// Dispatches a freshly-received message: start sending it right away if the
/// queue is empty and nothing is in flight, otherwise append it to the queue
/// so ordering is preserved.
///
/// If the room is not joined, the message's local echo is immediately marked
/// as failed instead of being sent.
async fn handle_message(
    msg: LocalMessage,
    room: room::Common,
    send_task: &mut SendMessageTask,
    queue: &mut VecDeque<LocalMessage>,
    timeline_inner: &Arc<TimelineInner>,
) {
    // Something is already being sent or waiting its turn — this message
    // has to wait behind it.
    if !queue.is_empty() || !send_task.is_idle() {
        queue.push_back(msg);
        return;
    }

    if let Room::Joined(joined_room) = Room::from(room) {
        send_task.start(joined_room, timeline_inner.clone(), msg);
    } else {
        info!("Refusing to send message, room is not joined");
        let send_state = EventSendState::SendingFailed {
            // FIXME: Probably not exactly right
            error: Arc::new(matrix_sdk::Error::InconsistentState),
        };
        timeline_inner.update_event_send_state(&msg.txn_id, send_state).await;
    }
}
/// Reacts to a finished [`SendMessageTask`]: on success, starts sending the
/// next queued message (if any); on failure, drops the remaining queue so
/// sending stays paused until the user explicitly retries.
async fn handle_task_ready(
    result: SendMessageResult,
    send_task: &mut SendMessageTask,
    queue: &mut VecDeque<LocalMessage>,
    timeline_inner: &Arc<TimelineInner>,
) {
    match result {
        SendMessageResult::Success { room } => {
            // Continue with the next message, reusing the joined room object
            // returned by the finished task.
            if let Some(msg) = queue.pop_front() {
                send_task.start(room, timeline_inner.clone(), msg);
            }
        }
        SendMessageResult::SendingFailed => {
            // Timeline items are marked as failed / cancelled in this case.
            // Clear the timeline and wait for the user to explicitly retry.
            queue.clear();
        }
        SendMessageResult::TaskError { join_error, txn_id } => {
            error!("Message-sending task failed: {join_error}");
            queue.clear();
            // The task died (likely a panic) before it could update the
            // local echo itself, so mark it as failed here.
            let send_state = EventSendState::SendingFailed {
                // FIXME: Probably not exactly right
                error: Arc::new(matrix_sdk::Error::InconsistentState),
            };
            timeline_inner.update_event_send_state(&txn_id, send_state).await;
        }
    }
}
/// Result of [`SendMessageTask`].
///
/// Produced by [`SendMessageTask`]'s `Future` impl once the spawned task has
/// finished, and consumed by `handle_task_ready`.
enum SendMessageResult {
    /// The message was sent successfully, and the local echo was updated to
    /// indicate this.
    Success {
        /// The joined room object, used to start sending of the next message
        /// in the queue, if it isn't empty.
        room: room::Joined,
    },

    /// Sending failed, and the local echo was updated to indicate this.
    SendingFailed,

    /// The [`SendMessageTask`] failed, likely due to a panic.
    ///
    /// This means that the timeline item was likely not updated yet, which thus
    /// becomes the responsibility of the code observing this result.
    TaskError {
        /// The error with which the task failed.
        join_error: JoinError,

        /// The transaction ID of the message that was being sent by the task.
        txn_id: OwnedTransactionId,
    },
}
/// Future that tracks the process of an event-sending background task, if one
/// is currently active for a given message-sending queue.
///
/// Implements [`Future`] (see the impl below): polling while `Idle` yields
/// `Pending` forever, polling while `Running` resolves to a
/// [`SendMessageResult`] once the spawned task finishes.
enum SendMessageTask {
    /// No background task is active right now.
    Idle,

    /// A background task has been spawned, we're waiting for its result.
    Running {
        /// The transaction ID of the message that is being sent.
        txn_id: OwnedTransactionId,

        /// Handle to the task itself.
        ///
        /// Resolves to `Some(room)` on a successful send, `None` on failure
        /// (see `start` below).
        join_handle: JoinHandle<Option<room::Joined>>,
    },
}
impl SendMessageTask {
    /// Whether no send task is currently running.
    #[must_use]
    fn is_idle(&self) -> bool {
        matches!(self, Self::Idle)
    }

    /// Spawns a background task that sends `msg` to `room`, updates the
    /// corresponding local echo with the outcome, and transitions `self`
    /// into the `Running` state.
    fn start(&mut self, room: room::Joined, timeline_inner: Arc<TimelineInner>, msg: LocalMessage) {
        debug!("Spawning message-sending task");
        let txn_id = msg.txn_id.clone();
        let join_handle = spawn(async move {
            let result = room.send(msg.content, Some(&msg.txn_id)).await;
            // On success, hand the room object back so the next queued
            // message can be sent; on failure, return `None` so the queue
            // knows sending is paused.
            let (room, send_state) = match result {
                Ok(response) => (Some(room), EventSendState::Sent { event_id: response.event_id }),
                Err(error) => (None, EventSendState::SendingFailed { error: Arc::new(error) }),
            };
            timeline_inner.update_event_send_state(&msg.txn_id, send_state).await;
            room
        });
        *self = Self::Running { txn_id, join_handle };
    }

    /// Returns to the idle state; called right after the task's result was
    /// observed via the `Future` impl.
    fn reset(&mut self) {
        *self = Self::Idle;
    }
}
impl Future for SendMessageTask {
    type Output = SendMessageResult;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match &mut *self {
            // While idle, this future never completes — inside `select!`,
            // this arm simply stays dormant until `start` is called.
            SendMessageTask::Idle => Poll::Pending,
            SendMessageTask::Running { txn_id, join_handle } => {
                Pin::new(join_handle).poll(cx).map(|result| {
                    // Move the transaction ID out, leaving an empty-string
                    // sentinel behind. The caller is expected to `reset()`
                    // right after `Poll::Ready`; observing the sentinel here
                    // means the future was polled again after completion,
                    // which violates that contract (logged, not fatal).
                    let txn_id = mem::replace(txn_id, OwnedTransactionId::from(""));
                    if txn_id.as_str().is_empty() {
                        warn!("SendMessageTask polled after returning Poll::Ready!");
                    }

                    // `Some(room)` / `None` encode success / failure — see
                    // the closure in `SendMessageTask::start`.
                    match result {
                        Ok(Some(room)) => SendMessageResult::Success { room },
                        Ok(None) => SendMessageResult::SendingFailed,
                        Err(join_error) => SendMessageResult::TaskError { join_error, txn_id },
                    }
                })
            }
        }
    }
}

View File

@@ -162,7 +162,7 @@ async fn retry_failed() {
});
// Sending fails, the mock server has no matching route yet
assert_next_matches!(timeline_stream, VectorDiff::Set { index: 0, value } => {
assert_matches!(timeline_stream.next().await, Some(VectorDiff::Set { index: 0, value }) => {
assert_matches!(value.send_state(), Some(EventSendState::SendingFailed { .. }));
});
@@ -182,7 +182,7 @@ async fn retry_failed() {
});
// … before succeeding.
assert_next_matches!(timeline_stream, VectorDiff::Set { index: 0, value } => {
assert_matches!(timeline_stream.next().await, Some(VectorDiff::Set { index: 0, value }) => {
assert_matches!(value.send_state(), Some(EventSendState::Sent { .. }));
});
}
@@ -222,11 +222,7 @@ async fn dedup_by_event_id_late() {
.mount(&server)
.await;
let send_hdl = spawn(async move {
timeline
.send(RoomMessageEventContent::text_plain("Hello, World!").into(), Some(txn_id))
.await
});
timeline.send(RoomMessageEventContent::text_plain("Hello, World!").into(), Some(txn_id)).await;
assert_matches!(timeline_stream.next().await, Some(VectorDiff::PushBack { .. })); // day divider
let local_echo = assert_matches!(timeline_stream.next().await, Some(VectorDiff::PushBack { value }) => value);
@@ -256,12 +252,9 @@ async fn dedup_by_event_id_late() {
let item = remote_echo.as_event().unwrap();
assert_eq!(item.event_id(), Some(event_id));
// Wait for the sending to finish
send_hdl.await.unwrap();
// Local echo and its day divider are removed.
assert_next_matches!(timeline_stream, VectorDiff::Remove { index: 1 });
assert_next_matches!(timeline_stream, VectorDiff::Remove { index: 0 });
assert_matches!(timeline_stream.next().await, Some(VectorDiff::Remove { index: 1 }));
assert_matches!(timeline_stream.next().await, Some(VectorDiff::Remove { index: 0 }));
}
#[async_test]
@@ -286,13 +279,13 @@ async fn cancel_failed() {
timeline.send(RoomMessageEventContent::text_plain("Hello, World!").into(), Some(txn_id)).await;
// Local echo is added
// Local echo is added (immediately)
assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => {
assert_matches!(value.send_state(), Some(EventSendState::NotSentYet));
});
// Sending fails, the mock server has no matching route
assert_next_matches!(timeline_stream, VectorDiff::Set { index: 0, value } => {
assert_matches!(timeline_stream.next().await, Some(VectorDiff::Set { index: 0, value }) => {
assert_matches!(value.send_state(), Some(EventSendState::SendingFailed { .. }));
});
@@ -300,5 +293,5 @@ async fn cancel_failed() {
assert!(timeline.cancel_send(txn_id).await);
// Observable local echo being removed
assert_next_matches!(timeline_stream, VectorDiff::Remove { index: 0 });
assert_matches!(timeline_stream.next().await, Some(VectorDiff::Remove { index: 0 }));
}

View File

@@ -74,18 +74,10 @@ async fn message_order() {
.mount(&server)
.await;
tokio::spawn({
let timeline = timeline.clone();
async move {
timeline.send(RoomMessageEventContent::text_plain("First!").into(), None).await;
}
});
tokio::spawn(async move {
timeline.send(RoomMessageEventContent::text_plain("Second.").into(), None).await;
});
sleep(Duration::from_millis(50)).await;
timeline.send(RoomMessageEventContent::text_plain("First!").into(), None).await;
timeline.send(RoomMessageEventContent::text_plain("Second.").into(), None).await;
// Local echoes are available as soon as `timeline.send` returns
assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => {
assert_eq!(value.content().as_message().unwrap().body(), "First!");
});
@@ -93,13 +85,15 @@ async fn message_order() {
assert_eq!(value.content().as_message().unwrap().body(), "Second.");
});
// 200ms for the first msg, 100ms for the second, 100ms for overhead
// Wait 200ms for the first msg, 100ms for the second, 100ms for overhead
sleep(Duration::from_millis(400)).await;
// The first item should be updated first
assert_next_matches!(timeline_stream, VectorDiff::Set { index: 0, value } => {
assert_eq!(value.content().as_message().unwrap().body(), "First!");
assert_eq!(value.event_id().unwrap(), "$PyHxV5mYzjetBUT3qZq7V95GOzxb02EP");
});
// Then the second one
assert_next_matches!(timeline_stream, VectorDiff::Set { index: 1, value } => {
assert_eq!(value.content().as_message().unwrap().body(), "Second.");
assert_eq!(value.event_id().unwrap(), "$5E2kLK/Sg342bgBU9ceEIEPYpbFaqJpZ");