feat(sdk): Create SlidingSyncInner.

Move all `SlidingSync`'s fields into a new `SlidingSyncInner` type, and then
update `SlidingSync` to contain a single `inner: Arc<SlidingSyncInner>` field.

First off, it's much simpler to understand what's behind an `Arc` for
“clonability” of `SlidingSync`, and what's not. We don't need to worry about
adding a new field that is not behind an `Arc` for a specific reason or
anything. The pattern is clear now.

Second, this is required for next commits.
This commit is contained in:
Ivan Enderlin
2023-03-08 12:12:58 +01:00
parent 66d4ced90f
commit 832146b43d
2 changed files with 103 additions and 88 deletions

View File

@@ -16,8 +16,8 @@ use tracing::trace;
use url::Url;
use super::{
Error, FrozenSlidingSync, FrozenSlidingSyncList, SlidingSync, SlidingSyncList,
SlidingSyncListBuilder, SlidingSyncRoom,
Error, FrozenSlidingSync, FrozenSlidingSyncList, SlidingSync, SlidingSyncInner,
SlidingSyncList, SlidingSyncListBuilder, SlidingSyncRoom,
};
use crate::{Client, Result};
@@ -286,24 +286,26 @@ impl SlidingSyncBuilder {
trace!(len = rooms_found.len(), "rooms unfrozen");
let rooms = Arc::new(StdRwLock::new(rooms_found));
let lists = Arc::new(StdRwLock::new(self.lists));
let rooms = StdRwLock::new(rooms_found);
let lists = StdRwLock::new(self.lists);
Ok(SlidingSync {
homeserver: self.homeserver,
client,
storage_key: self.storage_key,
inner: Arc::new(SlidingSyncInner {
homeserver: self.homeserver,
client,
storage_key: self.storage_key,
lists,
rooms,
lists,
rooms,
extensions: Mutex::new(self.extensions).into(),
reset_counter: Default::default(),
extensions: Mutex::new(self.extensions).into(),
reset_counter: Default::default(),
pos: Arc::new(StdRwLock::new(Observable::new(None))),
delta_token: Arc::new(StdRwLock::new(Observable::new(delta_token_inner))),
subscriptions: Arc::new(StdRwLock::new(self.subscriptions)),
unsubscribe: Default::default(),
pos: StdRwLock::new(Observable::new(None)),
delta_token: StdRwLock::new(Observable::new(delta_token_inner)),
subscriptions: StdRwLock::new(self.subscriptions),
unsubscribe: Default::default(),
}),
})
}
}

View File

@@ -643,6 +643,11 @@ const MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION: u8 = 3;
/// The Sliding Sync instance.
#[derive(Clone, Debug)]
pub struct SlidingSync {
inner: Arc<SlidingSyncInner>,
}
#[derive(Debug)]
pub(super) struct SlidingSyncInner {
/// Customize the homeserver for sliding sync only
homeserver: Option<Url>,
@@ -653,55 +658,33 @@ pub struct SlidingSync {
storage_key: Option<String>,
/// The `pos` marker.
pos: Arc<StdRwLock<Observable<Option<String>>>>,
pos: StdRwLock<Observable<Option<String>>>,
delta_token: Arc<StdRwLock<Observable<Option<String>>>>,
delta_token: StdRwLock<Observable<Option<String>>>,
/// The lists of this Sliding Sync instance.
lists: Arc<StdRwLock<BTreeMap<String, SlidingSyncList>>>,
lists: StdRwLock<BTreeMap<String, SlidingSyncList>>,
/// The rooms details
rooms: Arc<StdRwLock<BTreeMap<OwnedRoomId, SlidingSyncRoom>>>,
rooms: StdRwLock<BTreeMap<OwnedRoomId, SlidingSyncRoom>>,
subscriptions: Arc<StdRwLock<BTreeMap<OwnedRoomId, v4::RoomSubscription>>>,
unsubscribe: Arc<StdRwLock<Vec<OwnedRoomId>>>,
subscriptions: StdRwLock<BTreeMap<OwnedRoomId, v4::RoomSubscription>>,
unsubscribe: StdRwLock<Vec<OwnedRoomId>>,
/// Number of times a Sliding Session session has been reset.
reset_counter: Arc<AtomicU8>,
reset_counter: AtomicU8,
/// the intended state of the extensions being supplied to sliding /sync
/// calls. May contain the latest next_batch for to_devices, etc.
extensions: Arc<Mutex<Option<ExtensionsConfig>>>,
}
#[derive(Serialize, Deserialize)]
struct FrozenSlidingSync {
#[serde(skip_serializing_if = "Option::is_none")]
to_device_since: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
delta_token: Option<String>,
}
impl From<&SlidingSync> for FrozenSlidingSync {
fn from(v: &SlidingSync) -> Self {
FrozenSlidingSync {
delta_token: v.delta_token.read().unwrap().clone(),
to_device_since: v
.extensions
.lock()
.unwrap()
.as_ref()
.and_then(|ext| ext.to_device.as_ref()?.since.clone()),
}
}
extensions: Mutex<Option<ExtensionsConfig>>,
}
impl SlidingSync {
async fn cache_to_storage(&self) -> Result<(), crate::Error> {
let Some(storage_key) = self.storage_key.as_ref() else { return Ok(()) };
let Some(storage_key) = self.inner.storage_key.as_ref() else { return Ok(()) };
trace!(storage_key, "Saving to storage for later use");
let store = self.client.store();
let store = self.inner.client.store();
// Write this `SlidingSync` instance, as a `FrozenSlidingSync` instance, inside
// the client store.
@@ -714,9 +697,10 @@ impl SlidingSync {
// Write every `SlidingSyncList` inside the client the store.
let frozen_lists = {
let rooms_lock = self.rooms.read().unwrap();
let rooms_lock = self.inner.rooms.read().unwrap();
self.lists
self.inner
.lists
.read()
.unwrap()
.iter()
@@ -747,16 +731,16 @@ impl SlidingSync {
/// lists but without the current state.
pub fn new_builder_copy(&self) -> SlidingSyncBuilder {
let mut builder = Self::builder()
.client(self.client.clone())
.subscriptions(self.subscriptions.read().unwrap().to_owned());
.client(self.inner.client.clone())
.subscriptions(self.inner.subscriptions.read().unwrap().to_owned());
for list in self.lists.read().unwrap().values().map(|list| {
for list in self.inner.lists.read().unwrap().values().map(|list| {
list.new_builder().build().expect("builder worked before, builder works now")
}) {
builder = builder.add_list(list);
}
if let Some(homeserver) = &self.homeserver {
if let Some(homeserver) = &self.inner.homeserver {
builder.homeserver(homeserver.clone())
} else {
builder
@@ -769,7 +753,7 @@ impl SlidingSync {
/// poll the stream after you've altered this. If you do that during, it
/// might take one round trip to take effect.
pub fn subscribe(&self, room_id: OwnedRoomId, settings: Option<v4::RoomSubscription>) {
self.subscriptions.write().unwrap().insert(room_id, settings.unwrap_or_default());
self.inner.subscriptions.write().unwrap().insert(room_id, settings.unwrap_or_default());
}
/// Unsubscribe from a given room.
@@ -778,14 +762,14 @@ impl SlidingSync {
/// poll the stream after you've altered this. If you do that during, it
/// might take one round trip to take effect.
pub fn unsubscribe(&self, room_id: OwnedRoomId) {
if self.subscriptions.write().unwrap().remove(&room_id).is_some() {
self.unsubscribe.write().unwrap().push(room_id);
if self.inner.subscriptions.write().unwrap().remove(&room_id).is_some() {
self.inner.unsubscribe.write().unwrap().push(room_id);
}
}
/// Add the common extensions if not already configured.
pub fn add_common_extensions(&self) {
let mut lock = self.extensions.lock().unwrap();
let mut lock = self.inner.extensions.lock().unwrap();
let mut cfg = lock.get_or_insert_with(Default::default);
if cfg.to_device.is_none() {
@@ -803,18 +787,19 @@ impl SlidingSync {
/// Lookup a specific room
pub fn get_room(&self, room_id: &RoomId) -> Option<SlidingSyncRoom> {
self.rooms.read().unwrap().get(room_id).cloned()
self.inner.rooms.read().unwrap().get(room_id).cloned()
}
/// Check the number of rooms.
pub fn get_number_of_rooms(&self) -> usize {
self.rooms.read().unwrap().len()
self.inner.rooms.read().unwrap().len()
}
fn update_to_device_since(&self, since: String) {
// FIXME: Find a better place where the to-device since token should be
// persisted.
self.extensions
self.inner
.extensions
.lock()
.unwrap()
.get_or_insert_with(Default::default)
@@ -829,7 +814,7 @@ impl SlidingSync {
/// listening to the stream and is therefor not necessarily up to date
/// with the lists used for the stream.
pub fn list(&self, list_name: &str) -> Option<SlidingSyncList> {
self.lists.read().unwrap().get(list_name).cloned()
self.inner.lists.read().unwrap().get(list_name).cloned()
}
/// Remove the SlidingSyncList named `list_name` from the lists list if
@@ -839,7 +824,7 @@ impl SlidingSync {
/// stream created after this. The old stream will still continue to use the
/// previous set of lists.
pub fn pop_list(&self, list_name: &String) -> Option<SlidingSyncList> {
self.lists.write().unwrap().remove(list_name)
self.inner.lists.write().unwrap().remove(list_name)
}
/// Add the list to the list of lists.
@@ -852,7 +837,7 @@ impl SlidingSync {
/// stream created after this. The old stream will still continue to use the
/// previous set of lists.
pub fn add_list(&self, list: SlidingSyncList) -> Option<SlidingSyncList> {
self.lists.write().unwrap().insert(list.name.clone(), list)
self.inner.lists.write().unwrap().insert(list.name.clone(), list)
}
/// Lookup a set of rooms
@@ -860,14 +845,14 @@ impl SlidingSync {
&self,
room_ids: I,
) -> Vec<Option<SlidingSyncRoom>> {
let rooms = self.rooms.read().unwrap();
let rooms = self.inner.rooms.read().unwrap();
room_ids.map(|room_id| rooms.get(&room_id).cloned()).collect()
}
/// Get all rooms.
pub fn get_all_rooms(&self) -> Vec<SlidingSyncRoom> {
self.rooms.read().unwrap().values().cloned().collect()
self.inner.rooms.read().unwrap().values().cloned().collect()
}
fn prepare_extension_config(&self, pos: Option<&str>) -> ExtensionsConfig {
@@ -875,7 +860,7 @@ impl SlidingSync {
// The pos is `None`, it's either our initial sync or the proxy forgot about us
// and sent us an `UnknownPos` error. We need to send out the config for our
// extensions.
let mut extensions = self.extensions.lock().unwrap().clone().unwrap_or_default();
let mut extensions = self.inner.extensions.lock().unwrap().clone().unwrap_or_default();
// Always enable to-device events and the e2ee-extension on the initial request,
// no matter what the caller wants.
@@ -898,6 +883,7 @@ impl SlidingSync {
// We already enabled all the things, just fetch out the to-device since token
// out of self.extensions and set it in a new, and empty, `ExtensionsConfig`.
let since = self
.inner
.extensions
.lock()
.unwrap()
@@ -919,12 +905,15 @@ impl SlidingSync {
mut sync_response: SyncResponse,
list_generators: &mut BTreeMap<String, SlidingSyncListRequestGenerator>,
) -> Result<UpdateSummary, crate::Error> {
Observable::set(&mut self.pos.write().unwrap(), Some(sliding_sync_response.pos));
Observable::set(&mut self.delta_token.write().unwrap(), sliding_sync_response.delta_token);
Observable::set(&mut self.inner.pos.write().unwrap(), Some(sliding_sync_response.pos));
Observable::set(
&mut self.inner.delta_token.write().unwrap(),
sliding_sync_response.delta_token,
);
let update_summary = {
let mut rooms = Vec::new();
let mut rooms_map = self.rooms.write().unwrap();
let mut rooms_map = self.inner.rooms.write().unwrap();
for (room_id, mut room_data) in sliding_sync_response.rooms.into_iter() {
// `sync_response` contains the rooms with decrypted events if any, so look at
@@ -948,7 +937,7 @@ impl SlidingSync {
rooms_map.insert(
room_id.clone(),
SlidingSyncRoom::new(
self.client.clone(),
self.inner.client.clone(),
room_id.clone(),
room_data,
timeline,
@@ -1014,10 +1003,10 @@ impl SlidingSync {
return Ok(None);
}
let pos = self.pos.read().unwrap().clone();
let delta_token = self.delta_token.read().unwrap().clone();
let room_subscriptions = self.subscriptions.read().unwrap().clone();
let unsubscribe_rooms = mem::take(&mut *self.unsubscribe.write().unwrap());
let pos = self.inner.pos.read().unwrap().clone();
let delta_token = self.inner.delta_token.read().unwrap().clone();
let room_subscriptions = self.inner.subscriptions.read().unwrap().clone();
let unsubscribe_rooms = mem::take(&mut *self.inner.unsubscribe.write().unwrap());
let timeout = Duration::from_secs(30);
let extensions = self.prepare_extension_config(pos.as_deref());
@@ -1028,7 +1017,7 @@ impl SlidingSync {
let request_config = RequestConfig::default().timeout(timeout + Duration::from_secs(30));
// Prepare the request.
let request = self.client.send_with_homeserver(
let request = self.inner.client.send_with_homeserver(
assign!(v4::Request::new(), {
pos,
delta_token,
@@ -1042,7 +1031,7 @@ impl SlidingSync {
extensions,
}),
Some(request_config),
self.homeserver.as_ref().map(ToString::to_string),
self.inner.homeserver.as_ref().map(ToString::to_string),
);
// Send the request and get a response with end-to-end encryption support.
@@ -1057,7 +1046,8 @@ impl SlidingSync {
#[cfg(feature = "e2e-encryption")]
let response = {
let (e2ee_uploads, response) =
futures_util::future::join(self.client.send_outgoing_requests(), request).await;
futures_util::future::join(self.inner.client.send_outgoing_requests(), request)
.await;
if let Err(error) = e2ee_uploads {
error!(?error, "Error while sending outgoing E2EE requests");
@@ -1103,7 +1093,7 @@ impl SlidingSync {
// move to Sliding Sync, i.e. to `v4::Response`), but processing the
// `sliding_sync_response` is vital, so it must be done somewhere; for now it
// happens here.
let sync_response = self.client.process_sliding_sync(&response).await?;
let sync_response = self.inner.client.process_sliding_sync(&response).await?;
debug!("Sliding sync response has been processed");
@@ -1120,12 +1110,12 @@ impl SlidingSync {
///
/// This stream will send requests and will handle responses automatically,
/// hence updating the lists.
#[instrument(name = "sync_stream", skip_all, parent = &self.client.root_span)]
#[instrument(name = "sync_stream", skip_all, parent = &self.inner.client.root_span)]
pub fn stream(&self) -> impl Stream<Item = Result<UpdateSummary, crate::Error>> + '_ {
// Collect all the lists that need to be updated.
let mut list_generators = {
let mut list_generators = BTreeMap::new();
let lock = self.lists.read().unwrap();
let lock = self.inner.lists.read().unwrap();
for (name, lists) in lock.iter() {
list_generators.insert(name.clone(), lists.request_generator());
@@ -1136,7 +1126,7 @@ impl SlidingSync {
let stream_id = Uuid::new_v4().to_string();
debug!(?self.extensions, stream_id, "About to run the sync stream");
debug!(?self.inner.extensions, stream_id, "About to run the sync stream");
let instrument_span = Span::current();
@@ -1145,12 +1135,12 @@ impl SlidingSync {
let sync_span = info_span!(parent: &instrument_span, "sync_once");
sync_span.in_scope(|| {
debug!(?self.extensions, "Sync stream loop is running");
debug!(?self.inner.extensions, "Sync stream loop is running");
});
match self.sync_once(&stream_id, &mut list_generators).instrument(sync_span.clone()).await {
Ok(Some(updates)) => {
self.reset_counter.store(0, Ordering::SeqCst);
self.inner.reset_counter.store(0, Ordering::SeqCst);
yield Ok(updates);
}
@@ -1164,7 +1154,7 @@ impl SlidingSync {
// The session has expired.
// Has it expired too many times?
if self.reset_counter.fetch_add(1, Ordering::SeqCst) >= MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION {
if self.inner.reset_counter.fetch_add(1, Ordering::SeqCst) >= MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION {
sync_span.in_scope(|| error!("Session expired {MAXIMUM_SLIDING_SYNC_SESSION_EXPIRATION} times in a row"));
// The session has expired too many times, let's raise an error!
@@ -1178,9 +1168,9 @@ impl SlidingSync {
warn!("Session expired. Restarting Sliding Sync.");
// To “restart” a Sliding Sync session, we set `pos` to its initial value.
Observable::set(&mut self.pos.write().unwrap(), None);
Observable::set(&mut self.inner.pos.write().unwrap(), None);
debug!(?self.extensions, "Sliding Sync has been reset");
debug!(?self.inner.extensions, "Sliding Sync has been reset");
});
}
@@ -1198,12 +1188,35 @@ impl SlidingSync {
impl SlidingSync {
/// Get a copy of the `pos` value.
pub fn pos(&self) -> Option<String> {
self.pos.read().unwrap().clone()
self.inner.pos.read().unwrap().clone()
}
/// Set a new value for `pos`.
pub fn set_pos(&self, new_pos: String) {
Observable::set(&mut self.pos.write().unwrap(), Some(new_pos));
Observable::set(&mut self.inner.pos.write().unwrap(), Some(new_pos));
}
}
#[derive(Serialize, Deserialize)]
struct FrozenSlidingSync {
#[serde(skip_serializing_if = "Option::is_none")]
to_device_since: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
delta_token: Option<String>,
}
impl From<&SlidingSync> for FrozenSlidingSync {
fn from(sliding_sync: &SlidingSync) -> Self {
FrozenSlidingSync {
delta_token: sliding_sync.inner.delta_token.read().unwrap().clone(),
to_device_since: sliding_sync
.inner
.extensions
.lock()
.unwrap()
.as_ref()
.and_then(|ext| ext.to_device.as_ref()?.since.clone()),
}
}
}