feat(sliding sync): remove the response_handling_lock and extend the position's lock responsibilities

In the previous situation, we had two locks with similar responsibilities, the `response_handling_lock`
and the `position` lock. The latter *almost* covered the former's critical zone, except for a single
function call, which left room for a deadlock situation (latter taken, then former, then latter).

This removes the former, and extends the critical zone of the latter up to the end of the response handling,
removing the possibility of the deadlock entirely.
This commit is contained in:
Benjamin Bouvier
2023-08-21 10:33:43 +02:00
parent 54771eadcf
commit 6988bd1e6f
3 changed files with 41 additions and 49 deletions

View File

@@ -281,7 +281,6 @@ impl SlidingSyncBuilder {
room_unsubscriptions: Default::default(),
internal_channel: internal_channel_sender,
response_handling_lock: Arc::new(AsyncMutex::new(())),
poll_timeout: self.poll_timeout,
network_timeout: self.network_timeout,

View File

@@ -10,7 +10,10 @@ use matrix_sdk_base::{StateStore, StoreError};
use ruma::UserId;
use tracing::{trace, warn};
use super::{FrozenSlidingSync, FrozenSlidingSyncList, SlidingSync, SlidingSyncList};
use super::{
FrozenSlidingSync, FrozenSlidingSyncList, SlidingSync, SlidingSyncList,
SlidingSyncPositionMarkers,
};
use crate::{sliding_sync::SlidingSyncListCachePolicy, Client, Result};
/// Be careful: as this is used as a storage key; changing it requires migrating
@@ -59,7 +62,10 @@ async fn clean_storage(
}
/// Store the `SlidingSync`'s state in the storage.
pub(super) async fn store_sliding_sync_state(sliding_sync: &SlidingSync) -> Result<()> {
pub(super) async fn store_sliding_sync_state(
sliding_sync: &SlidingSync,
position: &SlidingSyncPositionMarkers,
) -> Result<()> {
let storage_key = &sliding_sync.inner.storage_key;
let instance_storage_key = format_storage_key_for_sliding_sync(storage_key);
@@ -71,7 +77,7 @@ pub(super) async fn store_sliding_sync_state(sliding_sync: &SlidingSync) -> Resu
storage
.set_custom_value(
instance_storage_key.as_bytes(),
serde_json::to_vec(&FrozenSlidingSync::new(sliding_sync).await)?,
serde_json::to_vec(&FrozenSlidingSync::new(position).await)?,
)
.await?;
@@ -276,7 +282,9 @@ mod tests {
list_bar.set_maximum_number_of_rooms(Some(1337));
}
assert!(sliding_sync.cache_to_storage().await.is_ok());
let position_guard = sliding_sync.inner.position.lock().await;
assert!(sliding_sync.cache_to_storage(&*position_guard).await.is_ok());
storage_key
};
@@ -395,10 +403,13 @@ mod tests {
// Emulate some data to be cached.
let delta_token = "delta_token".to_owned();
sliding_sync.inner.position.lock().await.delta_token = Some(delta_token.clone());
{
let mut position_guard = sliding_sync.inner.position.lock().await;
position_guard.delta_token = Some(delta_token.clone());
// Then, we can correctly cache the sliding sync instance.
store_sliding_sync_state(&sliding_sync).await?;
// Then, we can correctly cache the sliding sync instance.
store_sliding_sync_state(&sliding_sync, &*position_guard).await?;
}
// The delta token has been correctly written to the state store (but not the
// to_device_since, since it's in the other store).

View File

@@ -127,9 +127,6 @@ pub(super) struct SlidingSyncInner {
/// Internal channel used to pass messages between Sliding Sync and other
/// types.
internal_channel: Sender<SlidingSyncInternalMessage>,
/// A lock to ensure that responses are handled one at a time.
response_handling_lock: Arc<AsyncMutex<()>>,
}
impl SlidingSync {
@@ -137,8 +134,8 @@ impl SlidingSync {
Self { inner: Arc::new(inner) }
}
async fn cache_to_storage(&self) -> Result<()> {
cache::store_sliding_sync_state(self).await
async fn cache_to_storage(&self, position: &SlidingSyncPositionMarkers) -> Result<()> {
cache::store_sliding_sync_state(self, position).await
}
/// Create a new [`SlidingSyncBuilder`].
@@ -282,7 +279,7 @@ impl SlidingSync {
async fn handle_response(
&self,
mut sliding_sync_response: v4::Response,
mut position_guard: OwnedMutexGuard<SlidingSyncPositionMarkers>,
position: &mut SlidingSyncPositionMarkers,
) -> Result<UpdateSummary, crate::Error> {
let pos = Some(sliding_sync_response.pos.clone());
@@ -427,16 +424,12 @@ impl SlidingSync {
// Everything went well, we can update the position markers.
//
// Save the new position markers.
position_guard.pos = pos;
position_guard.delta_token = sliding_sync_response.delta_token.clone();
position.pos = pos;
position.delta_token = sliding_sync_response.delta_token.clone();
// Keep this position markers in memory, in case it pops from the server.
let mut past_positions = self.inner.past_positions.write().unwrap();
past_positions.push(position_guard.clone());
// Release the position markers lock.
// It means that other requests can start to be sent.
drop(position_guard);
past_positions.push(position.clone());
Ok(update_summary)
}
@@ -548,7 +541,7 @@ impl SlidingSync {
#[instrument(skip_all, fields(pos))]
async fn sync_once(&self) -> Result<UpdateSummary> {
let (request, request_config, requested_room_unsubscriptions, position_guard) =
let (request, request_config, requested_room_unsubscriptions, mut position_guard) =
self.generate_sync_request(&mut LazyTransactionId::new()).await?;
debug!("Sending request");
@@ -636,9 +629,8 @@ impl SlidingSync {
debug!("Start handling response");
// In case the task running this future is detached, we must
// ensure responses are handled one at a time, hence we lock the
// `response_handling_lock`.
let response_handling_lock = this.inner.response_handling_lock.lock().await;
// ensure responses are handled one at a time. At this point we still own
// `position_guard`, so we're fine.
// Room unsubscriptions have been received by the server. We can update the
// unsubscriptions buffer. However, it would be an error to empty it entirely as
@@ -652,13 +644,13 @@ impl SlidingSync {
}
// Handle the response.
let updates = this.handle_response(response, position_guard).await?;
let updates = this.handle_response(response, &mut *position_guard).await?;
this.cache_to_storage().await?;
this.cache_to_storage(&*position_guard).await?;
// Release the response handling lock.
// It means that other responses can be handled.
drop(response_handling_lock);
// Release the position guard lock.
// It means that other responses can be generated and then handled later.
drop(position_guard);
debug!("Done handling response");
@@ -870,9 +862,7 @@ struct FrozenSlidingSync {
}
impl FrozenSlidingSync {
async fn new(sliding_sync: &SlidingSync) -> Self {
let position = sliding_sync.inner.position.lock().await;
async fn new(position: &SlidingSyncPositionMarkers) -> Self {
// The to-device token must be saved in the `FrozenCryptoSlidingSync` now.
Self { delta_token: position.delta_token.clone(), to_device_since: None }
}
@@ -1166,7 +1156,8 @@ mod tests {
// FrozenSlidingSync doesn't contain the to_device_token anymore, as it's saved
// in the crypto store since PR #2323.
let frozen = FrozenSlidingSync::new(&sliding_sync).await;
let position_guard = sliding_sync.inner.position.lock().await;
let frozen = FrozenSlidingSync::new(&*position_guard).await;
assert!(frozen.to_device_since.is_none());
Ok(())
@@ -1849,12 +1840,9 @@ mod tests {
.await?;
{
let sliding_sync_position_guard =
sliding_sync.inner.position.clone().lock_owned().await;
let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
sliding_sync
.handle_response(server_response.clone(), sliding_sync_position_guard)
.await?;
sliding_sync.handle_response(server_response.clone(), &mut *position_guard).await?;
}
// E2EE has been properly handled.
@@ -1883,12 +1871,9 @@ mod tests {
.await?;
{
let sliding_sync_position_guard =
sliding_sync.inner.position.clone().lock_owned().await;
let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
sliding_sync
.handle_response(server_response.clone(), sliding_sync_position_guard)
.await?;
sliding_sync.handle_response(server_response.clone(), &mut *position_guard).await?;
}
// E2EE response has been ignored.
@@ -1920,12 +1905,9 @@ mod tests {
.await?;
{
let sliding_sync_position_guard =
sliding_sync.inner.position.clone().lock_owned().await;
let mut position_guard = sliding_sync.inner.position.clone().lock_owned().await;
sliding_sync
.handle_response(server_response.clone(), sliding_sync_position_guard)
.await?;
sliding_sync.handle_response(server_response.clone(), &mut *position_guard).await?;
}
// E2EE has been properly handled.