send queue: control enabled on a per-room basis in addition to globally

This commit is contained in:
Benjamin Bouvier
2024-06-06 18:08:21 +02:00
parent 9c1d62a039
commit e5487da160
8 changed files with 301 additions and 120 deletions

View File

@@ -146,14 +146,12 @@ pub trait ProgressWatcher: Send + Sync {
fn transmission_progress(&self, progress: TransmissionProgress);
}
/// A listener to the global (client-wide) status of the send queue.
/// A listener to the global (client-wide) error reporter of the send queue.
#[uniffi::export(callback_interface)]
pub trait SendQueueStatusListener: Sync + Send {
/// Called every time the send queue has received a new status.
///
/// This can be set automatically (in case of sending failure), or manually
/// via an API call.
fn on_update(&self, new_value: bool);
pub trait SendQueueRoomErrorListener: Sync + Send {
/// Called every time the send queue has ran into an error for a given room,
/// which will disable the send queue for that particular room.
fn on_error(&self, room_id: String, error: ClientError);
}
#[derive(Clone, Copy, uniffi::Record)]
@@ -315,18 +313,15 @@ impl Client {
Ok(())
}
/// Enables or disables the send queue, according to the given parameter.
/// Enables or disables all the room send queues at once.
///
/// The send queue automatically disables itself whenever sending an
/// event with it failed (e.g., sending an event via the high-level Timeline
/// object), so it's required to manually re-enable it as soon as
/// connectivity is back on the device.
pub fn enable_send_queue(&self, enable: bool) {
if enable {
self.inner.send_queue().enable();
} else {
self.inner.send_queue().disable();
}
/// When connectivity is lost on a device, it is recommended to disable the
/// room sending queues.
///
/// This can be controlled for individual rooms, using
/// [`Room::enable_send_queue`].
pub fn enable_all_send_queues(&self, enable: bool) {
self.inner.send_queue().set_enabled(enable);
}
/// Subscribe to the global enablement status of the send queue, at the
@@ -336,17 +331,19 @@ impl Client {
/// the enablement status.
pub fn subscribe_to_send_queue_status(
&self,
listener: Box<dyn SendQueueStatusListener>,
listener: Box<dyn SendQueueRoomErrorListener>,
) -> Arc<TaskHandle> {
let mut subscriber = self.inner.send_queue().subscribe_status();
let mut subscriber = self.inner.send_queue().subscribe_errors();
Arc::new(TaskHandle::new(RUNTIME.spawn(async move {
// Call with the initial value.
listener.on_update(subscriber.next_now());
// Call every time the value changes.
while let Some(next_val) = subscriber.next().await {
listener.on_update(next_val);
loop {
match subscriber.recv().await {
Ok(report) => listener
.on_error(report.room_id.to_string(), ClientError::new(report.error)),
Err(err) => {
error!("error when listening to the send queue error reporter: {err}");
}
}
}
})))
}

View File

@@ -14,7 +14,7 @@ pub enum ClientError {
}
impl ClientError {
fn new<E: Display>(error: E) -> Self {
pub(crate) fn new<E: Display>(error: E) -> Self {
Self::Generic { msg: error.to_string() }
}
}

View File

@@ -687,6 +687,17 @@ impl Room {
.await?;
Ok(())
}
/// Returns whether the send queue for that particular room is enabled or
/// not.
pub fn is_send_queue_enabled(&self) -> bool {
self.inner.send_queue().is_enabled()
}
/// Enable or disable the send queue for that particular room.
pub fn enable_send_queue(&self, enable: bool) {
self.inner.send_queue().set_enabled(enable);
}
}
/// Generates a `matrix.to` permalink to the given room alias.

View File

@@ -142,7 +142,7 @@ async fn test_retry_failed() {
mock_encryption_state(&server, false).await;
client.send_queue().enable();
client.send_queue().set_enabled(true);
let room = client.get_room(room_id).unwrap();
let timeline = Arc::new(room.timeline().await.unwrap());
@@ -173,9 +173,12 @@ async fn test_retry_failed() {
.mount(&server)
.await;
assert!(!client.send_queue().is_enabled());
// This doesn't disable the send queue at the global level…
assert!(client.send_queue().is_enabled());
// …but does so at the local level.
assert!(!room.send_queue().is_enabled());
client.send_queue().enable();
room.send_queue().set_enabled(true);
// Let the send queue handle the event.
tokio::time::sleep(Duration::from_millis(300)).await;

View File

@@ -182,7 +182,7 @@ async fn test_retry_order() {
.await;
// Retry the second message first
client.send_queue().enable();
client.send_queue().set_enabled(true);
// Wait 200ms for the first msg, 100ms for the second, 300ms for overhead
sleep(Duration::from_millis(600)).await;

View File

@@ -22,7 +22,6 @@ use std::{
},
};
use eyeball::{SharedObservable, Subscriber};
use matrix_sdk_base::RoomState;
use matrix_sdk_common::executor::{spawn, JoinHandle};
use ruma::{
@@ -51,8 +50,13 @@ impl SendQueue {
Self { client }
}
#[inline(always)]
fn data(&self) -> &SendQueueData {
&self.client.inner.send_queue_data
}
fn for_room(&self, room: Room) -> RoomSendQueue {
let data = &self.client.inner.send_queue_data;
let data = self.data();
let mut map = data.rooms.write().unwrap();
@@ -63,7 +67,8 @@ impl SendQueue {
let owned_room_id = room_id.to_owned();
let room_q = RoomSendQueue::new(
data.globally_enabled.clone(),
data.globally_enabled.load(Ordering::SeqCst),
data.error_reporter.clone(),
data.is_dropping.clone(),
&self.client,
owned_room_id.clone(),
@@ -72,51 +77,49 @@ impl SendQueue {
room_q
}
/// Enable the send queue for the entire client, i.e. all rooms.
/// Enable or disable the send queue for the entire client, i.e. all rooms.
///
/// If we're disabling the queue, and requests were being sent, they're not
/// aborted, and will continue until a status resolves (error responses
/// will keep the events in the buffer of events to send later). The
/// disablement will happen before the next event is sent.
///
/// This may wake up background tasks and resume sending of events in the
/// background.
pub fn enable(&self) {
if self.client.inner.send_queue_data.globally_enabled.set_if_not_eq(true).is_some() {
debug!("globally enabling send queue");
let rooms = self.client.inner.send_queue_data.rooms.read().unwrap();
// Wake up the rooms, in case events have been queued in the meanwhile.
for room in rooms.values() {
room.inner.notifier.notify_one();
}
}
}
pub fn set_enabled(&self, enabled: bool) {
debug!(?enabled, "setting global send queue enablement");
/// Disable the send queue for the entire client, i.e. all rooms.
///
/// If requests were being sent, they're not aborted, and will continue
/// until a status resolves (error responses will keep the events in the
/// buffer of events to send later). The disablement will happen before
/// the next event is sent.
pub fn disable(&self) {
// Note: it's not required to wake the tasks just to let them know they're
// disabled:
// - either they were busy, will continue to the next iteration and realize
// the queue is now disabled,
// - or they were not, and it's not worth it waking them to let them they're
// disabled, which causes them to go to sleep again.
debug!("globally disabling send queue");
self.client.inner.send_queue_data.globally_enabled.set(false);
self.data().globally_enabled.store(enabled, Ordering::SeqCst);
let rooms = self.data().rooms.read().unwrap();
for room in rooms.values() {
room.set_enabled(enabled);
}
}
/// Returns whether the send queue is enabled, at a client-wide
/// granularity.
pub fn is_enabled(&self) -> bool {
self.client.inner.send_queue_data.globally_enabled.get()
self.data().globally_enabled.load(Ordering::SeqCst)
}
/// A subscriber to the enablement status (enabled or disabled) of the
/// send queue.
pub fn subscribe_status(&self) -> Subscriber<bool> {
self.client.inner.send_queue_data.globally_enabled.subscribe()
pub fn subscribe_errors(&self) -> broadcast::Receiver<SendQueueRoomError> {
self.data().error_reporter.subscribe()
}
}
/// A specific room ran into an error, and has disabled itself.
#[derive(Clone, Debug)]
pub struct SendQueueRoomError {
/// Which room is failing?
pub room_id: OwnedRoomId,
/// The error the room has ran into, when trying to send an event.
pub error: Arc<crate::Error>,
}
impl Client {
/// Returns a [`SendQueue`] that handles sending, retrying and not
/// forgetting about messages that are to be sent.
@@ -130,7 +133,13 @@ pub(super) struct SendQueueData {
rooms: SyncRwLock<BTreeMap<OwnedRoomId, RoomSendQueue>>,
/// Is the whole mechanism enabled or disabled?
globally_enabled: SharedObservable<bool>,
///
/// This is only kept in memory to initialize new room queues with an
/// initial enablement state.
globally_enabled: AtomicBool,
/// Global error updates for the send queue.
error_reporter: broadcast::Sender<SendQueueRoomError>,
/// Are we currently dropping the Client?
is_dropping: Arc<AtomicBool>,
@@ -139,9 +148,12 @@ pub(super) struct SendQueueData {
impl SendQueueData {
/// Create the data for a send queue, in the given enabled state.
pub fn new(globally_enabled: bool) -> Self {
let (sender, _) = broadcast::channel(32);
Self {
rooms: Default::default(),
globally_enabled: SharedObservable::new(globally_enabled),
globally_enabled: AtomicBool::new(globally_enabled),
error_reporter: sender,
is_dropping: Arc::new(false.into()),
}
}
@@ -184,7 +196,8 @@ impl std::fmt::Debug for RoomSendQueue {
impl RoomSendQueue {
fn new(
globally_enabled: SharedObservable<bool>,
globally_enabled: bool,
global_error_reporter: broadcast::Sender<SendQueueRoomError>,
is_dropping: Arc<AtomicBool>,
client: &Client,
room_id: OwnedRoomId,
@@ -195,13 +208,15 @@ impl RoomSendQueue {
let notifier = Arc::new(Notify::new());
let weak_room = WeakRoom::new(WeakClient::from_client(client), room_id);
let locally_enabled = Arc::new(AtomicBool::new(globally_enabled));
let task = spawn(Self::sending_task(
weak_room.clone(),
queue.clone(),
notifier.clone(),
updates_sender.clone(),
globally_enabled,
locally_enabled.clone(),
global_error_reporter,
is_dropping,
));
@@ -212,6 +227,7 @@ impl RoomSendQueue {
_task: task,
queue,
notifier,
locally_enabled,
}),
}
}
@@ -282,7 +298,8 @@ impl RoomSendQueue {
queue: QueueStorage,
notifier: Arc<Notify>,
updates: broadcast::Sender<RoomSendQueueUpdate>,
globally_enabled: SharedObservable<bool>,
locally_enabled: Arc<AtomicBool>,
global_error_reporter: broadcast::Sender<SendQueueRoomError>,
is_dropping: Arc<AtomicBool>,
) {
info!("spawned the sending task");
@@ -294,7 +311,7 @@ impl RoomSendQueue {
break;
}
if !globally_enabled.get() {
if !locally_enabled.load(Ordering::SeqCst) {
trace!("not enabled, sleeping");
// Wait for an explicit wakeup.
notifier.notified().await;
@@ -347,13 +364,19 @@ impl RoomSendQueue {
// try to remove an item, while it's still marked as being sent, resulting in a
// cancellation failure.
// Disable the queue after an error.
// See comment in [`SendQueue::disable()`].
globally_enabled.set(false);
// Disable the queue for this room after an error.
locally_enabled.store(false, Ordering::SeqCst);
let error = Arc::new(err);
let _ = global_error_reporter.send(SendQueueRoomError {
room_id: room.room_id().to_owned(),
error: error.clone(),
});
let _ = updates.send(RoomSendQueueUpdate::SendError {
transaction_id: queued_event.transaction_id,
error: Arc::new(err),
error,
});
}
}
@@ -361,6 +384,22 @@ impl RoomSendQueue {
info!("exited sending task");
}
/// Returns whether the room is enabled, at the room level.
pub fn is_enabled(&self) -> bool {
self.inner.locally_enabled.load(Ordering::SeqCst)
}
/// Set the locally enabled flag for this room queue.
pub fn set_enabled(&self, enabled: bool) {
self.inner.locally_enabled.store(enabled, Ordering::SeqCst);
// No need to wake a task to tell it it's been disabled, so only notify if we're
// re-enabling the queue.
if enabled {
self.inner.notifier.notify_one();
}
}
}
struct RoomSendQueueInner {
@@ -389,6 +428,10 @@ struct RoomSendQueueInner {
/// enabled statuses), or the associated room [`QueueStorage`].
notifier: Arc<Notify>,
/// Should the room process new events or not (because e.g. it might be
/// running off the network)?
locally_enabled: Arc<AtomicBool>,
/// Handle to the actual sending task. Unused, but kept alive along this
/// data structure.
_task: JoinHandle<()>,
@@ -624,11 +667,7 @@ mod tests {
let _watcher = q.subscribe().await;
if enabled {
client.send_queue().enable();
} else {
client.send_queue().disable();
}
client.send_queue().set_enabled(enabled);
}
drop(client);

View File

@@ -1,7 +1,6 @@
use std::{sync::Arc, time::Duration};
use assert_matches2::{assert_let, assert_matches};
use futures_util::FutureExt as _;
use matrix_sdk::{
send_queue::{LocalEcho, RoomSendQueueError, RoomSendQueueUpdate},
test_utils::logged_in_client_with_server,
@@ -101,7 +100,7 @@ async fn test_nothing_sent_when_disabled() {
let event_id = event_id!("$1");
mock_send_event(event_id).expect(0).mount(&server).await;
client.send_queue().disable();
client.send_queue().set_enabled(false);
// A message is queued, but never sent.
room.send_queue()
@@ -211,13 +210,14 @@ async fn test_smoke() {
}
#[async_test]
async fn test_error() {
async fn test_error_then_locally_reenabling() {
let (client, server) = logged_in_client_with_server().await;
let mut global_status = client.send_queue().subscribe_status();
let mut errors = client.send_queue().subscribe_errors();
// Starting with a globally enabled queue.
assert!(client.send_queue().is_enabled());
assert!(global_status.next_now());
assert!(errors.is_empty());
// Mark the room as joined.
let room_id = room_id!("!a:b.c");
@@ -285,8 +285,8 @@ async fn test_error() {
assert!(watch.is_empty());
// No new update on the global status.
assert!(global_status.next().now_or_never().is_none());
// No new update on the global error reporter.
assert!(errors.is_empty());
drop(lock_guard);
@@ -308,28 +308,24 @@ async fn test_error() {
assert!(watch.is_empty());
assert!(!global_status.next().await.unwrap(), "the queue should be disabled next");
assert!(!client.send_queue().is_enabled());
let report = errors.recv().await.unwrap();
assert_eq!(report.room_id, room.room_id());
// The send queue is still globally enabled,
assert!(client.send_queue().is_enabled());
// But the room send queue is disabled.
assert!(!room.send_queue().is_enabled());
server.reset().await;
Mock::given(method("PUT"))
.and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/.*"))
.and(header("authorization", "Bearer 1234"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"event_id": "$42"
})))
.expect(1)
.mount(&server)
.await;
mock_send_event(event_id!("$42")).expect(1).mount(&server).await;
// Re-enabling the queue will re-send the same message in that room.
client.send_queue().enable();
// Re-enabling the *room* queue will re-send the same message in that room.
room.send_queue().set_enabled(true);
assert!(errors.is_empty());
assert!(
global_status.next().await.unwrap(),
"the queue should be re-enabled after the user action"
);
assert!(client.send_queue().is_enabled());
assert!(room.send_queue().is_enabled());
assert_let!(
Ok(Ok(RoomSendQueueUpdate::SentEvent { event_id, transaction_id: txn3 })) =
@@ -339,7 +335,94 @@ async fn test_error() {
assert_eq!(txn1, txn3);
assert_eq!(event_id, event_id!("$42"));
assert!(global_status.next().now_or_never().is_none());
assert!(errors.is_empty());
}
#[async_test]
async fn test_error_then_globally_reenabling() {
let (client, server) = logged_in_client_with_server().await;
let mut errors = client.send_queue().subscribe_errors();
// Starting with a globally enabled queue.
assert!(client.send_queue().is_enabled());
assert!(errors.is_empty());
// Mark the room as joined.
let room_id = room_id!("!a:b.c");
let room = mock_sync_with_new_room(
|builder| {
builder.add_joined_room(JoinedRoomBuilder::new(room_id));
},
&client,
&server,
room_id,
)
.await;
let q = room.send_queue();
let (local_echoes, mut watch) = q.subscribe().await;
assert!(local_echoes.is_empty());
assert!(watch.is_empty());
q.send(RoomMessageEventContent::text_plain("1").into()).await.unwrap();
assert_let!(
Ok(Ok(RoomSendQueueUpdate::NewLocalEvent(LocalEcho {
content: AnyMessageLikeEventContent::RoomMessage(msg),
transaction_id: txn1,
..
}))) = timeout(Duration::from_secs(1), watch.recv()).await
);
assert_eq!(msg.body(), "1");
assert!(watch.is_empty());
// We should receive an error because the mocking endpoint hasn't been set up
// yet.
let report = errors.recv().await.unwrap();
assert_eq!(report.room_id, room.room_id());
// The exponential backoff used when retrying a request introduces a bit of
// non-determinism, so let it fail after a large amount of time (10
// seconds).
assert_let!(
Ok(Ok(RoomSendQueueUpdate::SendError { transaction_id: txn2, .. })) =
timeout(Duration::from_secs(10), watch.recv()).await
);
// It's the same transaction id that's used to signal the send error.
assert_eq!(txn1, txn2);
// The send queue is still globally enabled,
assert!(client.send_queue().is_enabled());
// But the room send queue is disabled.
assert!(!room.send_queue().is_enabled());
assert!(watch.is_empty());
server.reset().await;
mock_encryption_state(&server, false).await;
mock_send_event(event_id!("$42")).expect(1).mount(&server).await;
// Re-enabling the global queue will cause the event to be sent.
client.send_queue().set_enabled(true);
assert!(client.send_queue().is_enabled());
assert!(room.send_queue().is_enabled());
assert_let!(
Ok(Ok(RoomSendQueueUpdate::SentEvent { event_id, transaction_id: txn3 })) =
timeout(Duration::from_secs(1), watch.recv()).await
);
assert_eq!(txn1, txn3);
assert_eq!(event_id, event_id!("$42"));
assert!(errors.is_empty());
assert!(watch.is_empty());
}
#[async_test]
@@ -359,15 +442,16 @@ async fn test_reenabling_queue() {
)
.await;
let mut global_status = client.send_queue().subscribe_status();
let errors = client.send_queue().subscribe_errors();
assert!(global_status.next_now());
assert!(errors.is_empty());
// When I start with a disabled send queue,
client.send_queue().disable();
client.send_queue().set_enabled(false);
assert!(!client.send_queue().is_enabled());
assert!(!global_status.next().await.unwrap());
assert!(!room.send_queue().is_enabled());
assert!(errors.is_empty());
let q = room.send_queue();
@@ -423,11 +507,12 @@ async fn test_reenabling_queue() {
.mount(&server)
.await;
// But when reenabling the queue,
client.send_queue().enable();
// But when reenabling the queue globally,
client.send_queue().set_enabled(true);
assert!(client.send_queue().is_enabled());
assert!(global_status.next().await.unwrap());
assert!(room.send_queue().is_enabled());
assert!(errors.is_empty());
// They're sent, in the same ordering.
for i in 1..=3 {
@@ -438,7 +523,53 @@ async fn test_reenabling_queue() {
assert_eq!(event_id.as_str(), format!("${i}"));
}
assert!(global_status.next().now_or_never().is_none());
assert!(errors.is_empty());
assert!(watch.is_empty());
}
#[async_test]
async fn test_disjoint_enabled_status() {
let (client, server) = logged_in_client_with_server().await;
// Mark the room as joined.
let room_id1 = room_id!("!a:b.c");
let room_id2 = room_id!("!b:b.c");
let room1 = mock_sync_with_new_room(
|builder| {
builder
.add_joined_room(JoinedRoomBuilder::new(room_id1))
.add_joined_room(JoinedRoomBuilder::new(room_id2));
},
&client,
&server,
room_id1,
)
.await;
let room2 = client.get_room(room_id2).unwrap();
// When I start with a disabled send queue,
client.send_queue().set_enabled(false);
// All queues are marked as disabled.
assert!(!client.send_queue().is_enabled());
assert!(!room1.send_queue().is_enabled());
assert!(!room2.send_queue().is_enabled());
// When I enable globally,
client.send_queue().set_enabled(true);
// This enables globally and locally.
assert!(client.send_queue().is_enabled());
assert!(room1.send_queue().is_enabled());
assert!(room2.send_queue().is_enabled());
// I can disable one locally,
room1.send_queue().set_enabled(false);
// And it doesn't touch the state of other rooms.
assert!(client.send_queue().is_enabled());
assert!(!room1.send_queue().is_enabled());
assert!(room2.send_queue().is_enabled());
}
#[async_test]
@@ -649,12 +780,12 @@ async fn test_abort_reenable() {
)
.await;
let mut global_status = client.send_queue().subscribe_status();
let mut errors = client.send_queue().subscribe_errors();
assert!(global_status.next_now());
assert!(errors.is_empty());
// When I start with an enabled sending queue,
client.send_queue().enable();
client.send_queue().set_enabled(true);
assert!(client.send_queue().is_enabled());
@@ -679,7 +810,10 @@ async fn test_abort_reenable() {
assert_eq!(msg.body(), format!("hey there"));
// Waiting for the global status to report the queue is getting disabled.
assert!(!global_status.next().await.unwrap());
let report = errors.recv().await.unwrap();
assert_eq!(report.room_id, room.room_id());
assert!(!room.send_queue().is_enabled());
assert!(client.send_queue().is_enabled());
// Aborting the sending should work.
assert!(abort_send_handle.abort().await);
@@ -696,4 +830,5 @@ async fn test_abort_reenable() {
);
assert!(watch.is_empty());
assert!(errors.is_empty());
}

View File

@@ -438,11 +438,7 @@ impl App {
Char('Q') => {
let q = self.client.send_queue();
let enabled = q.is_enabled();
if enabled {
q.disable();
} else {
q.enable();
}
q.set_enabled(!enabled);
}
Char('M') => {