timeline: use the new sending queue mechanism to send and receive local echoes

This commit is contained in:
Benjamin Bouvier
2024-05-24 16:27:29 +02:00
parent 3917ba67c9
commit b88381a289
13 changed files with 233 additions and 518 deletions

View File

@@ -51,7 +51,7 @@ use tokio::{
sync::Mutex,
task::{AbortHandle, JoinHandle},
};
use tracing::{error, info, warn};
use tracing::{error, warn};
use uuid::Uuid;
use self::content::{Reaction, ReactionSenderData, TimelineItemContent};
@@ -221,7 +221,9 @@ impl Timeline {
pub fn send(self: Arc<Self>, msg: Arc<RoomMessageEventContentWithoutRelation>) {
RUNTIME.spawn(async move {
self.inner.send((*msg).to_owned().with_relation(None).into()).await;
if let Err(err) = self.inner.send((*msg).to_owned().with_relation(None).into()).await {
error!("error when sending a message: {err}");
}
});
}
@@ -390,7 +392,9 @@ impl Timeline {
AnyMessageLikeEventContent::UnstablePollStart(poll_start_event_content.into());
RUNTIME.spawn(async move {
self.inner.send(event_content).await;
if let Err(err) = self.inner.send(event_content).await {
error!("unable to start poll: {err}");
}
});
Ok(())
@@ -409,7 +413,9 @@ impl Timeline {
AnyMessageLikeEventContent::UnstablePollResponse(poll_response_event_content);
RUNTIME.spawn(async move {
self.inner.send(event_content).await;
if let Err(err) = self.inner.send(event_content).await {
error!("unable to send poll response: {err}");
}
});
Ok(())
@@ -426,7 +432,9 @@ impl Timeline {
let event_content = AnyMessageLikeEventContent::UnstablePollEnd(poll_end_event_content);
RUNTIME.spawn(async move {
self.inner.send(event_content).await;
if let Err(err) = self.inner.send(event_content).await {
error!("unable to end poll: {err}");
}
});
Ok(())
@@ -511,22 +519,6 @@ impl Timeline {
Ok(())
}
pub fn retry_send(self: Arc<Self>, txn_id: String) {
RUNTIME.spawn(async move {
if let Err(e) = self.inner.retry_send(txn_id.as_str().into()).await {
error!(txn_id, "Failed to retry sending: {e}");
}
});
}
pub fn cancel_send(self: Arc<Self>, txn_id: String) {
RUNTIME.spawn(async move {
if !self.inner.cancel_send(txn_id.as_str().into()).await {
info!(txn_id, "Failed to discard local echo: Not found");
}
});
}
pub async fn get_event_timeline_item_by_event_id(
&self,
event_id: String,

View File

@@ -18,21 +18,24 @@ use futures_util::{pin_mut, StreamExt};
use matrix_sdk::{
event_cache::{EventsOrigin, RoomEventCacheUpdate},
executor::spawn,
send_queue::RoomSendingQueueUpdate,
Room,
};
use ruma::{events::AnySyncTimelineEvent, RoomVersionId};
use tokio::sync::{broadcast, mpsc};
use tokio::sync::broadcast::error::RecvError;
use tracing::{info, info_span, trace, warn, Instrument, Span};
#[cfg(feature = "e2e-encryption")]
use super::to_device::{handle_forwarded_room_key_event, handle_room_key_event};
use super::{
inner::{TimelineInner, TimelineInnerSettings},
queue::send_queued_messages,
Error, Timeline, TimelineDropHandle, TimelineFocus,
};
use crate::{
timeline::{event_item::RemoteEventOrigin, inner::TimelineEnd},
timeline::{
event_handler::TimelineEventKind, event_item::RemoteEventOrigin, inner::TimelineEnd,
EventSendState,
},
unable_to_decrypt_hook::UtdHookManager,
};
@@ -182,8 +185,8 @@ impl TimelineBuilder {
let update = match event_subscriber.recv().await {
Ok(up) => up,
Err(broadcast::error::RecvError::Closed) => break,
Err(broadcast::error::RecvError::Lagged(num_skipped)) => {
Err(RecvError::Closed) => break,
Err(RecvError::Lagged(num_skipped)) => {
warn!(
num_skipped,
"Lagged behind event cache updates, resetting timeline"
@@ -261,6 +264,85 @@ impl TimelineBuilder {
.instrument(span)
});
let local_echo_listener_handle = spawn({
let timeline = inner.clone();
let (local_echoes, mut listener) = room.sending_queue().subscribe().await;
// Handles existing local echoes first.
for echo in local_echoes {
timeline
.handle_local_event(
echo.transaction_id,
TimelineEventKind::Message {
content: echo.content,
relations: Default::default(),
},
)
.await;
}
let span =
info_span!(parent: Span::none(), "local_echo_handler", room_id = ?room.room_id());
span.follows_from(Span::current());
// React to future local echoes too.
async move {
info!("spawned the local echo handler!");
loop {
match listener.recv().await {
Ok(update) => match update {
RoomSendingQueueUpdate::NewLocalEvent(echo) => {
timeline
.handle_local_event(
echo.transaction_id,
TimelineEventKind::Message {
content: echo.content,
relations: Default::default(),
},
)
.await;
}
RoomSendingQueueUpdate::CancelledLocalEvent { transaction_id } => {
if !timeline.discard_local_echo(&transaction_id).await {
warn!("couldn't find the local echo to discard");
}
}
RoomSendingQueueUpdate::SendError { transaction_id, error } => {
timeline
.update_event_send_state(
&transaction_id,
EventSendState::SendingFailed { error },
)
.await;
}
RoomSendingQueueUpdate::SentEvent { transaction_id, event_id } => {
timeline
.update_event_send_state(
&transaction_id,
EventSendState::Sent { event_id },
)
.await;
}
},
Err(RecvError::Lagged(num_missed)) => {
warn!("missed {num_missed} local echoes, ignoring those missed");
}
Err(RecvError::Closed) => {
info!("channel closed, exiting the local echo handler");
break;
}
}
}
}
.instrument(span)
});
// Not using room.add_event_handler here because RoomKey events are
// to-device events that are not received in the context of a room.
@@ -309,19 +391,15 @@ impl TimelineBuilder {
})
};
let (msg_sender, msg_receiver) = mpsc::channel(1);
info!("Starting message-sending loop");
spawn(send_queued_messages(inner.clone(), room.clone(), msg_receiver));
let timeline = Timeline {
inner,
msg_sender,
event_cache: room_event_cache,
drop_handle: Arc::new(TimelineDropHandle {
client,
event_handler_handles: handles,
room_update_join_handle,
room_key_from_backups_join_handle,
local_echo_listener_handle,
_event_cache_drop_handle: event_cache_drop,
}),
};

View File

@@ -14,7 +14,10 @@
use std::fmt;
use matrix_sdk::event_cache::{paginator::PaginatorError, EventCacheError};
use matrix_sdk::{
event_cache::{paginator::PaginatorError, EventCacheError},
send_queue::RoomSendingQueueError,
};
use thiserror::Error;
/// Errors specific to the timeline.
@@ -127,3 +130,15 @@ enum UnsupportedEditItemInner {
#[error("tried to edit a non-poll event")]
NotPollEvent,
}
#[derive(Debug, Error)]
pub enum SendEventError {
#[error(transparent)]
UnsupportedReplyItem(#[from] UnsupportedReplyItem),
#[error(transparent)]
UnsupportedEditItem(#[from] UnsupportedEditItem),
#[error(transparent)]
SendError(#[from] RoomSendingQueueError),
}

View File

@@ -722,12 +722,6 @@ impl<P: RoomDataProvider> TimelineInner<P> {
// When there is an error, sending further messages is paused. This should be
// reflected in the timeline, so we set all other pending events to
// cancelled.
//
// TODO(bnjbvr): spooky action at a distance here^. The sending task is the one
// deciding to clear the sending queue, so we're updating based on that implicit
// knowledge here. Instead, the sending queue should notify the timeline that
// it's deciding to not send those messages, and then only the
// timeline should mark these items as cancelled.
let items = &mut txn.items;
let num_items = items.len();
for idx in 0..num_items {
@@ -804,42 +798,6 @@ impl<P: RoomDataProvider> TimelineInner<P> {
Ok(follow_up_action)
}
/// Attempts to find a local echo item for some event to be sent.
///
/// If it's found, it's moved back to the end of the timeline, then its
/// content is returned by this function.
pub(super) async fn prepare_retry(
&self,
txn_id: &TransactionId,
) -> Option<TimelineItemContent> {
let mut state = self.state.write().await;
let (idx, item) = rfind_event_item(&state.items, |it| it.transaction_id() == Some(txn_id))?;
let local_item = item.as_local()?;
match &local_item.send_state {
EventSendState::NotSentYet => {
warn!("Attempted to retry the sending of an item that is already pending");
return None;
}
EventSendState::Sent { .. } => {
warn!("Attempted to retry the sending of an item that has already succeeded");
return None;
}
EventSendState::SendingFailed { .. } | EventSendState::Cancelled => {}
}
let new_item = item.with_inner_kind(local_item.with_send_state(EventSendState::NotSentYet));
let content = item.content.clone();
let mut txn = state.items.transaction();
txn.remove(idx);
txn.push_back(new_item);
txn.commit();
Some(content)
}
pub(super) async fn discard_local_echo(&self, txn_id: &TransactionId) -> bool {
let mut state = self.state.write().await;

View File

@@ -27,6 +27,7 @@ use matrix_sdk::{
event_handler::EventHandlerHandle,
executor::JoinHandle,
room::{Receipts, Room},
send_queue::{AbortSendHandle, RoomSendingQueueError},
Client, Result,
};
use matrix_sdk_base::RoomState;
@@ -55,10 +56,9 @@ use ruma::{
TransactionId, UserId,
};
use thiserror::Error;
use tokio::sync::mpsc::Sender;
use tracing::{debug, error, instrument, trace, warn};
use tracing::{error, instrument, trace, warn};
use self::{event_handler::TimelineEventKind, futures::SendAttachment};
use self::{error::SendEventError, futures::SendAttachment};
mod builder;
mod day_dividers;
@@ -71,7 +71,6 @@ mod inner;
mod item;
mod pagination;
mod polls;
mod queue;
mod reactions;
mod read_receipts;
mod sliding_sync_ext;
@@ -104,7 +103,6 @@ pub use self::{
};
use self::{
inner::{ReactionAction, TimelineInner},
queue::LocalMessage,
reactions::ReactionToggleResult,
util::rfind_event_by_id,
};
@@ -123,10 +121,6 @@ pub struct Timeline {
/// The event cache specialized for this room's view.
event_cache: RoomEventCache,
/// A sender to the task which responsibility is to send messages to the
/// current room.
msg_sender: Sender<LocalMessage>,
/// References to long-running tasks held by the timeline.
drop_handle: Arc<TimelineDropHandle>,
}
@@ -277,20 +271,11 @@ impl Timeline {
/// [`MessageLikeUnsigned`]: ruma::events::MessageLikeUnsigned
/// [`SyncMessageLikeEvent`]: ruma::events::SyncMessageLikeEvent
#[instrument(skip(self, content), fields(room_id = ?self.room().room_id()))]
pub async fn send(&self, content: AnyMessageLikeEventContent) {
let txn_id = TransactionId::new();
self.inner
.handle_local_event(
txn_id.clone(),
TimelineEventKind::Message {
content: content.clone(),
relations: Default::default(),
},
)
.await;
if self.msg_sender.send(LocalMessage { content, txn_id }).await.is_err() {
error!("Internal error: timeline message receiver is closed");
}
pub async fn send(
&self,
content: AnyMessageLikeEventContent,
) -> Result<AbortSendHandle, RoomSendingQueueError> {
self.room().sending_queue().send(content).await
}
/// Send a reply to the given event.
@@ -318,11 +303,11 @@ impl Timeline {
content: RoomMessageEventContentWithoutRelation,
reply_item: &EventTimelineItem,
forward_thread: ForwardThread,
) -> Result<(), UnsupportedReplyItem> {
) -> Result<(), SendEventError> {
// Error returns here must be in sync with
// `EventTimelineItem::can_be_replied_to`
let Some(event_id) = reply_item.event_id() else {
return Err(UnsupportedReplyItem::MISSING_EVENT_ID);
return Err(UnsupportedReplyItem::MISSING_EVENT_ID.into());
};
// [The specification](https://spec.matrix.org/v1.10/client-server-api/#user-and-room-mentions) says:
@@ -352,7 +337,7 @@ impl Timeline {
}
_ => {
let Some(raw_event) = reply_item.latest_json() else {
return Err(UnsupportedReplyItem::MISSING_JSON);
return Err(UnsupportedReplyItem::MISSING_JSON.into());
};
content.make_reply_to_raw(
@@ -365,7 +350,8 @@ impl Timeline {
}
};
self.send(content.into()).await;
self.send(content.into()).await?;
Ok(())
}
@@ -384,14 +370,14 @@ impl Timeline {
&self,
new_content: RoomMessageEventContentWithoutRelation,
edit_item: &EventTimelineItem,
) -> Result<(), UnsupportedEditItem> {
) -> Result<(), SendEventError> {
// Early returns here must be in sync with
// `EventTimelineItem::can_be_edited`
let Some(event_id) = edit_item.event_id() else {
return Err(UnsupportedEditItem::MISSING_EVENT_ID);
return Err(UnsupportedEditItem::MISSING_EVENT_ID.into());
};
let TimelineItemContent::Message(original_content) = edit_item.content() else {
return Err(UnsupportedEditItem::NOT_ROOM_MESSAGE);
return Err(UnsupportedEditItem::NOT_ROOM_MESSAGE.into());
};
let replied_to_message =
@@ -419,7 +405,8 @@ impl Timeline {
replied_to_message.as_ref(),
);
self.send(content.into()).await;
self.send(content.into()).await?;
Ok(())
}
@@ -428,12 +415,12 @@ impl Timeline {
fallback_text: impl Into<String>,
poll: UnstablePollStartContentBlock,
edit_item: &EventTimelineItem,
) -> Result<(), UnsupportedEditItem> {
) -> Result<(), SendEventError> {
let Some(event_id) = edit_item.event_id() else {
return Err(UnsupportedEditItem::MISSING_EVENT_ID);
return Err(UnsupportedEditItem::MISSING_EVENT_ID.into());
};
let TimelineItemContent::Poll(_) = edit_item.content() else {
return Err(UnsupportedEditItem::NOT_POLL_EVENT);
return Err(UnsupportedEditItem::NOT_POLL_EVENT.into());
};
let replacement_poll = ReplacementUnstablePollStartEventContent::plain_text(
@@ -441,7 +428,7 @@ impl Timeline {
poll,
event_id.into(),
);
self.send(UnstablePollStartEventContent::from(replacement_poll).into()).await;
self.send(UnstablePollStartEventContent::from(replacement_poll).into()).await?;
Ok(())
}
@@ -556,79 +543,6 @@ impl Timeline {
SendAttachment::new(self, path.into(), mime_type, config)
}
/// Retry sending a message that previously failed to send.
///
/// # Arguments
///
/// * `txn_id` - The transaction ID of a local echo timeline item that has a
/// `send_state()` of `SendState::FailedToSend { .. }`
#[instrument(skip(self))]
pub async fn retry_send(&self, txn_id: &TransactionId) -> Result<(), Error> {
macro_rules! error_return {
($msg:literal) => {{
error!($msg);
return Ok(());
}};
}
let item = self.inner.prepare_retry(txn_id).await.ok_or(Error::RetryEventNotInTimeline)?;
let content = match item {
TimelineItemContent::Message(msg) => {
AnyMessageLikeEventContent::RoomMessage(msg.into())
}
TimelineItemContent::Sticker(sticker) => {
AnyMessageLikeEventContent::Sticker(sticker.content)
}
TimelineItemContent::Poll(poll_state) => AnyMessageLikeEventContent::UnstablePollStart(
UnstablePollStartEventContent::New(poll_state.into()),
),
TimelineItemContent::MembershipChange(_)
| TimelineItemContent::ProfileChange(_)
| TimelineItemContent::OtherState(_)
| TimelineItemContent::CallInvite => {
error_return!("Retrying state events/call invites is not supported");
}
TimelineItemContent::CallNotify => {
error_return!("Retrying call notifies is not supported");
}
TimelineItemContent::FailedToParseMessageLike { .. }
| TimelineItemContent::FailedToParseState { .. } => {
error_return!("Invalid state: attempting to retry a failed-to-parse item");
}
TimelineItemContent::RedactedMessage => {
error_return!("Invalid state: attempting to retry a redacted message");
}
TimelineItemContent::UnableToDecrypt(_) => {
error_return!("Invalid state: attempting to retry a UTD item");
}
};
debug!("Retrying failed local echo");
let txn_id = txn_id.to_owned();
if self.msg_sender.send(LocalMessage { content, txn_id }).await.is_err() {
error!("Internal error: timeline message receiver is closed");
}
Ok(())
}
/// Discard a local echo for a message that failed to send.
///
/// Returns whether the local echo with the given transaction ID was found.
///
/// # Argument
///
/// * `txn_id` - The transaction ID of a local echo timeline item that has a
/// `send_state()` of `SendState::FailedToSend { .. }`. *Note:* A send
/// state of `SendState::NotYetSent` might be supported in the future as
/// well, but there can be no guarantee for that actually stopping the
/// event from reaching the server.
#[instrument(skip(self))]
pub async fn cancel_send(&self, txn_id: &TransactionId) -> bool {
self.inner.discard_local_echo(txn_id).await
}
/// Fetch unavailable details about the event with the given ID.
///
/// This method only works for IDs of remote [`EventTimelineItem`]s,
@@ -819,6 +733,7 @@ struct TimelineDropHandle {
event_handler_handles: Vec<EventHandlerHandle>,
room_update_join_handle: JoinHandle<()>,
room_key_from_backups_join_handle: JoinHandle<()>,
local_echo_listener_handle: JoinHandle<()>,
_event_cache_drop_handle: Arc<EventCacheDropHandles>,
}
@@ -827,6 +742,7 @@ impl Drop for TimelineDropHandle {
for handle in self.event_handler_handles.drain(..) {
self.client.remove_event_handler(handle);
}
self.local_echo_listener_handle.abort();
self.room_update_join_handle.abort();
self.room_key_from_backups_join_handle.abort();
}

View File

@@ -1,275 +0,0 @@
// Copyright 2023 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{
collections::VecDeque,
future::{pending, Future, Pending},
mem,
pin::Pin,
sync::Arc,
task::{Context, Poll},
};
use futures_util::future::Either;
use matrix_sdk::{
executor::{spawn, JoinError, JoinHandle},
Room,
};
use matrix_sdk_base::RoomState;
use ruma::{events::AnyMessageLikeEventContent, OwnedEventId, OwnedTransactionId};
use tokio::{select, sync::mpsc::Receiver};
use tracing::{debug, error, info, instrument, trace, warn};
use super::{inner::TimelineInner, EventSendState};
/// A locally-created message that is supposed to be sent.
pub(super) struct LocalMessage {
/// The transaction ID.
///
/// Used for finding the corresponding local echo in the timeline.
pub txn_id: OwnedTransactionId,
/// The message contents.
pub content: AnyMessageLikeEventContent,
}
#[instrument(skip_all, fields(room_id = ?room.room_id()))]
pub(super) async fn send_queued_messages(
timeline: TimelineInner,
room: Room,
mut msg_receiver: Receiver<LocalMessage>,
) {
let mut queue = VecDeque::new();
let mut send_task: SendMessageTask = SendMessageTask::Idle;
let mut recv_fut: Either<_, Pending<Option<LocalMessage>>> =
Either::Left(Box::pin(msg_receiver.recv()));
loop {
select! {
send_result = &mut send_task => {
trace!("SendMessageTask finished");
send_task.reset();
handle_send_result(
send_result,
&mut send_task,
&mut queue,
&timeline,
).await;
}
recv_res = &mut recv_fut => {
recv_fut = if let Some(msg) = recv_res {
trace!("Got a LocalMessage");
send_or_queue_msg(
msg,
room.clone(),
&mut send_task,
&mut queue,
&timeline,
).await;
// appease the borrow checker
drop(recv_fut);
Either::Left(Box::pin(msg_receiver.recv()))
} else {
info!("Message receiver closed");
Either::Right(pending())
};
}
}
if send_task.is_idle() && matches!(recv_fut, Either::Right(_)) {
break;
}
}
info!("Stopped");
}
async fn send_or_queue_msg(
msg: LocalMessage,
room: Room,
send_task: &mut SendMessageTask,
queue: &mut VecDeque<LocalMessage>,
timeline: &TimelineInner,
) {
// Only send the message immediately if there aren't other messages to be sent
// first, and we're not currently sending a message.
if !queue.is_empty() || !send_task.is_idle() {
queue.push_back(msg);
return;
}
if room.state() == RoomState::Joined {
send_task.start(room, msg);
} else {
info!("Refusing to send message, room is not joined");
timeline
.update_event_send_state(
&msg.txn_id,
EventSendState::SendingFailed {
// FIXME: Probably not exactly right
error: Arc::new(matrix_sdk::Error::InconsistentState),
},
)
.await;
}
}
async fn handle_send_result(
send_result: SendMessageResult,
send_task: &mut SendMessageTask,
queue: &mut VecDeque<LocalMessage>,
timeline: &TimelineInner,
) {
match send_result {
SendMessageResult::Success { event_id, txn_id } => {
timeline.update_event_send_state(&txn_id, EventSendState::Sent { event_id }).await;
// Event was successfully sent, move on to the next queued event.
if let Some(msg) = queue.pop_front() {
send_task.start(timeline.room().clone(), msg);
}
}
SendMessageResult::SendingFailed { send_error, txn_id } => {
timeline
.update_event_send_state(
&txn_id,
EventSendState::SendingFailed { error: Arc::new(send_error) },
)
.await;
// Clear the queue and wait for the user to explicitly retry (which will
// re-append the to-be-sent events in the queue).
queue.clear();
}
SendMessageResult::TaskError { join_error, txn_id } => {
error!("Message-sending task failed: {join_error}");
timeline
.update_event_send_state(
&txn_id,
EventSendState::SendingFailed {
// FIXME: Probably not exactly right
error: Arc::new(matrix_sdk::Error::InconsistentState),
},
)
.await;
// See above comment in the `SendingFailed` arm.
queue.clear();
}
}
}
/// Result of [`SendMessageTask`].
enum SendMessageResult {
/// The message was sent successfully.
Success {
/// The event id returned by the server.
event_id: OwnedEventId,
/// The transaction ID of the message that was being sent by the task.
txn_id: OwnedTransactionId,
},
/// Sending failed.
SendingFailed {
/// The reason of the sending failure.
send_error: matrix_sdk::Error,
/// The transaction ID of the message that was being sent by the task.
txn_id: OwnedTransactionId,
},
/// The [`SendMessageTask`] failed, likely due to a panic.
TaskError {
/// The error with which the task failed.
join_error: JoinError,
/// The transaction ID of the message that was being sent by the task.
txn_id: OwnedTransactionId,
},
}
/// Future that tracks the process of an event-sending background task, if one
/// is currently active for a given message-sending queue.
enum SendMessageTask {
/// No background task is active right now.
Idle,
/// A background task has been spawned, we're waiting for its result.
Running {
/// The transaction ID of the message that is being sent.
txn_id: OwnedTransactionId,
/// Handle to the task itself.
task: JoinHandle<SendMessageResult>,
},
}
impl SendMessageTask {
#[must_use]
fn is_idle(&self) -> bool {
matches!(self, Self::Idle)
}
/// Spawns a task sending the message to the room, and updating the timeline
/// once the result has been processed.
fn start(&mut self, room: Room, msg: LocalMessage) {
debug!("Spawning message-sending task");
let txn_id = msg.txn_id.clone();
let task = spawn(async move {
match room.send(msg.content).with_transaction_id(&msg.txn_id).await {
Ok(response) => {
SendMessageResult::Success { event_id: response.event_id, txn_id: msg.txn_id }
}
Err(error) => {
SendMessageResult::SendingFailed { send_error: error, txn_id: msg.txn_id }
}
}
});
*self = Self::Running { txn_id, task };
}
fn reset(&mut self) {
*self = Self::Idle;
}
}
impl Future for SendMessageTask {
type Output = SendMessageResult;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match &mut *self {
SendMessageTask::Idle => Poll::Pending,
SendMessageTask::Running { txn_id, task } => Pin::new(task).poll(cx).map(|result| {
let txn_id = mem::replace(txn_id, OwnedTransactionId::from(""));
if txn_id.as_str().is_empty() {
warn!("SendMessageTask polled after returning Poll::Ready!");
}
result.unwrap_or_else(|error| SendMessageResult::TaskError {
join_error: error,
txn_id,
})
}),
}
}
}

View File

@@ -2537,7 +2537,10 @@ async fn test_room_latest_event() -> Result<(), Error> {
);
// Insert a local event in the `Timeline`.
timeline.send(RoomMessageEventContent::text_plain("Hello, World!").into()).await;
timeline.send(RoomMessageEventContent::text_plain("Hello, World!").into()).await.unwrap();
// Let the sending queue send the message, and the timeline process it.
yield_now().await;
// The latest event of the `Timeline` is a local event.
assert_matches!(

View File

@@ -31,6 +31,7 @@ use ruma::{
};
use serde_json::json;
use stream_assert::assert_next_matches;
use tokio::task::yield_now;
use wiremock::{
matchers::{header, method, path_regex},
Mock, ResponseTemplate,
@@ -87,7 +88,7 @@ async fn test_echo() {
assert_eq!(text.body, "Hello, World!");
// Wait for the sending to finish and assert everything was successful
send_hdl.await.unwrap();
send_hdl.await.unwrap().unwrap();
assert_let!(
Some(VectorDiff::Set { index: 1, value: sent_confirmation }) = timeline_stream.next().await
@@ -141,12 +142,17 @@ async fn test_retry_failed() {
mock_encryption_state(&server, false).await;
client.sending_queue().enable();
let room = client.get_room(room_id).unwrap();
let timeline = Arc::new(room.timeline().await.unwrap());
let (_, mut timeline_stream) =
timeline.subscribe_filter_map(|item| item.as_event().cloned()).await;
timeline.send(RoomMessageEventContent::text_plain("Hello, World!").into()).await;
timeline.send(RoomMessageEventContent::text_plain("Hello, World!").into()).await.unwrap();
// Let the sending queue handle the event.
yield_now().await;
// First, local echo is added
assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => {
@@ -156,7 +162,6 @@ async fn test_retry_failed() {
// Sending fails, the mock server has no matching route yet
assert_let!(Some(VectorDiff::Set { index: 0, value: item }) = timeline_stream.next().await);
assert_matches!(item.send_state(), Some(EventSendState::SendingFailed { .. }));
let txn_id = item.transaction_id().unwrap().to_owned();
Mock::given(method("PUT"))
.and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/.*"))
@@ -168,16 +173,14 @@ async fn test_retry_failed() {
.mount(&server)
.await;
timeline.retry_send(&txn_id).await.unwrap();
assert!(!client.sending_queue().is_enabled());
// After mocking the endpoint and retrying, it first transitions back out of
// the error state
assert_next_matches!(timeline_stream, VectorDiff::Remove { index: 0 });
assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => {
assert_matches!(value.send_state(), Some(EventSendState::NotSentYet));
});
client.sending_queue().enable();
// … before succeeding.
// Let the sending queue handle the event.
tokio::time::sleep(Duration::from_millis(300)).await;
// After mocking the endpoint and retrying, it succeeds.
assert_let!(Some(VectorDiff::Set { index: 0, value }) = timeline_stream.next().await);
assert_matches!(value.send_state(), Some(EventSendState::Sent { .. }));
}
@@ -216,7 +219,7 @@ async fn test_dedup_by_event_id_late() {
.mount(&server)
.await;
timeline.send(RoomMessageEventContent::text_plain("Hello, World!").into()).await;
timeline.send(RoomMessageEventContent::text_plain("Hello, World!").into()).await.unwrap();
assert_let!(Some(VectorDiff::PushBack { value: local_echo }) = timeline_stream.next().await);
let item = local_echo.as_event().unwrap();
@@ -275,12 +278,15 @@ async fn test_cancel_failed() {
let (_, mut timeline_stream) =
timeline.subscribe_filter_map(|item| item.as_event().cloned()).await;
timeline.send(RoomMessageEventContent::text_plain("Hello, World!").into()).await;
let handle =
timeline.send(RoomMessageEventContent::text_plain("Hello, World!").into()).await.unwrap();
// Let the sending queue handle the event.
yield_now().await;
// Local echo is added (immediately)
let txn_id = assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => {
assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => {
assert_matches!(value.send_state(), Some(EventSendState::NotSentYet));
value.transaction_id().unwrap().to_owned()
});
// Sending fails, the mock server has no matching route
@@ -288,7 +294,7 @@ async fn test_cancel_failed() {
assert_matches!(value.send_state(), Some(EventSendState::SendingFailed { .. }));
// Discard, assert the local echo is found
assert!(timeline.cancel_send(&txn_id).await);
assert!(handle.abort().await);
// Observable local echo being removed
assert_matches!(timeline_stream.next().await, Some(VectorDiff::Remove { index: 0 }));

View File

@@ -40,7 +40,7 @@ use ruma::{
};
use serde_json::json;
use stream_assert::assert_next_matches;
use tokio::time::sleep;
use tokio::{task::yield_now, time::sleep};
use wiremock::{
matchers::{method, path_regex},
Mock, ResponseTemplate,
@@ -206,6 +206,9 @@ async fn test_send_edit() {
.await
.unwrap();
// Let the sending queue handle the event.
yield_now().await;
let edit_item =
assert_next_matches!(timeline_stream, VectorDiff::Set { index: 0, value } => value);
@@ -293,6 +296,9 @@ async fn test_send_reply_edit() {
.await
.unwrap();
// Let the sending queue handle the event.
yield_now().await;
let edit_item =
assert_next_matches!(timeline_stream, VectorDiff::Set { index: 1, value } => value);
@@ -381,6 +387,9 @@ async fn test_send_edit_poll() {
UnstablePollStartContentBlock::new("Edited Test".to_owned(), edited_poll_answers);
timeline.edit_poll("poll_fallback_text", edited_poll, &poll_event).await.unwrap();
// Let the sending queue handle the event.
yield_now().await;
let edit_item =
assert_next_matches!(timeline_stream, VectorDiff::Set { index: 0, value } => value);

View File

@@ -26,7 +26,7 @@ use ruma::{
};
use serde_json::json;
use stream_assert::{assert_next_matches, assert_pending};
use tokio::time::sleep;
use tokio::{task::yield_now, time::sleep};
use wiremock::{
matchers::{body_string_contains, method, path_regex},
Mock, ResponseTemplate,
@@ -79,10 +79,13 @@ async fn test_message_order() {
.mount(&server)
.await;
timeline.send(RoomMessageEventContent::text_plain("First!").into()).await;
timeline.send(RoomMessageEventContent::text_plain("Second.").into()).await;
timeline.send(RoomMessageEventContent::text_plain("First!").into()).await.unwrap();
timeline.send(RoomMessageEventContent::text_plain("Second.").into()).await.unwrap();
// Local echoes are available as soon as `timeline.send` returns
// Let the sending queue handle the event.
yield_now().await;
// Local echoes are available after the sending queue has processed these.
assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => {
assert_eq!(value.content().as_message().unwrap().body(), "First!");
});
@@ -128,10 +131,13 @@ async fn test_retry_order() {
// Send two messages without mocking the server response.
// It will respond with a 404, resulting in a failed-to-send state.
timeline.send(RoomMessageEventContent::text_plain("First!").into()).await;
timeline.send(RoomMessageEventContent::text_plain("Second.").into()).await;
timeline.send(RoomMessageEventContent::text_plain("First!").into()).await.unwrap();
timeline.send(RoomMessageEventContent::text_plain("Second.").into()).await.unwrap();
// Local echoes are available as soon as `timeline.send` returns
// Let the sending queue handle the event.
yield_now().await;
// Local echoes are available after the sending queue has processed these.
assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => {
assert_eq!(value.content().as_message().unwrap().body(), "First!");
});
@@ -140,16 +146,14 @@ async fn test_retry_order() {
});
// Local echoes are updated with the failed send state as soon as
// the 404 response is received
// the 404 response is received.
assert_let!(Some(VectorDiff::Set { index: 0, value: first }) = timeline_stream.next().await);
assert_matches!(first.send_state().unwrap(), EventSendState::SendingFailed { .. });
let txn_id_1 = first.transaction_id().unwrap().to_owned();
// The second one is cancelled without an extra delay
// The second one is cancelled without an extra delay.
let second =
assert_next_matches!(timeline_stream, VectorDiff::Set { index: 1, value } => value);
assert_matches!(second.send_state().unwrap(), EventSendState::Cancelled);
let txn_id_2 = second.transaction_id().unwrap().to_owned();
// Response for first message takes 100ms to respond
Mock::given(method("PUT"))
@@ -177,38 +181,26 @@ async fn test_retry_order() {
.await;
// Retry the second message first
timeline.retry_send(&txn_id_2).await.unwrap();
timeline.retry_send(&txn_id_1).await.unwrap();
// Both items are immediately updated and moved to the bottom in the order
// of the function calls to indicate they are being sent
assert_next_matches!(timeline_stream, VectorDiff::Remove { index: 1 });
assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => {
assert_matches!(value.send_state().unwrap(), EventSendState::NotSentYet);
assert_eq!(value.content().as_message().unwrap().body(), "Second.");
});
assert_next_matches!(timeline_stream, VectorDiff::Remove { index: 0 });
assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => {
assert_matches!(value.send_state().unwrap(), EventSendState::NotSentYet);
assert_eq!(value.content().as_message().unwrap().body(), "First!");
});
client.sending_queue().enable();
// Wait 200ms for the first msg, 100ms for the second, 300ms for overhead
sleep(Duration::from_millis(600)).await;
// The second item (now at index 0) should be updated first, since it was
// retried first
// With the sending queue, sending is retried in the same order as the events
// were sent. So we first see the first message.
assert_next_matches!(timeline_stream, VectorDiff::Set { index: 0, value } => {
assert_eq!(value.content().as_message().unwrap().body(), "Second.");
assert_matches!(value.send_state().unwrap(), EventSendState::Sent { .. });
assert_eq!(value.event_id().unwrap(), "$5E2kLK/Sg342bgBU9ceEIEPYpbFaqJpZ");
});
// Then the first one
assert_next_matches!(timeline_stream, VectorDiff::Set { index: 1, value } => {
assert_eq!(value.content().as_message().unwrap().body(), "First!");
assert_matches!(value.send_state().unwrap(), EventSendState::Sent { .. });
assert_eq!(value.event_id().unwrap(), "$PyHxV5mYzjetBUT3qZq7V95GOzxb02EP");
});
// Then the second.
assert_next_matches!(timeline_stream, VectorDiff::Set { index: 1, value } => {
assert_eq!(value.content().as_message().unwrap().body(), "Second.");
assert_matches!(value.send_state().unwrap(), EventSendState::Sent { .. });
assert_eq!(value.event_id().unwrap(), "$5E2kLK/Sg342bgBU9ceEIEPYpbFaqJpZ");
});
assert_pending!(timeline_stream);
}
@@ -235,7 +227,7 @@ async fn test_clear_with_echoes() {
{
let (_, mut timeline_stream) = timeline.subscribe().await;
timeline.send(RoomMessageEventContent::text_plain("Send failure").into()).await;
timeline.send(RoomMessageEventContent::text_plain("Send failure").into()).await.unwrap();
// Wait for the first message to fail. Don't use time, but listen for the first
// timeline item diff to get back signalling the error.
@@ -256,7 +248,7 @@ async fn test_clear_with_echoes() {
.await;
// (this one)
timeline.send(RoomMessageEventContent::text_plain("Pending").into()).await;
timeline.send(RoomMessageEventContent::text_plain("Pending").into()).await.unwrap();
// Another message comes in.
sync_builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_timeline_event(
@@ -340,8 +332,11 @@ async fn test_no_duplicate_day_divider() {
.mount(&server)
.await;
timeline.send(RoomMessageEventContent::text_plain("First!").into()).await;
timeline.send(RoomMessageEventContent::text_plain("Second.").into()).await;
timeline.send(RoomMessageEventContent::text_plain("First!").into()).await.unwrap();
timeline.send(RoomMessageEventContent::text_plain("Second.").into()).await.unwrap();
// Let the sending queue handle the event.
yield_now().await;
// Local echoes are available as soon as `timeline.send` returns.
assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => {

View File

@@ -29,6 +29,7 @@ use ruma::{
};
use serde_json::json;
use stream_assert::assert_next_matches;
use tokio::task::yield_now;
use wiremock::{
matchers::{header, method, path_regex},
Mock, Request, ResponseTemplate,
@@ -322,6 +323,9 @@ async fn test_send_reply() {
.await
.unwrap();
// Let the sending queue handle the event.
yield_now().await;
let reply_item = assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => value);
assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet));
@@ -426,6 +430,9 @@ async fn test_send_reply_to_self() {
.await
.unwrap();
// Let the sending queue handle the event.
yield_now().await;
let reply_item = assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => value);
assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet));
@@ -513,6 +520,9 @@ async fn test_send_reply_to_threaded() {
.await
.unwrap();
// Let the sending queue handle the event.
yield_now().await;
let reply_item = assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => value);
assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet));

View File

@@ -455,7 +455,7 @@ impl App {
.map(|timeline| timeline.timeline.clone())
})
{
sdk_timeline
match sdk_timeline
.send(
RoomMessageEventContent::text_plain(format!(
"hey {}",
@@ -463,9 +463,17 @@ impl App {
))
.into(),
)
.await;
self.set_status_message("message sent!".to_owned());
.await
{
Ok(_) => {
self.set_status_message("message sent!".to_owned());
}
Err(err) => {
self.set_status_message(format!(
"error when sending event: {err}"
));
}
}
} else {
self.set_status_message("missing timeline for room".to_owned());
};

View File

@@ -102,7 +102,7 @@ async fn test_toggling_reaction() -> Result<()> {
// Send message.
debug!("Sending initial message…");
timeline.send(RoomMessageEventContent::text_plain("hi!").into()).await;
timeline.send(RoomMessageEventContent::text_plain("hi!").into()).await.unwrap();
let event_id = timeout(Duration::from_secs(10), event_id_task)
.await