mirror of
https://github.com/matrix-org/matrix-rust-sdk.git
synced 2026-06-21 22:58:32 -04:00
fix(sdk): Remove SlidingSyncInner::past_positions.
The patch https://github.com/matrix-org/matrix-rust-sdk/pull/2395 has introduced `SlidingSyncInner::past_positions` as a mechanism to filter duplicated responses. It was a problem because the sliding sync `ops` could easily create corrupted states if they were applied more than once. Since https://github.com/matrix-org/matrix-rust-sdk/pull/3664/, `ops` are ignored. Now, `past_positions` create a problem with the sliding sync native implementation inside Synapse because `pos` can stay the same between multiple responses. While `past_positions` was helpful to fix bugs in the past, it's no longer necessary today. Moreover, it breaks an invariant about `pos`: we must consider it as a blackbox. It means we must ignore if a `pos` value has been received in the past or not. This invariant has been broken for good reasons, but it now creates new issues. This patch removes `past_positions`, along with the associated code (like `Error::ResponseAlreadyReceived` for example).
This commit is contained in:
@@ -1,13 +1,12 @@
|
||||
use std::{
|
||||
collections::BTreeMap,
|
||||
fmt::Debug,
|
||||
num::NonZeroUsize,
|
||||
sync::{Arc, RwLock as StdRwLock},
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use matrix_sdk_base::sliding_sync::http;
|
||||
use matrix_sdk_common::{ring_buffer::RingBuffer, timer};
|
||||
use matrix_sdk_common::timer;
|
||||
use ruma::OwnedRoomId;
|
||||
use tokio::sync::{broadcast::channel, Mutex as AsyncMutex, RwLock as AsyncRwLock};
|
||||
use url::Url;
|
||||
@@ -285,8 +284,6 @@ impl SlidingSyncBuilder {
|
||||
rooms,
|
||||
|
||||
position: Arc::new(AsyncMutex::new(SlidingSyncPositionMarkers { pos })),
|
||||
// SAFETY: `unwrap` is safe because 20 is not zero.
|
||||
past_positions: StdRwLock::new(RingBuffer::new(NonZeroUsize::new(20).unwrap())),
|
||||
|
||||
sticky: StdRwLock::new(SlidingSyncStickyManager::new(
|
||||
SlidingSyncStickyParameters::new(
|
||||
|
||||
@@ -13,14 +13,6 @@ pub enum Error {
|
||||
#[error("The sliding sync response could not be handled: {0}")]
|
||||
BadResponse(String),
|
||||
|
||||
/// The response we've received from the server has already been received in
|
||||
/// the past because it has a `pos` that we have recently seen.
|
||||
#[error("The sliding sync response has already been received: `pos={pos:?}`")]
|
||||
ResponseAlreadyReceived {
|
||||
/// The `pos`ition that has been received.
|
||||
pos: Option<String>,
|
||||
},
|
||||
|
||||
/// A `SlidingSyncListRequestGenerator` has been used without having been
|
||||
/// initialized. It happens when a response is handled before a request has
|
||||
/// been sent. It usually happens when testing.
|
||||
|
||||
@@ -35,7 +35,7 @@ use std::{
|
||||
use async_stream::stream;
|
||||
use futures_core::stream::Stream;
|
||||
pub use matrix_sdk_base::sliding_sync::http;
|
||||
use matrix_sdk_common::{ring_buffer::RingBuffer, timer};
|
||||
use matrix_sdk_common::timer;
|
||||
use ruma::{
|
||||
api::{client::error::ErrorKind, OutgoingRequest},
|
||||
assign, OwnedEventId, OwnedRoomId, RoomId,
|
||||
@@ -112,9 +112,6 @@ pub(super) struct SlidingSyncInner {
|
||||
/// `position` being updated, before sending a new request.
|
||||
position: Arc<AsyncMutex<SlidingSyncPositionMarkers>>,
|
||||
|
||||
/// Past position markers.
|
||||
past_positions: StdRwLock<RingBuffer<SlidingSyncPositionMarkers>>,
|
||||
|
||||
/// The lists of this Sliding Sync instance.
|
||||
lists: AsyncRwLock<BTreeMap<String, SlidingSyncList>>,
|
||||
|
||||
@@ -258,23 +255,6 @@ impl SlidingSync {
|
||||
) -> Result<UpdateSummary, crate::Error> {
|
||||
let pos = Some(sliding_sync_response.pos.clone());
|
||||
|
||||
{
|
||||
debug!("Update position markers");
|
||||
|
||||
// Look up for this new `pos` in the past position markers.
|
||||
let past_positions = self.inner.past_positions.read().unwrap();
|
||||
|
||||
// The `pos` received by the server has already been received in the past!
|
||||
if past_positions.iter().any(|position| position.pos == pos) {
|
||||
error!(
|
||||
?sliding_sync_response,
|
||||
"Sliding Sync response has ALREADY been handled by the client in the past"
|
||||
);
|
||||
|
||||
return Err(Error::ResponseAlreadyReceived { pos }.into());
|
||||
}
|
||||
}
|
||||
|
||||
let must_process_rooms_response = self.must_process_rooms_response().await;
|
||||
|
||||
trace!(yes = must_process_rooms_response, "Must process rooms response?");
|
||||
@@ -425,10 +405,6 @@ impl SlidingSync {
|
||||
// Save the new position markers.
|
||||
position.pos = pos;
|
||||
|
||||
// Keep this position markers in memory, in case it pops from the server.
|
||||
let mut past_positions = self.inner.past_positions.write().unwrap();
|
||||
past_positions.push(position.clone());
|
||||
|
||||
Ok(update_summary)
|
||||
}
|
||||
|
||||
@@ -734,11 +710,6 @@ impl SlidingSync {
|
||||
yield Ok(updates);
|
||||
}
|
||||
|
||||
// Here, errors we can safely ignore.
|
||||
Err(crate::Error::SlidingSync(Error::ResponseAlreadyReceived { .. })) => {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Here, errors we **cannot** ignore, and that must stop the sync loop.
|
||||
Err(error) => {
|
||||
if error.client_api_error_kind() == Some(&ErrorKind::UnknownPos) {
|
||||
@@ -774,8 +745,8 @@ impl SlidingSync {
|
||||
|
||||
/// Expire the current Sliding Sync session.
|
||||
///
|
||||
/// Expiring a Sliding Sync session means: resetting `pos`. It also cleans
|
||||
/// up the `past_positions`, and resets sticky parameters.
|
||||
/// Expiring a Sliding Sync session means: resetting `pos`. It also resets
|
||||
/// sticky parameters.
|
||||
///
|
||||
/// This should only be used when it's clear that this session was about to
|
||||
/// expire anyways, and should be used only in very specific cases (e.g.
|
||||
@@ -796,9 +767,6 @@ impl SlidingSync {
|
||||
"couldn't invalidate sliding sync frozen state when expiring session: {err}"
|
||||
);
|
||||
}
|
||||
|
||||
let mut past_positions = self.inner.past_positions.write().unwrap();
|
||||
past_positions.clear();
|
||||
}
|
||||
|
||||
// Force invalidation of all the sticky parameters.
|
||||
@@ -1471,11 +1439,6 @@ mod tests {
|
||||
|
||||
// `pos` has been updated.
|
||||
assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
|
||||
|
||||
// `past_positions` has been updated.
|
||||
let past_positions = sliding_sync.inner.past_positions.read().unwrap();
|
||||
assert_eq!(past_positions.len(), 1);
|
||||
assert_eq!(past_positions.get(0).unwrap().pos, Some("0".to_owned()));
|
||||
}
|
||||
|
||||
// Next request doesn't ask to enable the extension.
|
||||
@@ -1503,19 +1466,12 @@ mod tests {
|
||||
|
||||
// `pos` has been updated.
|
||||
assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("1".to_owned()));
|
||||
|
||||
// `past_positions` has been updated.
|
||||
let past_positions = sliding_sync.inner.past_positions.read().unwrap();
|
||||
assert_eq!(past_positions.len(), 2);
|
||||
assert_eq!(past_positions.get(0).unwrap().pos, Some("0".to_owned()));
|
||||
assert_eq!(past_positions.get(1).unwrap().pos, Some("1".to_owned()));
|
||||
}
|
||||
|
||||
// Next request isn't successful because it receives an already
|
||||
// Next request is successful despite it receives an already
|
||||
// received `pos` from the server.
|
||||
{
|
||||
// First response with an already seen `pos`.
|
||||
let _mock_guard1 = Mock::given(SlidingSyncMatcher)
|
||||
let _mock_guard = Mock::given(SlidingSyncMatcher)
|
||||
.respond_with(|request: &Request| {
|
||||
// Repeat the txn_id in the response, if set.
|
||||
let request: PartialRequest = request.body_json().unwrap();
|
||||
@@ -1529,37 +1485,16 @@ mod tests {
|
||||
.mount_as_scoped(&server)
|
||||
.await;
|
||||
|
||||
// Second response with a new `pos`.
|
||||
let _mock_guard2 = Mock::given(SlidingSyncMatcher)
|
||||
.respond_with(|request: &Request| {
|
||||
// Repeat the txn_id in the response, if set.
|
||||
let request: PartialRequest = request.body_json().unwrap();
|
||||
|
||||
ResponseTemplate::new(200).set_body_json(json!({
|
||||
"txn_id": request.txn_id,
|
||||
"pos": "2", // <- new!
|
||||
}))
|
||||
})
|
||||
.mount_as_scoped(&server)
|
||||
.await;
|
||||
|
||||
let next = sync.next().await;
|
||||
assert_matches!(next, Some(Ok(_update_summary)));
|
||||
|
||||
// `pos` has been updated.
|
||||
assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("2".to_owned()));
|
||||
|
||||
// `past_positions` has been updated.
|
||||
let past_positions = sliding_sync.inner.past_positions.read().unwrap();
|
||||
assert_eq!(past_positions.len(), 3);
|
||||
assert_eq!(past_positions.get(0).unwrap().pos, Some("0".to_owned()));
|
||||
assert_eq!(past_positions.get(1).unwrap().pos, Some("1".to_owned()));
|
||||
assert_eq!(past_positions.get(2).unwrap().pos, Some("2".to_owned()));
|
||||
assert_eq!(sliding_sync.inner.position.lock().await.pos, Some("0".to_owned()));
|
||||
}
|
||||
|
||||
// Stop responding with successful requests!
|
||||
//
|
||||
// When responding with M_UNKNOWN_POS, that regenerates the sticky parameters,
|
||||
// When responding with `M_UNKNOWN_POS`, that regenerates the sticky parameters,
|
||||
// so they're reset. It also resets the `pos`.
|
||||
{
|
||||
let _mock_guard = Mock::given(SlidingSyncMatcher)
|
||||
@@ -1578,9 +1513,6 @@ mod tests {
|
||||
// `pos` has been reset.
|
||||
assert!(sliding_sync.inner.position.lock().await.pos.is_none());
|
||||
|
||||
// `past_positions` has been reset.
|
||||
assert!(sliding_sync.inner.past_positions.read().unwrap().is_empty());
|
||||
|
||||
// Next request asks to enable the extension again.
|
||||
let (request, _, _) =
|
||||
sliding_sync.generate_sync_request(&mut LazyTransactionId::new()).await?;
|
||||
|
||||
Reference in New Issue
Block a user