diff --git a/crates/matrix-sdk/src/send_queue/mod.rs b/crates/matrix-sdk/src/send_queue/mod.rs index 5f4ac0e8f..c100fa1f8 100644 --- a/crates/matrix-sdk/src/send_queue/mod.rs +++ b/crates/matrix-sdk/src/send_queue/mod.rs @@ -164,7 +164,8 @@ use ruma::{ AnyMessageLikeEventContent, EventContent as _, Mentions, }, serde::Raw, - MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedTransactionId, TransactionId, + MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedRoomId, OwnedTransactionId, RoomId, + TransactionId, }; use tokio::sync::{broadcast, oneshot, Mutex, Notify, OwnedMutexGuard}; use tracing::{debug, error, info, instrument, trace, warn}; @@ -242,7 +243,8 @@ impl SendQueue { let owned_room_id = room_id.to_owned(); let room_q = RoomSendQueue::new( self.is_enabled(), - data.error_reporter.clone(), + data.global_update_sender.clone(), + data.error_sender.clone(), data.is_dropping.clone(), &self.client, owned_room_id.clone(), @@ -283,10 +285,18 @@ impl SendQueue { self.data().globally_enabled.load(Ordering::SeqCst) } + /// Subscribe to all updates for all rooms. + /// + /// Use [`RoomSendQueue::subscribe`] to subscribe to update for a _specific + /// room_. + pub fn subscribe(&self) -> broadcast::Receiver { + self.data().global_update_sender.subscribe() + } + /// A subscriber to the enablement status (enabled or disabled) of the /// send queue, along with useful errors. pub fn subscribe_errors(&self) -> broadcast::Receiver { - self.data().error_reporter.subscribe() + self.data().error_sender.subscribe() } } @@ -325,8 +335,13 @@ pub(super) struct SendQueueData { /// initial enablement state. globally_enabled: AtomicBool, + /// Global sender to send [`SendQueueUpdate`]. + /// + /// See [`SendQueue::subscribe`]. + global_update_sender: broadcast::Sender, + /// Global error updates for the send queue. - error_reporter: broadcast::Sender, + error_sender: broadcast::Sender, /// Are we currently dropping the Client? is_dropping: Arc, @@ -335,12 +350,14 @@ pub(super) struct SendQueueData { impl SendQueueData { /// Create the data for a send queue, in the given enabled state. pub fn new(globally_enabled: bool) -> Self { - let (sender, _) = broadcast::channel(32); + let (global_update_sender, _) = broadcast::channel(32); + let (error_sender, _) = broadcast::channel(32); Self { rooms: Default::default(), globally_enabled: AtomicBool::new(globally_enabled), - error_reporter: sender, + global_update_sender, + error_sender, is_dropping: Arc::new(false.into()), } } @@ -385,12 +402,13 @@ impl std::fmt::Debug for RoomSendQueue { impl RoomSendQueue { fn new( globally_enabled: bool, - global_error_reporter: broadcast::Sender, + global_update_sender: broadcast::Sender, + global_error_sender: broadcast::Sender, is_dropping: Arc, client: &Client, room_id: OwnedRoomId, ) -> Self { - let (updates_sender, _) = broadcast::channel(32); + let (update_sender, _) = broadcast::channel(32); let queue = QueueStorage::new(WeakClient::from_client(client), room_id.clone()); let notifier = Arc::new(Notify::new()); @@ -402,16 +420,18 @@ impl RoomSendQueue { weak_room.clone(), queue.clone(), notifier.clone(), - updates_sender.clone(), + global_update_sender.clone(), + update_sender.clone(), locally_enabled.clone(), - global_error_reporter, + global_error_sender, is_dropping, )); Self { inner: Arc::new(RoomSendQueueInner { room: weak_room, - updates: updates_sender, + global_update_sender, + update_sender, _task: task, queue, notifier, @@ -461,7 +481,7 @@ impl RoomSendQueue { created_at, }; - let _ = self.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho { + self.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho { transaction_id, content: LocalEchoContent::Event { serialized_event: content, @@ -500,13 +520,16 @@ impl RoomSendQueue { /// Returns the current local requests as well as a receiver to listen to /// the send queue updates, as defined in [`RoomSendQueueUpdate`]. + /// + /// Use [`SendQueue::subscribe`] to subscribe to update for _all rooms_ with + /// a single receiver. pub async fn subscribe( &self, ) -> Result<(Vec, broadcast::Receiver), RoomSendQueueError> { let local_echoes = self.inner.queue.local_echoes(self).await?; - Ok((local_echoes, self.inner.updates.subscribe())) + Ok((local_echoes, self.inner.update_sender.subscribe())) } /// A task that must be spawned in the async runtime, running in the @@ -514,18 +537,33 @@ impl RoomSendQueue { /// /// It only progresses forward: nothing can be cancelled at any point, which /// makes the implementation not overly complicated to follow. + #[allow(clippy::too_many_arguments)] #[instrument(skip_all, fields(room_id = %room.room_id()))] async fn sending_task( room: WeakRoom, queue: QueueStorage, notifier: Arc, - updates: broadcast::Sender, + global_update_sender: broadcast::Sender, + update_sender: broadcast::Sender, locally_enabled: Arc, - global_error_reporter: broadcast::Sender, + global_error_sender: broadcast::Sender, is_dropping: Arc, ) { trace!("spawned the sending task"); + fn send_update( + global_update_sender: &broadcast::Sender, + update_sender: &broadcast::Sender, + room_id: &RoomId, + update: RoomSendQueueUpdate, + ) { + let _ = update_sender.send(update.clone()); + let _ = + global_update_sender.send(SendQueueUpdate { room_id: room_id.to_owned(), update }); + } + + let room_id = room.room_id(); + loop { // A request to shut down should be preferred above everything else. if is_dropping.load(Ordering::SeqCst) { @@ -541,7 +579,7 @@ impl RoomSendQueue { } for up in new_updates { - let _ = updates.send(up); + send_update(&global_update_sender, &update_sender, room_id, up); } if !locally_enabled.load(Ordering::SeqCst) { @@ -585,17 +623,24 @@ impl RoomSendQueue { { Ok(()) => match parent_key { SentRequestKey::Event(event_id) => { - let _ = updates.send(RoomSendQueueUpdate::SentEvent { - transaction_id: txn_id, - event_id, - }); + send_update( + &global_update_sender, + &update_sender, + room_id, + RoomSendQueueUpdate::SentEvent { transaction_id: txn_id, event_id }, + ); } SentRequestKey::Media(media_info) => { - let _ = updates.send(RoomSendQueueUpdate::UploadedMedia { - related_to: related_txn_id.as_ref().unwrap_or(&txn_id).clone(), - file: media_info.file, - }); + send_update( + &global_update_sender, + &update_sender, + room_id, + RoomSendQueueUpdate::UploadedMedia { + related_to: related_txn_id.as_ref().unwrap_or(&txn_id).clone(), + file: media_info.file, + }, + ); } }, @@ -656,17 +701,22 @@ impl RoomSendQueue { let error = Arc::new(err); - let _ = global_error_reporter.send(SendQueueRoomError { - room_id: room.room_id().to_owned(), + let _ = global_error_sender.send(SendQueueRoomError { + room_id: room_id.to_owned(), error: error.clone(), is_recoverable, }); - let _ = updates.send(RoomSendQueueUpdate::SendError { - transaction_id: related_txn_id.unwrap_or(txn_id), - error, - is_recoverable, - }); + send_update( + &global_update_sender, + &update_sender, + room_id, + RoomSendQueueUpdate::SendError { + transaction_id: related_txn_id.unwrap_or(txn_id), + error, + is_recoverable, + }, + ); } } } @@ -804,6 +854,17 @@ impl RoomSendQueue { self.inner.notifier.notify_one(); } } + + /// Send an update on the room send queue channel, and on the global send + /// queue channel, i.e. it sends a [`RoomSendQueueUpdate`] and a + /// [`SendQueueUpdate`]. + fn send_update(&self, update: RoomSendQueueUpdate) { + let _ = self.inner.update_sender.send(update.clone()); + let _ = self + .inner + .global_update_sender + .send(SendQueueUpdate { room_id: self.inner.room.room_id().to_owned(), update }); + } } impl From<&crate::Error> for QueueWedgeError { @@ -840,10 +901,17 @@ struct RoomSendQueueInner { /// The room which this send queue relates to. room: WeakRoom, + /// Global sender to send [`SendQueueUpdate`]. + /// + /// See [`SendQueue::subscribe`]. + global_update_sender: broadcast::Sender, + /// Broadcaster for notifications about the statuses of requests to be sent. /// /// Can be subscribed to from the outside. - updates: broadcast::Sender, + /// + /// See [`RoomSendQueue::subscribe`]. + update_sender: broadcast::Sender, /// Queue of requests that are either to be sent, or being sent. /// @@ -2078,6 +2146,19 @@ pub enum RoomSendQueueUpdate { }, } +/// A [`RoomSendQueueUpdate`] with an associated [`OwnedRoomId`]. +/// +/// This is used by [`SendQueue::subscribe`] to get a single channel to receive +/// updates for all [`RoomSendQueue`]s. +#[derive(Clone, Debug)] +pub struct SendQueueUpdate { + /// The room where the update happened. + pub room_id: OwnedRoomId, + + /// The update for this room. + pub update: RoomSendQueueUpdate, +} + /// An error triggered by the send queue module. #[derive(Debug, thiserror::Error)] pub enum RoomSendQueueError { @@ -2200,7 +2281,7 @@ impl SendHandle { for handles in &self.media_handles { if queue.abort_upload(&self.transaction_id, handles).await? { // Propagate a cancelled update. - let _ = self.room.inner.updates.send(RoomSendQueueUpdate::CancelledLocalEvent { + self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent { transaction_id: self.transaction_id.clone(), }); @@ -2216,7 +2297,7 @@ impl SendHandle { trace!("successful abort"); // Propagate a cancelled update too. - let _ = self.room.inner.updates.send(RoomSendQueueUpdate::CancelledLocalEvent { + self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent { transaction_id: self.transaction_id.clone(), }); @@ -2249,7 +2330,7 @@ impl SendHandle { self.room.inner.notifier.notify_one(); // Propagate a replaced update too. - let _ = self.room.inner.updates.send(RoomSendQueueUpdate::ReplacedLocalEvent { + self.room.send_update(RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id: self.transaction_id.clone(), new_content: serializable, }); @@ -2302,7 +2383,7 @@ impl SendHandle { .map_err(RoomSendQueueStorageError::JsonSerialization)?; // Propagate a replaced update too. - let _ = self.room.inner.updates.send(RoomSendQueueUpdate::ReplacedLocalEvent { + self.room.send_update(RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id: self.transaction_id.clone(), new_content, }); @@ -2344,9 +2425,9 @@ impl SendHandle { // Wake up the queue, in case the room was asleep before unwedging the request. room.notifier.notify_one(); - let _ = room - .updates - .send(RoomSendQueueUpdate::RetryEvent { transaction_id: self.transaction_id.clone() }); + self.room.send_update(RoomSendQueueUpdate::RetryEvent { + transaction_id: self.transaction_id.clone(), + }); Ok(()) } @@ -2377,9 +2458,9 @@ impl SendHandle { transaction_id: reaction_txn_id.clone(), }; - let _ = self.room.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho { - // Note: we do want to use the txn_id we're going to use for the reaction, not the - // one for the event we're reacting to. + self.room.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho { + // Note: we do want to use the `txn_id` we're going to use for the reaction, not + // the one for the event we're reacting to. transaction_id: reaction_txn_id.into(), content: LocalEchoContent::React { key, @@ -2415,7 +2496,7 @@ impl SendReactionHandle { // Simple case: the reaction was found in the dependent event list. // Propagate a cancelled update too. - let _ = self.room.inner.updates.send(RoomSendQueueUpdate::CancelledLocalEvent { + self.room.send_update(RoomSendQueueUpdate::CancelledLocalEvent { transaction_id: self.transaction_id.clone().into(), }); diff --git a/crates/matrix-sdk/src/send_queue/upload.rs b/crates/matrix-sdk/src/send_queue/upload.rs index f3e61757e..0ba73221d 100644 --- a/crates/matrix-sdk/src/send_queue/upload.rs +++ b/crates/matrix-sdk/src/send_queue/upload.rs @@ -267,7 +267,7 @@ impl RoomSendQueue { created_at, }; - let _ = self.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho { + self.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho { transaction_id: send_event_txn.clone().into(), content: LocalEchoContent::Event { serialized_event: SerializableEventContent::new(&event_content.into()) @@ -400,7 +400,7 @@ impl RoomSendQueue { created_at, }; - let _ = self.inner.updates.send(RoomSendQueueUpdate::NewLocalEvent(LocalEcho { + self.send_update(RoomSendQueueUpdate::NewLocalEvent(LocalEcho { transaction_id: send_event_txn.clone().into(), content: LocalEchoContent::Event { serialized_event: SerializableEventContent::new(&event_content.into())