fix(sdk): Remove room unsubscriptions once the server has received them.

This bug was found by @bnjbvr; all the credit goes to him. I've just
added some comments around his code.

Prior to this patch, the room unsubscription buffer
(`SlidingSync::room_unsubscriptions`) was emptied before the request was
sent. If anything went wrong with that request, the pending
unsubscriptions were lost, and the next request would not include them.
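
To make the failure mode concrete, here is a minimal, self-contained sketch
of the pre-patch pattern; the `buffer`, `unsubscribe_rooms` and
`request_failed` names are hypothetical stand-ins, not the SDK's actual API:

    use std::{mem, sync::RwLock};

    fn main() {
        // Hypothetical stand-in for `SlidingSync::room_unsubscriptions`.
        let buffer: RwLock<Vec<String>> =
            RwLock::new(vec!["!room:example.org".to_owned()]);

        // Pre-patch: the buffer is emptied while *building* the request.
        let unsubscribe_rooms = mem::take(&mut *buffer.write().unwrap());

        // Suppose the request carrying `unsubscribe_rooms` then fails.
        let request_failed = true;

        if request_failed {
            // The unsubscription is already gone from the buffer, so the
            // next request will not include it.
            assert!(buffer.read().unwrap().is_empty());
            assert_eq!(unsubscribe_rooms, ["!room:example.org"]);
        }
    }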

This patch updates this behavior. First, it replaces the `Vec` with a
`HashSet` to avoid an O(n^2) look-up when pruning the buffer (each `Vec`
look-up is O(n), and one is needed per entry).
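
As a rough illustration of the complexity argument (hypothetical room IDs,
not SDK code), pruning means one membership test per buffered entry, so
each test must be cheap:

    use std::collections::HashSet;

    fn main() {
        // Unsubscriptions that were sent with the request…
        let requested: HashSet<String> = HashSet::from([
            "!a:example.org".to_owned(),
            "!b:example.org".to_owned(),
        ]);

        // …and the live buffer, which gained an entry in the meantime.
        let mut buffer: HashSet<String> = HashSet::from([
            "!a:example.org".to_owned(),
            "!b:example.org".to_owned(),
            "!c:example.org".to_owned(),
        ]);

        // One `contains` per buffered entry: O(1) on average with a
        // `HashSet`, but O(n) with a `Vec`, i.e. O(n^2) overall.
        buffer.retain(|room_id| !requested.contains(room_id));

        assert_eq!(buffer, HashSet::from(["!c:example.org".to_owned()]));
    }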

Second, a copy of the room unsubscriptions used by the request is kept, so
that it can be used to cherry-pick which room unsubscriptions to remove
from the buffer once a response from the server is received. It is
important not to clear the entire buffer, as more unsubscriptions could
have been inserted in the meantime.
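
Here is a minimal sketch of the resulting snapshot-then-prune flow, with
hypothetical names standing in for the SDK internals:

    use std::{collections::HashSet, sync::RwLock};

    fn main() {
        // Hypothetical stand-in for `SlidingSync::room_unsubscriptions`.
        let buffer: RwLock<HashSet<String>> =
            RwLock::new(HashSet::from(["!a:example.org".to_owned()]));

        // 1. Snapshot the buffer for the request; the buffer itself is
        //    left untouched.
        let requested = buffer.read().unwrap().clone();

        // 2. A new unsubscription arrives while the request is in flight.
        buffer.write().unwrap().insert("!b:example.org".to_owned());

        // 3. On a successful response, drop only what the server has seen.
        buffer.write().unwrap().retain(|room_id| !requested.contains(room_id));

        // The late unsubscription survives for the next request.
        assert_eq!(
            buffer.into_inner().unwrap(),
            HashSet::from(["!b:example.org".to_owned()])
        );
    }

Only entries the server has acknowledged are dropped; anything inserted
during the request/response round-trip stays queued for the next request.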
Author: Ivan Enderlin
Date: 2023-05-17 10:14:34 +02:00
Parent: 9b7122768f
Commit: d8a9408efe


@@ -23,9 +23,8 @@ mod list;
 mod room;
 
 use std::{
-    collections::BTreeMap,
+    collections::{BTreeMap, HashSet},
     fmt::Debug,
-    mem,
     sync::{
         atomic::{AtomicU8, Ordering},
         Arc, Mutex, RwLock as StdRwLock,
@@ -113,7 +112,7 @@ pub(super) struct SlidingSyncInner {
     room_subscriptions: StdRwLock<BTreeMap<OwnedRoomId, v4::RoomSubscription>>,
     /// Rooms to unsubscribe, see [`Self::room_subscriptions`].
-    room_unsubscriptions: StdRwLock<Vec<OwnedRoomId>>,
+    room_unsubscriptions: StdRwLock<HashSet<OwnedRoomId>>,
 
     /// Number of times a Sliding Session session has been reset.
     reset_counter: AtomicU8,
@@ -163,8 +162,10 @@ impl SlidingSync {
     /// Unsubscribe from a given room.
     pub fn unsubscribe_to_room(&self, room_id: OwnedRoomId) -> Result<()> {
+        // If removing the subscription was successful…
         if self.inner.room_subscriptions.write().unwrap().remove(&room_id).is_some() {
-            self.inner.room_unsubscriptions.write().unwrap().push(room_id);
+            // … then keep the unsubscription for the next request.
+            self.inner.room_unsubscriptions.write().unwrap().insert(room_id);
 
             self.inner
                 .internal_channel
@@ -434,7 +435,7 @@
     #[instrument(skip_all, fields(pos))]
     async fn sync_once(&self) -> Result<Option<UpdateSummary>> {
-        let (request, request_config) = {
+        let (request, request_config, requested_room_unsubscriptions) = {
             // Collect requests for lists.
             let mut requests_lists = BTreeMap::new();
@@ -461,8 +462,7 @@
             // Collect other data.
             let room_subscriptions = self.inner.room_subscriptions.read().unwrap().clone();
-            let unsubscribe_rooms =
-                mem::take(&mut *self.inner.room_unsubscriptions.write().unwrap());
+            let room_unsubscriptions = self.inner.room_unsubscriptions.read().unwrap().clone();
             let timeout = Duration::from_secs(30);
             let extensions = self.prepare_extension_config(pos.as_deref());
@@ -475,12 +475,13 @@
                     lists: requests_lists,
                     bump_event_types: self.inner.bump_event_types.clone(),
                     room_subscriptions,
-                    unsubscribe_rooms,
+                    unsubscribe_rooms: room_unsubscriptions.iter().cloned().collect(),
                     extensions,
                 }),
                 // Configure long-polling. We need 30 seconds for the long-poll itself, in
                 // addition to 30 more extra seconds for the network delays.
                 RequestConfig::default().timeout(timeout + Duration::from_secs(30)),
+                room_unsubscriptions,
             )
         };
@@ -544,6 +545,17 @@
             // `response_handling_lock`.
             let response_handling_lock = this.response_handling_lock.lock().await;
 
+            // Room unsubscriptions have been received by the server. We can update the
+            // unsubscriptions buffer. However, it would be an error to empty it entirely as
+            // more unsubscriptions could have been inserted during the request/response
+            // dance. So let's cherry-pick which unsubscriptions to remove.
+            {
+                let room_unsubscriptions = &mut *this.inner.room_unsubscriptions.write().unwrap();
+
+                room_unsubscriptions
+                    .retain(|room_id| !requested_room_unsubscriptions.contains(room_id));
+            }
+
             // Handle the response.
             let updates = this.handle_response(response).await?;