diff --git a/bindings/matrix-sdk-ffi/src/timeline/mod.rs b/bindings/matrix-sdk-ffi/src/timeline/mod.rs index ec4041337..c1b045470 100644 --- a/bindings/matrix-sdk-ffi/src/timeline/mod.rs +++ b/bindings/matrix-sdk-ffi/src/timeline/mod.rs @@ -51,7 +51,7 @@ use tokio::{ sync::Mutex, task::{AbortHandle, JoinHandle}, }; -use tracing::{error, info, warn}; +use tracing::{error, warn}; use uuid::Uuid; use self::content::{Reaction, ReactionSenderData, TimelineItemContent}; @@ -221,7 +221,9 @@ impl Timeline { pub fn send(self: Arc, msg: Arc) { RUNTIME.spawn(async move { - self.inner.send((*msg).to_owned().with_relation(None).into()).await; + if let Err(err) = self.inner.send((*msg).to_owned().with_relation(None).into()).await { + error!("error when sending a message: {err}"); + } }); } @@ -390,7 +392,9 @@ impl Timeline { AnyMessageLikeEventContent::UnstablePollStart(poll_start_event_content.into()); RUNTIME.spawn(async move { - self.inner.send(event_content).await; + if let Err(err) = self.inner.send(event_content).await { + error!("unable to start poll: {err}"); + } }); Ok(()) @@ -409,7 +413,9 @@ impl Timeline { AnyMessageLikeEventContent::UnstablePollResponse(poll_response_event_content); RUNTIME.spawn(async move { - self.inner.send(event_content).await; + if let Err(err) = self.inner.send(event_content).await { + error!("unable to send poll response: {err}"); + } }); Ok(()) @@ -426,7 +432,9 @@ impl Timeline { let event_content = AnyMessageLikeEventContent::UnstablePollEnd(poll_end_event_content); RUNTIME.spawn(async move { - self.inner.send(event_content).await; + if let Err(err) = self.inner.send(event_content).await { + error!("unable to end poll: {err}"); + } }); Ok(()) @@ -511,22 +519,6 @@ impl Timeline { Ok(()) } - pub fn retry_send(self: Arc, txn_id: String) { - RUNTIME.spawn(async move { - if let Err(e) = self.inner.retry_send(txn_id.as_str().into()).await { - error!(txn_id, "Failed to retry sending: {e}"); - } - }); - } - - pub fn cancel_send(self: Arc, txn_id: String) { - RUNTIME.spawn(async move { - if !self.inner.cancel_send(txn_id.as_str().into()).await { - info!(txn_id, "Failed to discard local echo: Not found"); - } - }); - } - pub async fn get_event_timeline_item_by_event_id( &self, event_id: String, diff --git a/crates/matrix-sdk-ui/src/timeline/builder.rs b/crates/matrix-sdk-ui/src/timeline/builder.rs index 96d9fec8a..7680eb418 100644 --- a/crates/matrix-sdk-ui/src/timeline/builder.rs +++ b/crates/matrix-sdk-ui/src/timeline/builder.rs @@ -18,21 +18,24 @@ use futures_util::{pin_mut, StreamExt}; use matrix_sdk::{ event_cache::{EventsOrigin, RoomEventCacheUpdate}, executor::spawn, + send_queue::RoomSendingQueueUpdate, Room, }; use ruma::{events::AnySyncTimelineEvent, RoomVersionId}; -use tokio::sync::{broadcast, mpsc}; +use tokio::sync::broadcast::error::RecvError; use tracing::{info, info_span, trace, warn, Instrument, Span}; #[cfg(feature = "e2e-encryption")] use super::to_device::{handle_forwarded_room_key_event, handle_room_key_event}; use super::{ inner::{TimelineInner, TimelineInnerSettings}, - queue::send_queued_messages, Error, Timeline, TimelineDropHandle, TimelineFocus, }; use crate::{ - timeline::{event_item::RemoteEventOrigin, inner::TimelineEnd}, + timeline::{ + event_handler::TimelineEventKind, event_item::RemoteEventOrigin, inner::TimelineEnd, + EventSendState, + }, unable_to_decrypt_hook::UtdHookManager, }; @@ -182,8 +185,8 @@ impl TimelineBuilder { let update = match event_subscriber.recv().await { Ok(up) => up, - Err(broadcast::error::RecvError::Closed) => break, - Err(broadcast::error::RecvError::Lagged(num_skipped)) => { + Err(RecvError::Closed) => break, + Err(RecvError::Lagged(num_skipped)) => { warn!( num_skipped, "Lagged behind event cache updates, resetting timeline" @@ -261,6 +264,85 @@ impl TimelineBuilder { .instrument(span) }); + let local_echo_listener_handle = spawn({ + let timeline = inner.clone(); + let (local_echoes, mut listener) = room.sending_queue().subscribe().await; + + // Handles existing local echoes first. + for echo in local_echoes { + timeline + .handle_local_event( + echo.transaction_id, + TimelineEventKind::Message { + content: echo.content, + relations: Default::default(), + }, + ) + .await; + } + + let span = + info_span!(parent: Span::none(), "local_echo_handler", room_id = ?room.room_id()); + span.follows_from(Span::current()); + + // React to future local echoes too. + async move { + info!("spawned the local echo handler!"); + + loop { + match listener.recv().await { + Ok(update) => match update { + RoomSendingQueueUpdate::NewLocalEvent(echo) => { + timeline + .handle_local_event( + echo.transaction_id, + TimelineEventKind::Message { + content: echo.content, + relations: Default::default(), + }, + ) + .await; + } + + RoomSendingQueueUpdate::CancelledLocalEvent { transaction_id } => { + if !timeline.discard_local_echo(&transaction_id).await { + warn!("couldn't find the local echo to discard"); + } + } + + RoomSendingQueueUpdate::SendError { transaction_id, error } => { + timeline + .update_event_send_state( + &transaction_id, + EventSendState::SendingFailed { error }, + ) + .await; + } + + RoomSendingQueueUpdate::SentEvent { transaction_id, event_id } => { + timeline + .update_event_send_state( + &transaction_id, + EventSendState::Sent { event_id }, + ) + .await; + } + }, + + Err(RecvError::Lagged(num_missed)) => { + warn!("missed {num_missed} local echoes, ignoring those missed"); + } + + Err(RecvError::Closed) => { + info!("channel closed, exiting the local echo handler"); + break; + } + } + } + } + .instrument(span) + }); + // Not using room.add_event_handler here because RoomKey events are // to-device events that are not received in the context of a room. @@ -309,19 +391,15 @@ impl TimelineBuilder { }) }; - let (msg_sender, msg_receiver) = mpsc::channel(1); - info!("Starting message-sending loop"); - spawn(send_queued_messages(inner.clone(), room.clone(), msg_receiver)); - let timeline = Timeline { inner, - msg_sender, event_cache: room_event_cache, drop_handle: Arc::new(TimelineDropHandle { client, event_handler_handles: handles, room_update_join_handle, room_key_from_backups_join_handle, + local_echo_listener_handle, _event_cache_drop_handle: event_cache_drop, }), }; diff --git a/crates/matrix-sdk-ui/src/timeline/error.rs b/crates/matrix-sdk-ui/src/timeline/error.rs index 8ed3e0b4a..28a28461f 100644 --- a/crates/matrix-sdk-ui/src/timeline/error.rs +++ b/crates/matrix-sdk-ui/src/timeline/error.rs @@ -14,7 +14,10 @@ use std::fmt; -use matrix_sdk::event_cache::{paginator::PaginatorError, EventCacheError}; +use matrix_sdk::{ + event_cache::{paginator::PaginatorError, EventCacheError}, + send_queue::RoomSendingQueueError, +}; use thiserror::Error; /// Errors specific to the timeline. @@ -127,3 +130,15 @@ enum UnsupportedEditItemInner { #[error("tried to edit a non-poll event")] NotPollEvent, } + +#[derive(Debug, Error)] +pub enum SendEventError { + #[error(transparent)] + UnsupportedReplyItem(#[from] UnsupportedReplyItem), + + #[error(transparent)] + UnsupportedEditItem(#[from] UnsupportedEditItem), + + #[error(transparent)] + SendError(#[from] RoomSendingQueueError), +} diff --git a/crates/matrix-sdk-ui/src/timeline/inner/mod.rs b/crates/matrix-sdk-ui/src/timeline/inner/mod.rs index 10a7e4014..6b54f95ca 100644 --- a/crates/matrix-sdk-ui/src/timeline/inner/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/inner/mod.rs @@ -722,12 +722,6 @@ impl TimelineInner

{ // When there is an error, sending further messages is paused. This should be // reflected in the timeline, so we set all other pending events to // cancelled. - // - // TODO(bnjbvr): spooky action at a distance here^. The sending task is the one - // deciding to clear the sending queue, so we're updating based on that implicit - // knowledge here. Instead, the sending queue should notify the timeline that - // it's deciding to not send those messages, and then only the - // timeline should mark these items as cancelled. let items = &mut txn.items; let num_items = items.len(); for idx in 0..num_items { @@ -804,42 +798,6 @@ impl TimelineInner

{ Ok(follow_up_action) } - /// Attempts to find a local echo item for some event to be sent. - /// - /// If it's found, it's moved back to the end of the timeline, then its - /// content is returned by this function. - pub(super) async fn prepare_retry( - &self, - txn_id: &TransactionId, - ) -> Option { - let mut state = self.state.write().await; - - let (idx, item) = rfind_event_item(&state.items, |it| it.transaction_id() == Some(txn_id))?; - let local_item = item.as_local()?; - - match &local_item.send_state { - EventSendState::NotSentYet => { - warn!("Attempted to retry the sending of an item that is already pending"); - return None; - } - EventSendState::Sent { .. } => { - warn!("Attempted to retry the sending of an item that has already succeeded"); - return None; - } - EventSendState::SendingFailed { .. } | EventSendState::Cancelled => {} - } - - let new_item = item.with_inner_kind(local_item.with_send_state(EventSendState::NotSentYet)); - let content = item.content.clone(); - - let mut txn = state.items.transaction(); - txn.remove(idx); - txn.push_back(new_item); - txn.commit(); - - Some(content) - } - pub(super) async fn discard_local_echo(&self, txn_id: &TransactionId) -> bool { let mut state = self.state.write().await; diff --git a/crates/matrix-sdk-ui/src/timeline/mod.rs b/crates/matrix-sdk-ui/src/timeline/mod.rs index 9099c2a15..e7c85b4a3 100644 --- a/crates/matrix-sdk-ui/src/timeline/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/mod.rs @@ -27,6 +27,7 @@ use matrix_sdk::{ event_handler::EventHandlerHandle, executor::JoinHandle, room::{Receipts, Room}, + send_queue::{AbortSendHandle, RoomSendingQueueError}, Client, Result, }; use matrix_sdk_base::RoomState; @@ -55,10 +56,9 @@ use ruma::{ TransactionId, UserId, }; use thiserror::Error; -use tokio::sync::mpsc::Sender; -use tracing::{debug, error, instrument, trace, warn}; +use tracing::{error, instrument, trace, warn}; -use self::{event_handler::TimelineEventKind, futures::SendAttachment}; +use self::{error::SendEventError, futures::SendAttachment}; mod builder; mod day_dividers; @@ -71,7 +71,6 @@ mod inner; mod item; mod pagination; mod polls; -mod queue; mod reactions; mod read_receipts; mod sliding_sync_ext; @@ -104,7 +103,6 @@ pub use self::{ }; use self::{ inner::{ReactionAction, TimelineInner}, - queue::LocalMessage, reactions::ReactionToggleResult, util::rfind_event_by_id, }; @@ -123,10 +121,6 @@ pub struct Timeline { /// The event cache specialized for this room's view. event_cache: RoomEventCache, - /// A sender to the task which responsibility is to send messages to the - /// current room. - msg_sender: Sender, - /// References to long-running tasks held by the timeline. drop_handle: Arc, } @@ -277,20 +271,11 @@ impl Timeline { /// [`MessageLikeUnsigned`]: ruma::events::MessageLikeUnsigned /// [`SyncMessageLikeEvent`]: ruma::events::SyncMessageLikeEvent #[instrument(skip(self, content), fields(room_id = ?self.room().room_id()))] - pub async fn send(&self, content: AnyMessageLikeEventContent) { - let txn_id = TransactionId::new(); - self.inner - .handle_local_event( - txn_id.clone(), - TimelineEventKind::Message { - content: content.clone(), - relations: Default::default(), - }, - ) - .await; - if self.msg_sender.send(LocalMessage { content, txn_id }).await.is_err() { - error!("Internal error: timeline message receiver is closed"); - } + pub async fn send( + &self, + content: AnyMessageLikeEventContent, + ) -> Result { + self.room().sending_queue().send(content).await } /// Send a reply to the given event. @@ -318,11 +303,11 @@ impl Timeline { content: RoomMessageEventContentWithoutRelation, reply_item: &EventTimelineItem, forward_thread: ForwardThread, - ) -> Result<(), UnsupportedReplyItem> { + ) -> Result<(), SendEventError> { // Error returns here must be in sync with // `EventTimelineItem::can_be_replied_to` let Some(event_id) = reply_item.event_id() else { - return Err(UnsupportedReplyItem::MISSING_EVENT_ID); + return Err(UnsupportedReplyItem::MISSING_EVENT_ID.into()); }; // [The specification](https://spec.matrix.org/v1.10/client-server-api/#user-and-room-mentions) says: @@ -352,7 +337,7 @@ impl Timeline { } _ => { let Some(raw_event) = reply_item.latest_json() else { - return Err(UnsupportedReplyItem::MISSING_JSON); + return Err(UnsupportedReplyItem::MISSING_JSON.into()); }; content.make_reply_to_raw( @@ -365,7 +350,8 @@ impl Timeline { } }; - self.send(content.into()).await; + self.send(content.into()).await?; + Ok(()) } @@ -384,14 +370,14 @@ impl Timeline { &self, new_content: RoomMessageEventContentWithoutRelation, edit_item: &EventTimelineItem, - ) -> Result<(), UnsupportedEditItem> { + ) -> Result<(), SendEventError> { // Early returns here must be in sync with // `EventTimelineItem::can_be_edited` let Some(event_id) = edit_item.event_id() else { - return Err(UnsupportedEditItem::MISSING_EVENT_ID); + return Err(UnsupportedEditItem::MISSING_EVENT_ID.into()); }; let TimelineItemContent::Message(original_content) = edit_item.content() else { - return Err(UnsupportedEditItem::NOT_ROOM_MESSAGE); + return Err(UnsupportedEditItem::NOT_ROOM_MESSAGE.into()); }; let replied_to_message = @@ -419,7 +405,8 @@ impl Timeline { replied_to_message.as_ref(), ); - self.send(content.into()).await; + self.send(content.into()).await?; + Ok(()) } @@ -428,12 +415,12 @@ impl Timeline { fallback_text: impl Into, poll: UnstablePollStartContentBlock, edit_item: &EventTimelineItem, - ) -> Result<(), UnsupportedEditItem> { + ) -> Result<(), SendEventError> { let Some(event_id) = edit_item.event_id() else { - return Err(UnsupportedEditItem::MISSING_EVENT_ID); + return Err(UnsupportedEditItem::MISSING_EVENT_ID.into()); }; let TimelineItemContent::Poll(_) = edit_item.content() else { - return Err(UnsupportedEditItem::NOT_POLL_EVENT); + return Err(UnsupportedEditItem::NOT_POLL_EVENT.into()); }; let replacement_poll = ReplacementUnstablePollStartEventContent::plain_text( @@ -441,7 +428,7 @@ impl Timeline { poll, event_id.into(), ); - self.send(UnstablePollStartEventContent::from(replacement_poll).into()).await; + self.send(UnstablePollStartEventContent::from(replacement_poll).into()).await?; Ok(()) } @@ -556,79 +543,6 @@ impl Timeline { SendAttachment::new(self, path.into(), mime_type, config) } - /// Retry sending a message that previously failed to send. - /// - /// # Arguments - /// - /// * `txn_id` - The transaction ID of a local echo timeline item that has a - /// `send_state()` of `SendState::FailedToSend { .. }` - #[instrument(skip(self))] - pub async fn retry_send(&self, txn_id: &TransactionId) -> Result<(), Error> { - macro_rules! error_return { - ($msg:literal) => {{ - error!($msg); - return Ok(()); - }}; - } - - let item = self.inner.prepare_retry(txn_id).await.ok_or(Error::RetryEventNotInTimeline)?; - - let content = match item { - TimelineItemContent::Message(msg) => { - AnyMessageLikeEventContent::RoomMessage(msg.into()) - } - TimelineItemContent::Sticker(sticker) => { - AnyMessageLikeEventContent::Sticker(sticker.content) - } - TimelineItemContent::Poll(poll_state) => AnyMessageLikeEventContent::UnstablePollStart( - UnstablePollStartEventContent::New(poll_state.into()), - ), - TimelineItemContent::MembershipChange(_) - | TimelineItemContent::ProfileChange(_) - | TimelineItemContent::OtherState(_) - | TimelineItemContent::CallInvite => { - error_return!("Retrying state events/call invites is not supported"); - } - TimelineItemContent::CallNotify => { - error_return!("Retrying call notifies is not supported"); - } - TimelineItemContent::FailedToParseMessageLike { .. } - | TimelineItemContent::FailedToParseState { .. } => { - error_return!("Invalid state: attempting to retry a failed-to-parse item"); - } - TimelineItemContent::RedactedMessage => { - error_return!("Invalid state: attempting to retry a redacted message"); - } - TimelineItemContent::UnableToDecrypt(_) => { - error_return!("Invalid state: attempting to retry a UTD item"); - } - }; - - debug!("Retrying failed local echo"); - let txn_id = txn_id.to_owned(); - if self.msg_sender.send(LocalMessage { content, txn_id }).await.is_err() { - error!("Internal error: timeline message receiver is closed"); - } - - Ok(()) - } - - /// Discard a local echo for a message that failed to send. - /// - /// Returns whether the local echo with the given transaction ID was found. - /// - /// # Argument - /// - /// * `txn_id` - The transaction ID of a local echo timeline item that has a - /// `send_state()` of `SendState::FailedToSend { .. }`. *Note:* A send - /// state of `SendState::NotYetSent` might be supported in the future as - /// well, but there can be no guarantee for that actually stopping the - /// event from reaching the server. - #[instrument(skip(self))] - pub async fn cancel_send(&self, txn_id: &TransactionId) -> bool { - self.inner.discard_local_echo(txn_id).await - } - /// Fetch unavailable details about the event with the given ID. /// /// This method only works for IDs of remote [`EventTimelineItem`]s, @@ -819,6 +733,7 @@ struct TimelineDropHandle { event_handler_handles: Vec, room_update_join_handle: JoinHandle<()>, room_key_from_backups_join_handle: JoinHandle<()>, + local_echo_listener_handle: JoinHandle<()>, _event_cache_drop_handle: Arc, } @@ -827,6 +742,7 @@ impl Drop for TimelineDropHandle { for handle in self.event_handler_handles.drain(..) { self.client.remove_event_handler(handle); } + self.local_echo_listener_handle.abort(); self.room_update_join_handle.abort(); self.room_key_from_backups_join_handle.abort(); } diff --git a/crates/matrix-sdk-ui/src/timeline/queue.rs b/crates/matrix-sdk-ui/src/timeline/queue.rs deleted file mode 100644 index b99836aa2..000000000 --- a/crates/matrix-sdk-ui/src/timeline/queue.rs +++ /dev/null @@ -1,275 +0,0 @@ -// Copyright 2023 The Matrix.org Foundation C.I.C. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::{ - collections::VecDeque, - future::{pending, Future, Pending}, - mem, - pin::Pin, - sync::Arc, - task::{Context, Poll}, -}; - -use futures_util::future::Either; -use matrix_sdk::{ - executor::{spawn, JoinError, JoinHandle}, - Room, -}; -use matrix_sdk_base::RoomState; -use ruma::{events::AnyMessageLikeEventContent, OwnedEventId, OwnedTransactionId}; -use tokio::{select, sync::mpsc::Receiver}; -use tracing::{debug, error, info, instrument, trace, warn}; - -use super::{inner::TimelineInner, EventSendState}; - -/// A locally-created message that is supposed to be sent. -pub(super) struct LocalMessage { - /// The transaction ID. - /// - /// Used for finding the corresponding local echo in the timeline. - pub txn_id: OwnedTransactionId, - - /// The message contents. - pub content: AnyMessageLikeEventContent, -} - -#[instrument(skip_all, fields(room_id = ?room.room_id()))] -pub(super) async fn send_queued_messages( - timeline: TimelineInner, - room: Room, - mut msg_receiver: Receiver, -) { - let mut queue = VecDeque::new(); - let mut send_task: SendMessageTask = SendMessageTask::Idle; - let mut recv_fut: Either<_, Pending>> = - Either::Left(Box::pin(msg_receiver.recv())); - - loop { - select! { - send_result = &mut send_task => { - trace!("SendMessageTask finished"); - - send_task.reset(); - - handle_send_result( - send_result, - &mut send_task, - &mut queue, - &timeline, - ).await; - } - - recv_res = &mut recv_fut => { - recv_fut = if let Some(msg) = recv_res { - trace!("Got a LocalMessage"); - - send_or_queue_msg( - msg, - room.clone(), - &mut send_task, - &mut queue, - &timeline, - ).await; - - // appease the borrow checker - drop(recv_fut); - - Either::Left(Box::pin(msg_receiver.recv())) - } else { - info!("Message receiver closed"); - Either::Right(pending()) - }; - } - } - - if send_task.is_idle() && matches!(recv_fut, Either::Right(_)) { - break; - } - } - - info!("Stopped"); -} - -async fn send_or_queue_msg( - msg: LocalMessage, - room: Room, - send_task: &mut SendMessageTask, - queue: &mut VecDeque, - timeline: &TimelineInner, -) { - // Only send the message immediately if there aren't other messages to be sent - // first, and we're not currently sending a message. - if !queue.is_empty() || !send_task.is_idle() { - queue.push_back(msg); - return; - } - - if room.state() == RoomState::Joined { - send_task.start(room, msg); - } else { - info!("Refusing to send message, room is not joined"); - timeline - .update_event_send_state( - &msg.txn_id, - EventSendState::SendingFailed { - // FIXME: Probably not exactly right - error: Arc::new(matrix_sdk::Error::InconsistentState), - }, - ) - .await; - } -} - -async fn handle_send_result( - send_result: SendMessageResult, - send_task: &mut SendMessageTask, - queue: &mut VecDeque, - timeline: &TimelineInner, -) { - match send_result { - SendMessageResult::Success { event_id, txn_id } => { - timeline.update_event_send_state(&txn_id, EventSendState::Sent { event_id }).await; - - // Event was successfully sent, move on to the next queued event. - if let Some(msg) = queue.pop_front() { - send_task.start(timeline.room().clone(), msg); - } - } - - SendMessageResult::SendingFailed { send_error, txn_id } => { - timeline - .update_event_send_state( - &txn_id, - EventSendState::SendingFailed { error: Arc::new(send_error) }, - ) - .await; - - // Clear the queue and wait for the user to explicitly retry (which will - // re-append the to-be-sent events in the queue). - queue.clear(); - } - - SendMessageResult::TaskError { join_error, txn_id } => { - error!("Message-sending task failed: {join_error}"); - - timeline - .update_event_send_state( - &txn_id, - EventSendState::SendingFailed { - // FIXME: Probably not exactly right - error: Arc::new(matrix_sdk::Error::InconsistentState), - }, - ) - .await; - - // See above comment in the `SendingFailed` arm. - queue.clear(); - } - } -} - -/// Result of [`SendMessageTask`]. -enum SendMessageResult { - /// The message was sent successfully. - Success { - /// The event id returned by the server. - event_id: OwnedEventId, - /// The transaction ID of the message that was being sent by the task. - txn_id: OwnedTransactionId, - }, - - /// Sending failed. - SendingFailed { - /// The reason of the sending failure. - send_error: matrix_sdk::Error, - /// The transaction ID of the message that was being sent by the task. - txn_id: OwnedTransactionId, - }, - - /// The [`SendMessageTask`] failed, likely due to a panic. - TaskError { - /// The error with which the task failed. - join_error: JoinError, - /// The transaction ID of the message that was being sent by the task. - txn_id: OwnedTransactionId, - }, -} - -/// Future that tracks the process of an event-sending background task, if one -/// is currently active for a given message-sending queue. -enum SendMessageTask { - /// No background task is active right now. - Idle, - - /// A background task has been spawned, we're waiting for its result. - Running { - /// The transaction ID of the message that is being sent. - txn_id: OwnedTransactionId, - /// Handle to the task itself. - task: JoinHandle, - }, -} - -impl SendMessageTask { - #[must_use] - fn is_idle(&self) -> bool { - matches!(self, Self::Idle) - } - - /// Spawns a task sending the message to the room, and updating the timeline - /// once the result has been processed. - fn start(&mut self, room: Room, msg: LocalMessage) { - debug!("Spawning message-sending task"); - - let txn_id = msg.txn_id.clone(); - - let task = spawn(async move { - match room.send(msg.content).with_transaction_id(&msg.txn_id).await { - Ok(response) => { - SendMessageResult::Success { event_id: response.event_id, txn_id: msg.txn_id } - } - Err(error) => { - SendMessageResult::SendingFailed { send_error: error, txn_id: msg.txn_id } - } - } - }); - - *self = Self::Running { txn_id, task }; - } - - fn reset(&mut self) { - *self = Self::Idle; - } -} - -impl Future for SendMessageTask { - type Output = SendMessageResult; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match &mut *self { - SendMessageTask::Idle => Poll::Pending, - - SendMessageTask::Running { txn_id, task } => Pin::new(task).poll(cx).map(|result| { - let txn_id = mem::replace(txn_id, OwnedTransactionId::from("")); - if txn_id.as_str().is_empty() { - warn!("SendMessageTask polled after returning Poll::Ready!"); - } - result.unwrap_or_else(|error| SendMessageResult::TaskError { - join_error: error, - txn_id, - }) - }), - } - } -} diff --git a/crates/matrix-sdk-ui/tests/integration/room_list_service.rs b/crates/matrix-sdk-ui/tests/integration/room_list_service.rs index 1d10ddb6c..0821b0f5b 100644 --- a/crates/matrix-sdk-ui/tests/integration/room_list_service.rs +++ b/crates/matrix-sdk-ui/tests/integration/room_list_service.rs @@ -2537,7 +2537,10 @@ async fn test_room_latest_event() -> Result<(), Error> { ); // Insert a local event in the `Timeline`. - timeline.send(RoomMessageEventContent::text_plain("Hello, World!").into()).await; + timeline.send(RoomMessageEventContent::text_plain("Hello, World!").into()).await.unwrap(); + + // Let the sending queue send the message, and the timeline process it. + yield_now().await; // The latest event of the `Timeline` is a local event. assert_matches!( diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs b/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs index a62e852f2..e12d4eb7a 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/echo.rs @@ -31,6 +31,7 @@ use ruma::{ }; use serde_json::json; use stream_assert::assert_next_matches; +use tokio::task::yield_now; use wiremock::{ matchers::{header, method, path_regex}, Mock, ResponseTemplate, @@ -87,7 +88,7 @@ async fn test_echo() { assert_eq!(text.body, "Hello, World!"); // Wait for the sending to finish and assert everything was successful - send_hdl.await.unwrap(); + send_hdl.await.unwrap().unwrap(); assert_let!( Some(VectorDiff::Set { index: 1, value: sent_confirmation }) = timeline_stream.next().await @@ -141,12 +142,17 @@ async fn test_retry_failed() { mock_encryption_state(&server, false).await; + client.sending_queue().enable(); + let room = client.get_room(room_id).unwrap(); let timeline = Arc::new(room.timeline().await.unwrap()); let (_, mut timeline_stream) = timeline.subscribe_filter_map(|item| item.as_event().cloned()).await; - timeline.send(RoomMessageEventContent::text_plain("Hello, World!").into()).await; + timeline.send(RoomMessageEventContent::text_plain("Hello, World!").into()).await.unwrap(); + + // Let the sending queue handle the event. + yield_now().await; // First, local echo is added assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => { @@ -156,7 +162,6 @@ async fn test_retry_failed() { // Sending fails, the mock server has no matching route yet assert_let!(Some(VectorDiff::Set { index: 0, value: item }) = timeline_stream.next().await); assert_matches!(item.send_state(), Some(EventSendState::SendingFailed { .. })); - let txn_id = item.transaction_id().unwrap().to_owned(); Mock::given(method("PUT")) .and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/.*")) @@ -168,16 +173,14 @@ async fn test_retry_failed() { .mount(&server) .await; - timeline.retry_send(&txn_id).await.unwrap(); + assert!(!client.sending_queue().is_enabled()); - // After mocking the endpoint and retrying, it first transitions back out of - // the error state - assert_next_matches!(timeline_stream, VectorDiff::Remove { index: 0 }); - assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => { - assert_matches!(value.send_state(), Some(EventSendState::NotSentYet)); - }); + client.sending_queue().enable(); - // … before succeeding. + // Let the sending queue handle the event. + tokio::time::sleep(Duration::from_millis(300)).await; + + // After mocking the endpoint and retrying, it succeeds. assert_let!(Some(VectorDiff::Set { index: 0, value }) = timeline_stream.next().await); assert_matches!(value.send_state(), Some(EventSendState::Sent { .. })); } @@ -216,7 +219,7 @@ async fn test_dedup_by_event_id_late() { .mount(&server) .await; - timeline.send(RoomMessageEventContent::text_plain("Hello, World!").into()).await; + timeline.send(RoomMessageEventContent::text_plain("Hello, World!").into()).await.unwrap(); assert_let!(Some(VectorDiff::PushBack { value: local_echo }) = timeline_stream.next().await); let item = local_echo.as_event().unwrap(); @@ -275,12 +278,15 @@ async fn test_cancel_failed() { let (_, mut timeline_stream) = timeline.subscribe_filter_map(|item| item.as_event().cloned()).await; - timeline.send(RoomMessageEventContent::text_plain("Hello, World!").into()).await; + let handle = + timeline.send(RoomMessageEventContent::text_plain("Hello, World!").into()).await.unwrap(); + + // Let the sending queue handle the event. + yield_now().await; // Local echo is added (immediately) - let txn_id = assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => { + assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => { assert_matches!(value.send_state(), Some(EventSendState::NotSentYet)); - value.transaction_id().unwrap().to_owned() }); // Sending fails, the mock server has no matching route @@ -288,7 +294,7 @@ async fn test_cancel_failed() { assert_matches!(value.send_state(), Some(EventSendState::SendingFailed { .. })); // Discard, assert the local echo is found - assert!(timeline.cancel_send(&txn_id).await); + assert!(handle.abort().await); // Observable local echo being removed assert_matches!(timeline_stream.next().await, Some(VectorDiff::Remove { index: 0 })); diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/edit.rs b/crates/matrix-sdk-ui/tests/integration/timeline/edit.rs index 6580e000e..a3edc5fc5 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/edit.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/edit.rs @@ -40,7 +40,7 @@ use ruma::{ }; use serde_json::json; use stream_assert::assert_next_matches; -use tokio::time::sleep; +use tokio::{task::yield_now, time::sleep}; use wiremock::{ matchers::{method, path_regex}, Mock, ResponseTemplate, @@ -206,6 +206,9 @@ async fn test_send_edit() { .await .unwrap(); + // Let the sending queue handle the event. + yield_now().await; + let edit_item = assert_next_matches!(timeline_stream, VectorDiff::Set { index: 0, value } => value); @@ -293,6 +296,9 @@ async fn test_send_reply_edit() { .await .unwrap(); + // Let the sending queue handle the event. + yield_now().await; + let edit_item = assert_next_matches!(timeline_stream, VectorDiff::Set { index: 1, value } => value); @@ -381,6 +387,9 @@ async fn test_send_edit_poll() { UnstablePollStartContentBlock::new("Edited Test".to_owned(), edited_poll_answers); timeline.edit_poll("poll_fallback_text", edited_poll, &poll_event).await.unwrap(); + // Let the sending queue handle the event. + yield_now().await; + let edit_item = assert_next_matches!(timeline_stream, VectorDiff::Set { index: 0, value } => value); diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs b/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs index 66048d581..09198bfb9 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/queue.rs @@ -26,7 +26,7 @@ use ruma::{ }; use serde_json::json; use stream_assert::{assert_next_matches, assert_pending}; -use tokio::time::sleep; +use tokio::{task::yield_now, time::sleep}; use wiremock::{ matchers::{body_string_contains, method, path_regex}, Mock, ResponseTemplate, @@ -79,10 +79,13 @@ async fn test_message_order() { .mount(&server) .await; - timeline.send(RoomMessageEventContent::text_plain("First!").into()).await; - timeline.send(RoomMessageEventContent::text_plain("Second.").into()).await; + timeline.send(RoomMessageEventContent::text_plain("First!").into()).await.unwrap(); + timeline.send(RoomMessageEventContent::text_plain("Second.").into()).await.unwrap(); - // Local echoes are available as soon as `timeline.send` returns + // Let the sending queue handle the event. + yield_now().await; + + // Local echoes are available after the sending queue has processed these. assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => { assert_eq!(value.content().as_message().unwrap().body(), "First!"); }); @@ -128,10 +131,13 @@ async fn test_retry_order() { // Send two messages without mocking the server response. // It will respond with a 404, resulting in a failed-to-send state. - timeline.send(RoomMessageEventContent::text_plain("First!").into()).await; - timeline.send(RoomMessageEventContent::text_plain("Second.").into()).await; + timeline.send(RoomMessageEventContent::text_plain("First!").into()).await.unwrap(); + timeline.send(RoomMessageEventContent::text_plain("Second.").into()).await.unwrap(); - // Local echoes are available as soon as `timeline.send` returns + // Let the sending queue handle the event. + yield_now().await; + + // Local echoes are available after the sending queue has processed these. assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => { assert_eq!(value.content().as_message().unwrap().body(), "First!"); }); @@ -140,16 +146,14 @@ async fn test_retry_order() { }); // Local echoes are updated with the failed send state as soon as - // the 404 response is received + // the 404 response is received. assert_let!(Some(VectorDiff::Set { index: 0, value: first }) = timeline_stream.next().await); assert_matches!(first.send_state().unwrap(), EventSendState::SendingFailed { .. }); - let txn_id_1 = first.transaction_id().unwrap().to_owned(); - // The second one is cancelled without an extra delay + // The second one is cancelled without an extra delay. let second = assert_next_matches!(timeline_stream, VectorDiff::Set { index: 1, value } => value); assert_matches!(second.send_state().unwrap(), EventSendState::Cancelled); - let txn_id_2 = second.transaction_id().unwrap().to_owned(); // Response for first message takes 100ms to respond Mock::given(method("PUT")) @@ -177,38 +181,26 @@ async fn test_retry_order() { .await; // Retry the second message first - timeline.retry_send(&txn_id_2).await.unwrap(); - timeline.retry_send(&txn_id_1).await.unwrap(); - - // Both items are immediately updated and moved to the bottom in the order - // of the function calls to indicate they are being sent - assert_next_matches!(timeline_stream, VectorDiff::Remove { index: 1 }); - assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => { - assert_matches!(value.send_state().unwrap(), EventSendState::NotSentYet); - assert_eq!(value.content().as_message().unwrap().body(), "Second."); - }); - assert_next_matches!(timeline_stream, VectorDiff::Remove { index: 0 }); - assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => { - assert_matches!(value.send_state().unwrap(), EventSendState::NotSentYet); - assert_eq!(value.content().as_message().unwrap().body(), "First!"); - }); + client.sending_queue().enable(); // Wait 200ms for the first msg, 100ms for the second, 300ms for overhead sleep(Duration::from_millis(600)).await; - // The second item (now at index 0) should be updated first, since it was - // retried first + // With the sending queue, sending is retried in the same order as the events + // were sent. So we first see the first message. assert_next_matches!(timeline_stream, VectorDiff::Set { index: 0, value } => { - assert_eq!(value.content().as_message().unwrap().body(), "Second."); - assert_matches!(value.send_state().unwrap(), EventSendState::Sent { .. }); - assert_eq!(value.event_id().unwrap(), "$5E2kLK/Sg342bgBU9ceEIEPYpbFaqJpZ"); - }); - // Then the first one - assert_next_matches!(timeline_stream, VectorDiff::Set { index: 1, value } => { assert_eq!(value.content().as_message().unwrap().body(), "First!"); assert_matches!(value.send_state().unwrap(), EventSendState::Sent { .. }); assert_eq!(value.event_id().unwrap(), "$PyHxV5mYzjetBUT3qZq7V95GOzxb02EP"); }); + + // Then the second. + assert_next_matches!(timeline_stream, VectorDiff::Set { index: 1, value } => { + assert_eq!(value.content().as_message().unwrap().body(), "Second."); + assert_matches!(value.send_state().unwrap(), EventSendState::Sent { .. }); + assert_eq!(value.event_id().unwrap(), "$5E2kLK/Sg342bgBU9ceEIEPYpbFaqJpZ"); + }); + assert_pending!(timeline_stream); } @@ -235,7 +227,7 @@ async fn test_clear_with_echoes() { { let (_, mut timeline_stream) = timeline.subscribe().await; - timeline.send(RoomMessageEventContent::text_plain("Send failure").into()).await; + timeline.send(RoomMessageEventContent::text_plain("Send failure").into()).await.unwrap(); // Wait for the first message to fail. Don't use time, but listen for the first // timeline item diff to get back signalling the error. @@ -256,7 +248,7 @@ async fn test_clear_with_echoes() { .await; // (this one) - timeline.send(RoomMessageEventContent::text_plain("Pending").into()).await; + timeline.send(RoomMessageEventContent::text_plain("Pending").into()).await.unwrap(); // Another message comes in. sync_builder.add_joined_room(JoinedRoomBuilder::new(room_id).add_timeline_event( @@ -340,8 +332,11 @@ async fn test_no_duplicate_day_divider() { .mount(&server) .await; - timeline.send(RoomMessageEventContent::text_plain("First!").into()).await; - timeline.send(RoomMessageEventContent::text_plain("Second.").into()).await; + timeline.send(RoomMessageEventContent::text_plain("First!").into()).await.unwrap(); + timeline.send(RoomMessageEventContent::text_plain("Second.").into()).await.unwrap(); + + // Let the sending queue handle the event. + yield_now().await; // Local echoes are available as soon as `timeline.send` returns. assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => { diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/replies.rs b/crates/matrix-sdk-ui/tests/integration/timeline/replies.rs index d710903e7..2a3ac068b 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/replies.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/replies.rs @@ -29,6 +29,7 @@ use ruma::{ }; use serde_json::json; use stream_assert::assert_next_matches; +use tokio::task::yield_now; use wiremock::{ matchers::{header, method, path_regex}, Mock, Request, ResponseTemplate, @@ -322,6 +323,9 @@ async fn test_send_reply() { .await .unwrap(); + // Let the sending queue handle the event. + yield_now().await; + let reply_item = assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => value); assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet)); @@ -426,6 +430,9 @@ async fn test_send_reply_to_self() { .await .unwrap(); + // Let the sending queue handle the event. + yield_now().await; + let reply_item = assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => value); assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet)); @@ -513,6 +520,9 @@ async fn test_send_reply_to_threaded() { .await .unwrap(); + // Let the sending queue handle the event. + yield_now().await; + let reply_item = assert_next_matches!(timeline_stream, VectorDiff::PushBack { value } => value); assert_matches!(reply_item.send_state(), Some(EventSendState::NotSentYet)); diff --git a/labs/multiverse/src/main.rs b/labs/multiverse/src/main.rs index a1f9efc8d..dfab54f28 100644 --- a/labs/multiverse/src/main.rs +++ b/labs/multiverse/src/main.rs @@ -455,7 +455,7 @@ impl App { .map(|timeline| timeline.timeline.clone()) }) { - sdk_timeline + match sdk_timeline .send( RoomMessageEventContent::text_plain(format!( "hey {}", @@ -463,9 +463,17 @@ impl App { )) .into(), ) - .await; - - self.set_status_message("message sent!".to_owned()); + .await + { + Ok(_) => { + self.set_status_message("message sent!".to_owned()); + } + Err(err) => { + self.set_status_message(format!( + "error when sending event: {err}" + )); + } + } } else { self.set_status_message("missing timeline for room".to_owned()); }; diff --git a/testing/matrix-sdk-integration-testing/src/tests/reactions.rs b/testing/matrix-sdk-integration-testing/src/tests/reactions.rs index fc62a1305..b5cbd1794 100644 --- a/testing/matrix-sdk-integration-testing/src/tests/reactions.rs +++ b/testing/matrix-sdk-integration-testing/src/tests/reactions.rs @@ -102,7 +102,7 @@ async fn test_toggling_reaction() -> Result<()> { // Send message. debug!("Sending initial message…"); - timeline.send(RoomMessageEventContent::text_plain("hi!").into()).await; + timeline.send(RoomMessageEventContent::text_plain("hi!").into()).await.unwrap(); let event_id = timeout(Duration::from_secs(10), event_id_task) .await