fix(sdk): SlidingSync has a lock for both pos _and_ delta_token

fix(sdk): `SlidingSync` has a lock for both `pos` _and_ `delta_token`
This commit is contained in:
Ivan Enderlin
2023-03-09 17:10:42 +01:00
committed by GitHub
2 changed files with 46 additions and 26 deletions

View File

@@ -17,7 +17,7 @@ use url::Url;
use super::{
Error, FrozenSlidingSync, FrozenSlidingSyncList, SlidingSync, SlidingSyncInner,
SlidingSyncList, SlidingSyncListBuilder, SlidingSyncRoom,
SlidingSyncList, SlidingSyncListBuilder, SlidingSyncPositionMarkers, SlidingSyncRoom,
};
use crate::{Client, Result};
@@ -232,7 +232,7 @@ impl SlidingSyncBuilder {
pub async fn build(mut self) -> Result<SlidingSync> {
let client = self.client.ok_or(Error::BuildMissingField("client"))?;
let mut delta_token_inner = None;
let mut delta_token = None;
let mut rooms_found: BTreeMap<OwnedRoomId, SlidingSyncRoom> = BTreeMap::new();
if let Some(storage_key) = &self.storage_key {
@@ -261,12 +261,13 @@ impl SlidingSyncBuilder {
}
}
if let Some(FrozenSlidingSync { to_device_since, delta_token }) = client
.store()
.get_custom_value(storage_key.as_bytes())
.await?
.map(|v| serde_json::from_slice::<FrozenSlidingSync>(&v))
.transpose()?
if let Some(FrozenSlidingSync { to_device_since, delta_token: frozen_delta_token }) =
client
.store()
.get_custom_value(storage_key.as_bytes())
.await?
.map(|v| serde_json::from_slice::<FrozenSlidingSync>(&v))
.transpose()?
{
trace!("frozen for generic found");
@@ -278,7 +279,7 @@ impl SlidingSyncBuilder {
}
}
delta_token_inner = delta_token;
delta_token = frozen_delta_token;
}
trace!("sync unfrozen done");
@@ -300,8 +301,11 @@ impl SlidingSyncBuilder {
extensions: Mutex::new(self.extensions),
reset_counter: Default::default(),
pos: StdRwLock::new(Observable::new(None)),
delta_token: StdRwLock::new(Observable::new(delta_token_inner)),
position: StdRwLock::new(SlidingSyncPositionMarkers {
pos: Observable::new(None),
delta_token: Observable::new(delta_token),
}),
subscriptions: StdRwLock::new(self.subscriptions),
unsubscribe: Default::default(),
}))

View File

@@ -666,10 +666,8 @@ pub(super) struct SlidingSyncInner {
/// The storage key to keep this cache at and load it from
storage_key: Option<String>,
/// The `pos` marker.
pos: StdRwLock<Observable<Option<String>>>,
delta_token: StdRwLock<Observable<Option<String>>>,
/// The `pos` and `delta_token` markers.
position: StdRwLock<SlidingSyncPositionMarkers>,
/// The lists of this Sliding Sync instance.
lists: StdRwLock<BTreeMap<String, SlidingSyncList>>,
@@ -918,11 +916,11 @@ impl SlidingSync {
mut sync_response: SyncResponse,
list_generators: &mut BTreeMap<String, SlidingSyncListRequestGenerator>,
) -> Result<UpdateSummary, crate::Error> {
Observable::set(&mut self.inner.pos.write().unwrap(), Some(sliding_sync_response.pos));
Observable::set(
&mut self.inner.delta_token.write().unwrap(),
sliding_sync_response.delta_token,
);
{
let mut position_lock = self.inner.position.write().unwrap();
Observable::set(&mut position_lock.pos, Some(sliding_sync_response.pos));
Observable::set(&mut position_lock.delta_token, sliding_sync_response.delta_token);
}
let update_summary = {
let mut rooms = Vec::new();
@@ -1018,8 +1016,12 @@ impl SlidingSync {
}
}
let pos = self.inner.pos.read().unwrap().clone();
let delta_token = self.inner.delta_token.read().unwrap().clone();
let (pos, delta_token) = {
let position_lock = self.inner.position.read().unwrap();
(position_lock.pos.clone(), position_lock.delta_token.clone())
};
let room_subscriptions = self.inner.subscriptions.read().unwrap().clone();
let unsubscribe_rooms = mem::take(&mut *self.inner.unsubscribe.write().unwrap());
let timeout = Duration::from_secs(30);
@@ -1198,7 +1200,11 @@ impl SlidingSync {
warn!("Session expired. Restarting Sliding Sync.");
// To “restart” a Sliding Sync session, we set `pos` to its initial value.
Observable::set(&mut self.inner.pos.write().unwrap(), None);
{
let mut position_lock = self.inner.position.write().unwrap();
Observable::set(&mut position_lock.pos, None);
}
debug!(?self.inner.extensions, "Sliding Sync has been reset");
});
@@ -1218,15 +1224,25 @@ impl SlidingSync {
impl SlidingSync {
/// Get a copy of the `pos` value.
pub fn pos(&self) -> Option<String> {
self.inner.pos.read().unwrap().clone()
let position_lock = self.inner.position.read().unwrap();
position_lock.pos.clone()
}
/// Set a new value for `pos`.
pub fn set_pos(&self, new_pos: String) {
Observable::set(&mut self.inner.pos.write().unwrap(), Some(new_pos));
let mut position_lock = self.inner.position.write().unwrap();
Observable::set(&mut position_lock.pos, Some(new_pos));
}
}
#[derive(Debug)]
pub(super) struct SlidingSyncPositionMarkers {
pos: Observable<Option<String>>,
delta_token: Observable<Option<String>>,
}
#[derive(Serialize, Deserialize)]
struct FrozenSlidingSync {
#[serde(skip_serializing_if = "Option::is_none")]
@@ -1238,7 +1254,7 @@ struct FrozenSlidingSync {
impl From<&SlidingSync> for FrozenSlidingSync {
fn from(sliding_sync: &SlidingSync) -> Self {
FrozenSlidingSync {
delta_token: sliding_sync.inner.delta_token.read().unwrap().clone(),
delta_token: sliding_sync.inner.position.read().unwrap().delta_token.clone(),
to_device_since: sliding_sync
.inner
.extensions