diff --git a/crates/matrix-sdk/src/sliding_sync/mod.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs index 9aec09e4e..762eaa4bc 100644 --- a/crates/matrix-sdk/src/sliding_sync/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -23,9 +23,8 @@ mod list; mod room; use std::{ - collections::BTreeMap, + collections::{BTreeMap, HashSet}, fmt::Debug, - mem, sync::{ atomic::{AtomicU8, Ordering}, Arc, Mutex, RwLock as StdRwLock, @@ -113,7 +112,7 @@ pub(super) struct SlidingSyncInner { room_subscriptions: StdRwLock>, /// Rooms to unsubscribe, see [`Self::room_subscriptions`]. - room_unsubscriptions: StdRwLock>, + room_unsubscriptions: StdRwLock>, /// Number of times a Sliding Session session has been reset. reset_counter: AtomicU8, @@ -163,8 +162,10 @@ impl SlidingSync { /// Unsubscribe from a given room. pub fn unsubscribe_to_room(&self, room_id: OwnedRoomId) -> Result<()> { + // If removing the subscription was successful… if self.inner.room_subscriptions.write().unwrap().remove(&room_id).is_some() { - self.inner.room_unsubscriptions.write().unwrap().push(room_id); + // … then keep the unsubscription for the next request. + self.inner.room_unsubscriptions.write().unwrap().insert(room_id); self.inner .internal_channel @@ -434,7 +435,7 @@ impl SlidingSync { #[instrument(skip_all, fields(pos))] async fn sync_once(&self) -> Result> { - let (request, request_config) = { + let (request, request_config, requested_room_unsubscriptions) = { // Collect requests for lists. let mut requests_lists = BTreeMap::new(); @@ -461,8 +462,7 @@ impl SlidingSync { // Collect other data. let room_subscriptions = self.inner.room_subscriptions.read().unwrap().clone(); - let unsubscribe_rooms = - mem::take(&mut *self.inner.room_unsubscriptions.write().unwrap()); + let room_unsubscriptions = self.inner.room_unsubscriptions.read().unwrap().clone(); let timeout = Duration::from_secs(30); let extensions = self.prepare_extension_config(pos.as_deref()); @@ -475,12 +475,13 @@ impl SlidingSync { lists: requests_lists, bump_event_types: self.inner.bump_event_types.clone(), room_subscriptions, - unsubscribe_rooms, + unsubscribe_rooms: room_unsubscriptions.iter().cloned().collect(), extensions, }), // Configure long-polling. We need 30 seconds for the long-poll itself, in // addition to 30 more extra seconds for the network delays. RequestConfig::default().timeout(timeout + Duration::from_secs(30)), + room_unsubscriptions, ) }; @@ -544,6 +545,17 @@ impl SlidingSync { // `response_handling_lock`. let response_handling_lock = this.response_handling_lock.lock().await; + // Room unsubscriptions have been received by the server. We can update the + // unsubscriptions buffer. However, it would be an error to empty it entirely as + // more unsubscriptions could have been inserted during the request/response + // dance. So let's cherry-pick which unsubscriptions to remove. + { + let room_unsubscriptions = &mut *this.inner.room_unsubscriptions.write().unwrap(); + + room_unsubscriptions + .retain(|room_id| !requested_room_unsubscriptions.contains(room_id)); + } + // Handle the response. let updates = this.handle_response(response).await?;