diff --git a/crates/matrix-sdk/src/sliding_sync/client.rs b/crates/matrix-sdk/src/sliding_sync/client.rs new file mode 100644 index 000000000..fef794f09 --- /dev/null +++ b/crates/matrix-sdk/src/sliding_sync/client.rs @@ -0,0 +1,24 @@ +use matrix_sdk_base::sync::SyncResponse; +use ruma::api::client::sync::sync_events::v4; +use tracing::{debug, instrument}; + +use super::SlidingSyncBuilder; +use crate::{Client, Result}; + +impl Client { + /// Create a SlidingSyncBuilder tied to this client + pub async fn sliding_sync(&self) -> SlidingSyncBuilder { + SlidingSyncBuilder::default().client(self.clone()) + } + + #[instrument(skip(self, response))] + pub(crate) async fn process_sliding_sync( + &self, + response: v4::Response, + ) -> Result { + let response = self.base_client().process_sliding_sync(response).await?; + debug!("done processing on base_client"); + self.handle_sync_response(&response).await?; + Ok(response) + } +} diff --git a/crates/matrix-sdk/src/sliding_sync/config.rs b/crates/matrix-sdk/src/sliding_sync/config.rs new file mode 100644 index 000000000..eb0f3706d --- /dev/null +++ b/crates/matrix-sdk/src/sliding_sync/config.rs @@ -0,0 +1,332 @@ +use std::{ + collections::BTreeMap, + fmt::Debug, + sync::{Arc, Mutex}, +}; + +use derive_builder::Builder; +use futures_signals::{signal::Mutable, signal_map::MutableBTreeMap}; +use ruma::{ + api::client::sync::sync_events::v4::{ + self, AccountDataConfig, E2EEConfig, ExtensionsConfig, ReceiptConfig, ToDeviceConfig, + TypingConfig, + }, + assign, OwnedRoomId, +}; +use tracing::trace; +use url::Url; + +use super::{ + Error, FrozenSlidingSync, FrozenSlidingSyncView, SlidingSync, SlidingSyncRoom, SlidingSyncView, + SlidingSyncViewBuilder, +}; +use crate::{Client, Result}; + +/// Configuration for a Sliding Sync Instance +#[derive(Clone, Debug, Builder)] +#[builder( + public, + name = "SlidingSyncBuilder", + pattern = "owned", + build_fn(name = "build_no_cache", private), + derive(Clone, Debug) +)] +pub(super) struct SlidingSyncConfig { + /// The storage key to keep this cache at and load it from + #[builder(setter(strip_option), default)] + storage_key: Option, + /// Customize the homeserver for sliding sync only + #[builder(setter(strip_option), default)] + homeserver: Option, + /// The client this sliding sync will be using + client: Client, + /// Views. + #[builder(private, default)] + views: BTreeMap, + /// Extensions. + #[builder(private, default)] + extensions: Option, + /// Subscriptions. + #[builder(default)] + subscriptions: BTreeMap, +} + +impl SlidingSyncConfig { + pub async fn build(self) -> Result { + let SlidingSyncConfig { + homeserver, + storage_key, + client, + mut views, + mut extensions, + subscriptions, + } = self; + let mut delta_token_inner = None; + let mut rooms_found: BTreeMap = BTreeMap::new(); + + if let Some(storage_key) = storage_key.as_ref() { + trace!(storage_key, "trying to load from cold"); + + for (name, view) in views.iter_mut() { + if let Some(frozen_view) = client + .store() + .get_custom_value(format!("{storage_key}::{name}").as_bytes()) + .await? + .map(|v| serde_json::from_slice::(&v)) + .transpose()? + { + trace!(name, "frozen for view found"); + + let FrozenSlidingSyncView { rooms_count, rooms_list, rooms } = frozen_view; + view.set_from_cold(rooms_count, rooms_list); + for (key, frozen_room) in rooms.into_iter() { + rooms_found.entry(key).or_insert_with(|| { + SlidingSyncRoom::from_frozen(frozen_room, client.clone()) + }); + } + } else { + trace!(name, "no frozen state for view found"); + } + } + + if let Some(FrozenSlidingSync { to_device_since, delta_token }) = client + .store() + .get_custom_value(storage_key.as_bytes()) + .await? + .map(|v| serde_json::from_slice::(&v)) + .transpose()? + { + trace!("frozen for generic found"); + if let Some(since) = to_device_since { + if let Some(to_device_ext) = + extensions.get_or_insert_with(Default::default).to_device.as_mut() + { + to_device_ext.since = Some(since); + } + } + delta_token_inner = delta_token; + } + trace!("sync unfrozen done"); + }; + + trace!(len = rooms_found.len(), "rooms unfrozen"); + let rooms = Arc::new(MutableBTreeMap::with_values(rooms_found)); + + let views = Arc::new(MutableBTreeMap::with_values(views)); + + Ok(SlidingSync { + homeserver, + client, + storage_key, + + views, + rooms, + + extensions: Mutex::new(extensions).into(), + sent_extensions: Mutex::new(None).into(), + failure_count: Default::default(), + + pos: Mutable::new(None), + delta_token: Mutable::new(delta_token_inner), + subscriptions: Arc::new(MutableBTreeMap::with_values(subscriptions)), + unsubscribe: Default::default(), + }) + } +} + +impl SlidingSyncBuilder { + /// Convenience function to add a full-sync view to the builder + pub fn add_fullsync_view(self) -> Self { + self.add_view( + SlidingSyncViewBuilder::default_with_fullsync() + .build() + .expect("Building default full sync view doesn't fail"), + ) + } + + /// The cold cache key to read from and store the frozen state at + pub fn cold_cache(mut self, name: T) -> Self { + self.storage_key = Some(Some(name.to_string())); + self + } + + /// Do not use the cold cache + pub fn no_cold_cache(mut self) -> Self { + self.storage_key = None; + self + } + + /// Reset the views to `None` + pub fn no_views(mut self) -> Self { + self.views = None; + self + } + + /// Add the given view to the views. + /// + /// Replace any view with the name. + pub fn add_view(mut self, v: SlidingSyncView) -> Self { + let views = self.views.get_or_insert_with(Default::default); + views.insert(v.name.clone(), v); + self + } + + /// Activate e2ee, to-device-message and account data extensions if not yet + /// configured. + /// + /// Will leave any extension configuration found untouched, so the order + /// does not matter. + pub fn with_common_extensions(mut self) -> Self { + { + let mut cfg = self + .extensions + .get_or_insert_with(Default::default) + .get_or_insert_with(Default::default); + if cfg.to_device.is_none() { + cfg.to_device = Some(assign!(ToDeviceConfig::default(), { enabled: Some(true) })); + } + + if cfg.e2ee.is_none() { + cfg.e2ee = Some(assign!(E2EEConfig::default(), { enabled: Some(true) })); + } + + if cfg.account_data.is_none() { + cfg.account_data = + Some(assign!(AccountDataConfig::default(), { enabled: Some(true) })); + } + } + self + } + + /// Activate e2ee, to-device-message, account data, typing and receipt + /// extensions if not yet configured. + /// + /// Will leave any extension configuration found untouched, so the order + /// does not matter. + pub fn with_all_extensions(mut self) -> Self { + { + let mut cfg = self + .extensions + .get_or_insert_with(Default::default) + .get_or_insert_with(Default::default); + if cfg.to_device.is_none() { + cfg.to_device = Some(assign!(ToDeviceConfig::default(), { enabled: Some(true) })); + } + + if cfg.e2ee.is_none() { + cfg.e2ee = Some(assign!(E2EEConfig::default(), { enabled: Some(true) })); + } + + if cfg.account_data.is_none() { + cfg.account_data = + Some(assign!(AccountDataConfig::default(), { enabled: Some(true) })); + } + + if cfg.receipt.is_none() { + cfg.receipt = Some(assign!(ReceiptConfig::default(), { enabled: Some(true) })); + } + + if cfg.typing.is_none() { + cfg.typing = Some(assign!(TypingConfig::default(), { enabled: Some(true) })); + } + } + self + } + + /// Set the E2EE extension configuration. + pub fn with_e2ee_extension(mut self, e2ee: E2EEConfig) -> Self { + self.extensions + .get_or_insert_with(Default::default) + .get_or_insert_with(Default::default) + .e2ee = Some(e2ee); + self + } + + /// Unset the E2EE extension configuration. + pub fn without_e2ee_extension(mut self) -> Self { + self.extensions + .get_or_insert_with(Default::default) + .get_or_insert_with(Default::default) + .e2ee = None; + self + } + + /// Set the ToDevice extension configuration. + pub fn with_to_device_extension(mut self, to_device: ToDeviceConfig) -> Self { + self.extensions + .get_or_insert_with(Default::default) + .get_or_insert_with(Default::default) + .to_device = Some(to_device); + self + } + + /// Unset the ToDevice extension configuration. + pub fn without_to_device_extension(mut self) -> Self { + self.extensions + .get_or_insert_with(Default::default) + .get_or_insert_with(Default::default) + .to_device = None; + self + } + + /// Set the account data extension configuration. + pub fn with_account_data_extension(mut self, account_data: AccountDataConfig) -> Self { + self.extensions + .get_or_insert_with(Default::default) + .get_or_insert_with(Default::default) + .account_data = Some(account_data); + self + } + + /// Unset the account data extension configuration. + pub fn without_account_data_extension(mut self) -> Self { + self.extensions + .get_or_insert_with(Default::default) + .get_or_insert_with(Default::default) + .account_data = None; + self + } + + /// Set the Typing extension configuration. + pub fn with_typing_extension(mut self, typing: TypingConfig) -> Self { + self.extensions + .get_or_insert_with(Default::default) + .get_or_insert_with(Default::default) + .typing = Some(typing); + self + } + + /// Unset the Typing extension configuration. + pub fn without_typing_extension(mut self) -> Self { + self.extensions + .get_or_insert_with(Default::default) + .get_or_insert_with(Default::default) + .typing = None; + self + } + + /// Set the Receipt extension configuration. + pub fn with_receipt_extension(mut self, receipt: ReceiptConfig) -> Self { + self.extensions + .get_or_insert_with(Default::default) + .get_or_insert_with(Default::default) + .receipt = Some(receipt); + self + } + + /// Unset the Receipt extension configuration. + pub fn without_receipt_extension(mut self) -> Self { + self.extensions + .get_or_insert_with(Default::default) + .get_or_insert_with(Default::default) + .receipt = None; + self + } + + /// Build the Sliding Sync + /// + /// if configured, load the cached data from cold storage + pub async fn build(self) -> Result { + self.build_no_cache().map_err(Error::SlidingSyncBuilder)?.build().await + } +} diff --git a/crates/matrix-sdk/src/sliding_sync.rs b/crates/matrix-sdk/src/sliding_sync/mod.rs similarity index 50% rename from crates/matrix-sdk/src/sliding_sync.rs rename to crates/matrix-sdk/src/sliding_sync/mod.rs index 3d4c6cc78..4e5e76126 100644 --- a/crates/matrix-sdk/src/sliding_sync.rs +++ b/crates/matrix-sdk/src/sliding_sync/mod.rs @@ -634,47 +634,42 @@ //! [ruma-types]: https://docs.rs/ruma/latest/ruma/api/client/sync/sync_events/v4/index.html //! [future-signals-tutorial]: https://docs.rs/futures-signals/latest/futures_signals/tutorial/index.html +mod client; +mod config; +mod room; +mod view; + use std::{ collections::BTreeMap, fmt::Debug, - ops::{Deref, Not}, + ops::Deref, sync::{ - atomic::{AtomicBool, AtomicU8, Ordering}, + atomic::{AtomicU8, Ordering}, Arc, Mutex, }, time::Duration, }; +pub use client::*; +pub use config::*; use futures_core::stream::Stream; -use futures_signals::{ - signal::Mutable, - signal_map::{MutableBTreeMap, MutableBTreeMapLockRef}, - signal_vec::{MutableVec, MutableVecLockMut}, -}; -use matrix_sdk_base::{deserialized_responses::SyncTimelineEvent, sync::SyncResponse}; +use futures_signals::{signal::Mutable, signal_map::MutableBTreeMap, signal_vec::MutableVec}; +pub use room::*; use ruma::{ api::client::{ error::ErrorKind, - sync::sync_events::{ - v4::{ - self, AccountDataConfig, E2EEConfig, ExtensionsConfig, ReceiptConfig, - ToDeviceConfig, TypingConfig, - }, - UnreadNotificationsCount, + sync::sync_events::v4::{ + self, AccountDataConfig, E2EEConfig, ExtensionsConfig, ToDeviceConfig, }, }, - assign, - events::{AnySyncStateEvent, StateEventType}, - serde::Raw, - OwnedRoomId, RoomId, UInt, + assign, OwnedRoomId, RoomId, UInt, }; use serde::{Deserialize, Serialize}; use thiserror::Error; use tracing::{debug, error, info_span, instrument, trace, warn, Instrument, Span}; use url::Url; +pub use view::*; -#[cfg(feature = "experimental-timeline")] -use crate::room::timeline::{EventTimelineItem, Timeline, TimelineBuilder}; use crate::{config::RequestConfig, Client, Result}; /// Internal representation of errors in Sliding Sync @@ -764,345 +759,6 @@ impl RoomListEntry { } } -type AliveRoomTimeline = Arc>; - -/// Room info as giving by the SlidingSync Feature. -#[derive(Debug, Clone)] -pub struct SlidingSyncRoom { - client: Client, - room_id: OwnedRoomId, - inner: v4::SlidingSyncRoom, - is_loading_more: Mutable, - is_cold: Arc, - prev_batch: Mutable>, - timeline_queue: AliveRoomTimeline, -} - -#[derive(Serialize, Deserialize)] -struct FrozenSlidingSyncRoom { - room_id: OwnedRoomId, - inner: v4::SlidingSyncRoom, - prev_batch: Option, - #[serde(rename = "timeline")] - timeline_queue: Vec, -} - -#[cfg(test)] -mod tests { - use matrix_sdk_base::deserialized_responses::TimelineEvent; - use ruma::events::room::message::RoomMessageEventContent; - use serde_json::json; - - use super::*; - - #[test] - fn test_frozen_sliding_sync_room_serialize() { - let frozen_sliding_sync_room = FrozenSlidingSyncRoom { - room_id: <&RoomId>::try_from("!29fhd83h92h0:example.com").unwrap().to_owned(), - inner: v4::SlidingSyncRoom::default(), - prev_batch: Some("let it go!".to_owned()), - timeline_queue: vec![TimelineEvent { - event: Raw::new(&json! ({ - "content": RoomMessageEventContent::text_plain("let it gooo!"), - "type": "m.room.message", - "event_id": "$xxxxx:example.org", - "room_id": "!someroom:example.com", - "origin_server_ts": 2189, - "sender": "@bob:example.com", - })) - .unwrap() - .cast(), - encryption_info: None, - } - .into()], - }; - - assert_eq!( - serde_json::to_string(&frozen_sliding_sync_room).unwrap(), - "{\"room_id\":\"!29fhd83h92h0:example.com\",\"inner\":{},\"prev_batch\":\"let it go!\",\"timeline\":[{\"event\":{\"content\":{\"body\":\"let it gooo!\",\"msgtype\":\"m.text\"},\"event_id\":\"$xxxxx:example.org\",\"origin_server_ts\":2189,\"room_id\":\"!someroom:example.com\",\"sender\":\"@bob:example.com\",\"type\":\"m.room.message\"},\"encryption_info\":null}]}", - ); - } -} - -impl From<&SlidingSyncRoom> for FrozenSlidingSyncRoom { - fn from(value: &SlidingSyncRoom) -> Self { - let locked_tl = value.timeline_queue.lock_ref(); - let tl_len = locked_tl.len(); - // To not overflow the database, we only freeze the newest 10 items. On doing - // so, we must drop the `prev_batch` key however, as we'd otherwise - // create a gap between what we have loaded and where the - // prev_batch-key will start loading when paginating backwards. - let (prev_batch, timeline) = if tl_len > 10 { - let pos = tl_len - 10; - (None, locked_tl.iter().skip(pos).cloned().collect()) - } else { - (value.prev_batch.lock_ref().clone(), locked_tl.to_vec()) - }; - FrozenSlidingSyncRoom { - prev_batch, - timeline_queue: timeline, - room_id: value.room_id.clone(), - inner: value.inner.clone(), - } - } -} - -impl SlidingSyncRoom { - fn from_frozen(val: FrozenSlidingSyncRoom, client: Client) -> Self { - let FrozenSlidingSyncRoom { room_id, inner, prev_batch, timeline_queue: timeline } = val; - SlidingSyncRoom { - client, - room_id, - inner, - is_loading_more: Mutable::new(false), - is_cold: Arc::new(AtomicBool::new(true)), - prev_batch: Mutable::new(prev_batch), - timeline_queue: Arc::new(MutableVec::new_with_values(timeline)), - } - } -} - -impl SlidingSyncRoom { - fn from( - client: Client, - room_id: OwnedRoomId, - mut inner: v4::SlidingSyncRoom, - timeline: Vec, - ) -> Self { - // we overwrite to only keep one copy - inner.timeline = vec![]; - Self { - client, - room_id, - is_loading_more: Mutable::new(false), - is_cold: Arc::new(AtomicBool::new(false)), - prev_batch: Mutable::new(inner.prev_batch.clone()), - timeline_queue: Arc::new(MutableVec::new_with_values(timeline)), - inner, - } - } - - /// RoomId of this SlidingSyncRoom - pub fn room_id(&self) -> &OwnedRoomId { - &self.room_id - } - - /// Are we currently fetching more timeline events in this room? - pub fn is_loading_more(&self) -> bool { - *self.is_loading_more.lock_ref() - } - - /// the `prev_batch` key to fetch more timeline events for this room - pub fn prev_batch(&self) -> Option { - self.prev_batch.lock_ref().clone() - } - - /// `Timeline` of this room - pub async fn timeline(&self) -> Option { - Some(self.timeline_builder()?.track_fully_read().build().await) - } - - fn timeline_builder(&self) -> Option { - if let Some(room) = self.client.get_room(&self.room_id) { - let timeline_queue = self.timeline_queue.lock_ref().to_vec(); - let prev_batch = self.prev_batch.lock_ref().clone(); - Some(Timeline::builder(&room).events(prev_batch, timeline_queue)) - } else if let Some(invited_room) = self.client.get_invited_room(&self.room_id) { - Some(Timeline::builder(&invited_room).events(None, vec![])) - } else { - error!( - room_id = ?self.room_id, - "Room not found in client. Can't provide a timeline for it" - ); - None - } - } - - /// The latest timeline item of this room. - /// - /// Use `Timeline::latest_event` instead if you already have a timeline for - /// this `SlidingSyncRoom`. - #[instrument(skip_all, parent = &self.client.root_span)] - pub async fn latest_event(&self) -> Option { - self.timeline_builder()?.build().await.latest_event() - } - - /// This rooms name as calculated by the server, if any - pub fn name(&self) -> Option<&str> { - self.inner.name.as_deref() - } - - /// Is this a direct message? - pub fn is_dm(&self) -> Option { - self.inner.is_dm - } - - /// Was this an initial response. - pub fn is_initial_response(&self) -> Option { - self.inner.initial - } - - /// Is there any unread notifications? - pub fn has_unread_notifications(&self) -> bool { - self.inner.unread_notifications.is_empty().not() - } - - /// Get unread notifications. - pub fn unread_notifications(&self) -> &UnreadNotificationsCount { - &self.inner.unread_notifications - } - - /// Get the required state. - pub fn required_state(&self) -> &Vec> { - &self.inner.required_state - } - - fn update( - &mut self, - room_data: &v4::SlidingSyncRoom, - timeline_updates: Vec, - ) { - let v4::SlidingSyncRoom { - name, - initial, - limited, - is_dm, - invite_state, - unread_notifications, - required_state, - prev_batch, - .. - } = room_data; - - self.inner.unread_notifications = unread_notifications.clone(); - - if name.is_some() { - self.inner.name = name.clone(); - } - if initial.is_some() { - self.inner.initial = *initial; - } - if is_dm.is_some() { - self.inner.is_dm = *is_dm; - } - if !invite_state.is_empty() { - self.inner.invite_state = invite_state.clone(); - } - if !required_state.is_empty() { - self.inner.required_state = required_state.clone(); - } - - if let Some(batch) = prev_batch { - self.prev_batch.lock_mut().replace(batch.clone()); - } - - // There is timeline updates. - if !timeline_updates.is_empty() { - if self.is_cold.load(Ordering::SeqCst) { - // If we come from a cold storage, we overwrite the timeline queue with the - // timeline updates. - - self.timeline_queue.lock_mut().replace_cloned(timeline_updates); - self.is_cold.store(false, Ordering::SeqCst); - } else if *limited { - // The server alerted us that we missed items in between. - - self.timeline_queue.lock_mut().replace_cloned(timeline_updates); - } else { - // It's the hot path. We have new updates that must be added to the existing - // timeline queue. - - let mut timeline_queue = self.timeline_queue.lock_mut(); - - // If the `timeline_queue` contains: - // [D, E, F] - // and if the `timeline_updates` contains: - // [A, B, C, D, E, F] - // the resulting `timeline_queue` must be: - // [A, B, C, D, E, F] - // - // To do that, we find the longest suffix between `timeline_queue` and - // `timeline_updates`, in this case: - // [D, E, F] - // Remove the suffix from `timeline_updates`, we get `[A, B, C]` that is - // prepended to `timeline_queue`. - // - // If the `timeline_queue` contains: - // [A, B, C, D, E, F] - // and if the `timeline_updates` contains: - // [D, E, F] - // the resulting `timeline_queue` must be: - // [A, B, C, D, E, F] - // - // To do that, we continue with the longest suffix. In this case, it is: - // [D, E, F] - // Remove the suffix from `timeline_updates`, we get `[]`. It's empty, we don't - // touch at `timeline_queue`. - - { - let timeline_queue_len = timeline_queue.len(); - let timeline_updates_len = timeline_updates.len(); - - let position = match timeline_queue - .iter() - .rev() - .zip(timeline_updates.iter().rev()) - .position(|(queue, update)| queue.event_id() != update.event_id()) - { - // We have found a suffix that equals the size of `timeline_queue` or - // `timeline_update`, typically: - // timeline_queue = [D, E, F] - // timeline_update = [A, B, C, D, E, F] - // or - // timeline_queue = [A, B, C, D, E, F] - // timeline_update = [D, E, F] - // in both case, `position` will return `None` because we are looking for - // (from the end) an item that is different. - None => std::cmp::min(timeline_queue_len, timeline_updates_len), - - // We may have found a suffix. - // - // If we have `Some(0)`, it means we don't have found a suffix. That's the - // hot path, `timeline_updates` will just be appended to `timeline_queue`. - // - // If we have `Some(n)` with `n > 0`, it means we have a prefix but it - // doesn't cover all `timeline_queue` or `timeline_update`, typically: - // timeline_queue = [B, D, E, F] - // timeline_update = [A, B, C, D, E, F] - // in this case, `position` will return `Some(3)`. - // That's annoying because it means we have an invalid `timeline_queue` or - // `timeline_update`, but let's try to do our best. - Some(position) => position, - }; - - if position == 0 { - // No prefix found. - - timeline_queue.extend(timeline_updates); - } else { - // Prefix found. - - let new_timeline_updates = - &timeline_updates[..timeline_updates_len - position]; - - if !new_timeline_updates.is_empty() { - for (at, update) in new_timeline_updates.iter().cloned().enumerate() { - timeline_queue.insert_cloned(at, update); - } - } - } - } - } - } else if *limited { - // The timeline updates are empty. But `limited` is set to true. It's a way to - // alert that we are stale. In this case, we should just clear the - // existing timeline. - - self.timeline_queue.lock_mut().clear(); - } - } -} - type ViewState = Mutable; type SyncMode = Mutable; type StringState = Mutable>; @@ -1114,8 +770,6 @@ type RoomsSubscriptions = Arc type RoomUnsubscribe = Arc>; type Views = Arc>; -use derive_builder::Builder; - /// The Summary of a new SlidingSync Update received #[derive(Debug, Clone)] pub struct UpdateSummary { @@ -1125,313 +779,6 @@ pub struct UpdateSummary { pub rooms: Vec, } -/// Configuration for a Sliding Sync Instance -#[derive(Clone, Debug, Builder)] -#[builder( - public, - name = "SlidingSyncBuilder", - pattern = "owned", - build_fn(name = "build_no_cache", private), - derive(Clone, Debug) -)] -struct SlidingSyncConfig { - /// The storage key to keep this cache at and load it from - #[builder(setter(strip_option), default)] - storage_key: Option, - /// Customize the homeserver for sliding sync only - #[builder(setter(strip_option), default)] - homeserver: Option, - - /// The client this sliding sync will be using - client: Client, - #[builder(private, default)] - views: BTreeMap, - #[builder(private, default)] - extensions: Option, - #[builder(private, default)] - subscriptions: BTreeMap, -} - -impl SlidingSyncConfig { - pub async fn build(self) -> Result { - let SlidingSyncConfig { - homeserver, - storage_key, - client, - mut views, - mut extensions, - subscriptions, - } = self; - let mut delta_token_inner = None; - let mut rooms_found: BTreeMap = BTreeMap::new(); - - if let Some(storage_key) = storage_key.as_ref() { - trace!(storage_key, "trying to load from cold"); - - for (name, view) in views.iter_mut() { - if let Some(frozen_view) = client - .store() - .get_custom_value(format!("{storage_key}::{name}").as_bytes()) - .await? - .map(|v| serde_json::from_slice::(&v)) - .transpose()? - { - trace!(name, "frozen for view found"); - - let FrozenSlidingSyncView { rooms_count, rooms_list, rooms } = frozen_view; - view.set_from_cold(rooms_count, rooms_list); - for (key, frozen_room) in rooms.into_iter() { - rooms_found.entry(key).or_insert_with(|| { - SlidingSyncRoom::from_frozen(frozen_room, client.clone()) - }); - } - } else { - trace!(name, "no frozen state for view found"); - } - } - - if let Some(FrozenSlidingSync { to_device_since, delta_token }) = client - .store() - .get_custom_value(storage_key.as_bytes()) - .await? - .map(|v| serde_json::from_slice::(&v)) - .transpose()? - { - trace!("frozen for generic found"); - if let Some(since) = to_device_since { - if let Some(to_device_ext) = - extensions.get_or_insert_with(Default::default).to_device.as_mut() - { - to_device_ext.since = Some(since); - } - } - delta_token_inner = delta_token; - } - trace!("sync unfrozen done"); - }; - - trace!(len = rooms_found.len(), "rooms unfrozen"); - let rooms = Arc::new(MutableBTreeMap::with_values(rooms_found)); - - let views = Arc::new(MutableBTreeMap::with_values(views)); - - Ok(SlidingSync { - homeserver, - client, - storage_key, - - views, - rooms, - - extensions: Mutex::new(extensions).into(), - sent_extensions: Mutex::new(None).into(), - failure_count: Default::default(), - - pos: Mutable::new(None), - delta_token: Mutable::new(delta_token_inner), - subscriptions: Arc::new(MutableBTreeMap::with_values(subscriptions)), - unsubscribe: Default::default(), - }) - } -} - -impl SlidingSyncBuilder { - /// Convenience function to add a full-sync view to the builder - pub fn add_fullsync_view(self) -> Self { - self.add_view( - SlidingSyncViewBuilder::default_with_fullsync() - .build() - .expect("Building default full sync view doesn't fail"), - ) - } - - /// The cold cache key to read from and store the frozen state at - pub fn cold_cache(mut self, name: T) -> Self { - self.storage_key = Some(Some(name.to_string())); - self - } - - /// Do not use the cold cache - pub fn no_cold_cache(mut self) -> Self { - self.storage_key = None; - self - } - - /// Reset the views to `None` - pub fn no_views(mut self) -> Self { - self.views = None; - self - } - - /// Add the given view to the views. - /// - /// Replace any view with the name. - pub fn add_view(mut self, v: SlidingSyncView) -> Self { - let views = self.views.get_or_insert_with(Default::default); - views.insert(v.name.clone(), v); - self - } - - /// Activate e2ee, to-device-message and account data extensions if not yet - /// configured. - /// - /// Will leave any extension configuration found untouched, so the order - /// does not matter. - pub fn with_common_extensions(mut self) -> Self { - { - let mut cfg = self - .extensions - .get_or_insert_with(Default::default) - .get_or_insert_with(Default::default); - if cfg.to_device.is_none() { - cfg.to_device = Some(assign!(ToDeviceConfig::default(), { enabled: Some(true) })); - } - - if cfg.e2ee.is_none() { - cfg.e2ee = Some(assign!(E2EEConfig::default(), { enabled: Some(true) })); - } - - if cfg.account_data.is_none() { - cfg.account_data = - Some(assign!(AccountDataConfig::default(), { enabled: Some(true) })); - } - } - self - } - - /// Activate e2ee, to-device-message, account data, typing and receipt - /// extensions if not yet configured. - /// - /// Will leave any extension configuration found untouched, so the order - /// does not matter. - pub fn with_all_extensions(mut self) -> Self { - { - let mut cfg = self - .extensions - .get_or_insert_with(Default::default) - .get_or_insert_with(Default::default); - if cfg.to_device.is_none() { - cfg.to_device = Some(assign!(ToDeviceConfig::default(), { enabled: Some(true) })); - } - - if cfg.e2ee.is_none() { - cfg.e2ee = Some(assign!(E2EEConfig::default(), { enabled: Some(true) })); - } - - if cfg.account_data.is_none() { - cfg.account_data = - Some(assign!(AccountDataConfig::default(), { enabled: Some(true) })); - } - - if cfg.receipt.is_none() { - cfg.receipt = Some(assign!(ReceiptConfig::default(), { enabled: Some(true) })); - } - - if cfg.typing.is_none() { - cfg.typing = Some(assign!(TypingConfig::default(), { enabled: Some(true) })); - } - } - self - } - - /// Set the E2EE extension configuration. - pub fn with_e2ee_extension(mut self, e2ee: E2EEConfig) -> Self { - self.extensions - .get_or_insert_with(Default::default) - .get_or_insert_with(Default::default) - .e2ee = Some(e2ee); - self - } - - /// Unset the E2EE extension configuration. - pub fn without_e2ee_extension(mut self) -> Self { - self.extensions - .get_or_insert_with(Default::default) - .get_or_insert_with(Default::default) - .e2ee = None; - self - } - - /// Set the ToDevice extension configuration. - pub fn with_to_device_extension(mut self, to_device: ToDeviceConfig) -> Self { - self.extensions - .get_or_insert_with(Default::default) - .get_or_insert_with(Default::default) - .to_device = Some(to_device); - self - } - - /// Unset the ToDevice extension configuration. - pub fn without_to_device_extension(mut self) -> Self { - self.extensions - .get_or_insert_with(Default::default) - .get_or_insert_with(Default::default) - .to_device = None; - self - } - - /// Set the account data extension configuration. - pub fn with_account_data_extension(mut self, account_data: AccountDataConfig) -> Self { - self.extensions - .get_or_insert_with(Default::default) - .get_or_insert_with(Default::default) - .account_data = Some(account_data); - self - } - - /// Unset the account data extension configuration. - pub fn without_account_data_extension(mut self) -> Self { - self.extensions - .get_or_insert_with(Default::default) - .get_or_insert_with(Default::default) - .account_data = None; - self - } - - /// Set the Typing extension configuration. - pub fn with_typing_extension(mut self, typing: TypingConfig) -> Self { - self.extensions - .get_or_insert_with(Default::default) - .get_or_insert_with(Default::default) - .typing = Some(typing); - self - } - - /// Unset the Typing extension configuration. - pub fn without_typing_extension(mut self) -> Self { - self.extensions - .get_or_insert_with(Default::default) - .get_or_insert_with(Default::default) - .typing = None; - self - } - - /// Set the Receipt extension configuration. - pub fn with_receipt_extension(mut self, receipt: ReceiptConfig) -> Self { - self.extensions - .get_or_insert_with(Default::default) - .get_or_insert_with(Default::default) - .receipt = Some(receipt); - self - } - - /// Unset the Receipt extension configuration. - pub fn without_receipt_extension(mut self) -> Self { - self.extensions - .get_or_insert_with(Default::default) - .get_or_insert_with(Default::default) - .receipt = None; - self - } - - /// Build the Sliding Sync - /// - /// if configured, load the cached data from cold storage - pub async fn build(self) -> Result { - self.build_no_cache().map_err(Error::SlidingSyncBuilder)?.build().await - } -} - /// The sliding sync instance #[derive(Clone, Debug)] pub struct SlidingSync { @@ -1518,9 +865,7 @@ impl SlidingSync { } Ok(()) } -} -impl SlidingSync { /// Generate a new SlidingSyncBuilder with the same inner settings and views /// but without the current state pub fn new_builder_copy(&self) -> SlidingSyncBuilder { @@ -1867,783 +1212,6 @@ impl SlidingSync { } } -/// Holding a specific filtered view within the concept of sliding sync. -/// Main entrypoint to the SlidingSync -/// -/// -/// ```no_run -/// # use futures::executor::block_on; -/// # use matrix_sdk::Client; -/// # use url::Url; -/// # block_on(async { -/// # let homeserver = Url::parse("http://example.com")?; -/// let client = Client::new(homeserver).await?; -/// let sliding_sync = -/// client.sliding_sync().await.add_fullsync_view().build().await?; -/// -/// # anyhow::Ok(()) -/// # }); -/// ``` -#[derive(Clone, Debug, Builder)] -#[builder(build_fn(name = "finish_build"), pattern = "owned", derive(Clone, Debug))] -pub struct SlidingSyncView { - /// Which SyncMode to start this view under - #[builder(setter(custom), default)] - sync_mode: SyncMode, - - /// Sort the rooms list by this - #[builder(default = "SlidingSyncViewBuilder::default_sort()")] - sort: Vec, - - /// Required states to return per room - #[builder(default = "SlidingSyncViewBuilder::default_required_state()")] - required_state: Vec<(StateEventType, String)>, - - /// How many rooms request at a time when doing a full-sync catch up - #[builder(default = "20")] - batch_size: u32, - - /// Whether the view should send `UpdatedAt`-Diff signals for rooms - /// that have changed - #[builder(default = "false")] - send_updates_for_items: bool, - - /// How many rooms request a total hen doing a full-sync catch up - #[builder(setter(into), default)] - limit: Option, - - /// Any filters to apply to the query - #[builder(default)] - filters: Option, - - /// The maximum number of timeline events to query for - #[builder(setter(name = "timeline_limit_raw"), default)] - pub timeline_limit: Mutable>, - - // ----- Public state - /// Name of this view to easily recognize them - #[builder(setter(into))] - pub name: String, - - /// The state this view is in - #[builder(private, default)] - pub state: ViewState, - - /// The total known number of rooms, - #[builder(private, default)] - pub rooms_count: RoomsCount, - - /// The rooms in order - #[builder(private, default)] - pub rooms_list: RoomsList, - - /// The ranges windows of the view - #[builder(setter(name = "ranges_raw"), default)] - ranges: RangeState, - - /// Signaling updates on the room list after processing - #[builder(private)] - rooms_updated_signal: futures_signals::signal::Sender<()>, - - #[builder(private)] - is_cold: Arc, - - /// Get informed if anything in the room changed - /// - /// If you only care to know about changes once all of them have applied - /// (including the total) listen to a clone of this signal. - #[builder(private)] - pub rooms_updated_broadcaster: - futures_signals::signal::Broadcaster>, -} - -#[derive(Serialize, Deserialize)] -struct FrozenSlidingSyncView { - #[serde(default, skip_serializing_if = "Option::is_none")] - rooms_count: Option, - #[serde(default, skip_serializing_if = "Vec::is_empty")] - rooms_list: Vec, - #[serde(default, skip_serializing_if = "BTreeMap::is_empty")] - rooms: BTreeMap, -} - -impl FrozenSlidingSyncView { - fn freeze( - source_view: &SlidingSyncView, - rooms_map: &MutableBTreeMapLockRef<'_, OwnedRoomId, SlidingSyncRoom>, - ) -> Self { - let mut rooms = BTreeMap::new(); - let mut rooms_list = Vec::new(); - for entry in source_view.rooms_list.lock_ref().iter() { - match entry { - RoomListEntry::Filled(o) | RoomListEntry::Invalidated(o) => { - rooms.insert(o.clone(), rooms_map.get(o).expect("rooms always exists").into()); - } - _ => {} - }; - - rooms_list.push(entry.freeze()); - } - FrozenSlidingSyncView { - rooms_count: *source_view.rooms_count.lock_ref(), - rooms_list, - rooms, - } - } -} - -impl SlidingSyncView { - fn set_from_cold(&mut self, rooms_count: Option, rooms_list: Vec) { - self.state.set(SlidingSyncState::Preload); - self.is_cold.store(true, Ordering::SeqCst); - self.rooms_count.replace(rooms_count); - self.rooms_list.lock_mut().replace_cloned(rooms_list); - } -} - -/// the default name for the full sync view -pub const FULL_SYNC_VIEW_NAME: &str = "full-sync"; - -impl SlidingSyncViewBuilder { - /// Create a Builder set up for full sync - pub fn default_with_fullsync() -> Self { - Self::default().name(FULL_SYNC_VIEW_NAME).sync_mode(SlidingSyncMode::PagingFullSync) - } - - /// Build the view - pub fn build(mut self) -> Result { - let (sender, receiver) = futures_signals::signal::channel(()); - self.is_cold = Some(Arc::new(AtomicBool::new(false))); - self.rooms_updated_signal = Some(sender); - self.rooms_updated_broadcaster = Some(futures_signals::signal::Broadcaster::new(receiver)); - self.finish_build() - } - - fn default_sort() -> Vec { - vec!["by_recency".to_owned(), "by_name".to_owned()] - } - - fn default_required_state() -> Vec<(StateEventType, String)> { - vec![ - (StateEventType::RoomEncryption, "".to_owned()), - (StateEventType::RoomTombstone, "".to_owned()), - ] - } - - /// Set the Syncing mode - pub fn sync_mode(mut self, sync_mode: SlidingSyncMode) -> Self { - self.sync_mode = Some(SyncMode::new(sync_mode)); - self - } - - /// Set the ranges to fetch - pub fn ranges>(mut self, range: Vec<(U, U)>) -> Self { - self.ranges = - Some(RangeState::new(range.into_iter().map(|(a, b)| (a.into(), b.into())).collect())); - self - } - - /// Set a single range fetch - pub fn set_range>(mut self, from: U, to: U) -> Self { - self.ranges = Some(RangeState::new(vec![(from.into(), to.into())])); - self - } - - /// Set the ranges to fetch - pub fn add_range>(mut self, from: U, to: U) -> Self { - let r = self.ranges.get_or_insert_with(|| RangeState::new(Vec::new())); - r.lock_mut().push((from.into(), to.into())); - self - } - - /// Set the ranges to fetch - pub fn reset_ranges(mut self) -> Self { - self.ranges = None; - self - } - - /// Set the limit of regular events to fetch for the timeline. - pub fn timeline_limit>(mut self, timeline_limit: U) -> Self { - self.timeline_limit = Some(Mutable::new(Some(timeline_limit.into()))); - self - } - - /// Reset the limit of regular events to fetch for the timeline. It is left - /// to the server to decide how many to send back - pub fn no_timeline_limit(mut self) -> Self { - self.timeline_limit = None; - self - } -} - -enum InnerSlidingSyncViewRequestGenerator { - GrowingFullSync { position: u32, batch_size: u32, limit: Option, live: bool }, - PagingFullSync { position: u32, batch_size: u32, limit: Option, live: bool }, - Live, -} - -struct SlidingSyncViewRequestGenerator { - view: SlidingSyncView, - ranges: Vec<(usize, usize)>, - inner: InnerSlidingSyncViewRequestGenerator, -} - -impl SlidingSyncViewRequestGenerator { - fn new_with_paging_syncup(view: SlidingSyncView) -> Self { - let batch_size = view.batch_size; - let limit = view.limit; - let position = view - .ranges - .get_cloned() - .first() - .map(|(_start, end)| u32::try_from(*end).unwrap()) - .unwrap_or_default(); - - SlidingSyncViewRequestGenerator { - view, - ranges: Default::default(), - inner: InnerSlidingSyncViewRequestGenerator::PagingFullSync { - position, - batch_size, - limit, - live: false, - }, - } - } - - fn new_with_growing_syncup(view: SlidingSyncView) -> Self { - let batch_size = view.batch_size; - let limit = view.limit; - let position = view - .ranges - .get_cloned() - .first() - .map(|(_start, end)| u32::try_from(*end).unwrap()) - .unwrap_or_default(); - - SlidingSyncViewRequestGenerator { - view, - ranges: Default::default(), - inner: InnerSlidingSyncViewRequestGenerator::GrowingFullSync { - position, - batch_size, - limit, - live: false, - }, - } - } - - fn new_live(view: SlidingSyncView) -> Self { - SlidingSyncViewRequestGenerator { - view, - ranges: Default::default(), - inner: InnerSlidingSyncViewRequestGenerator::Live, - } - } - - fn prefetch_request( - &mut self, - start: u32, - batch_size: u32, - limit: Option, - ) -> v4::SyncRequestList { - let calc_end = start + batch_size; - let end = match limit { - Some(l) => std::cmp::min(l, calc_end), - _ => calc_end, - }; - self.make_request_for_ranges(vec![(start.into(), end.into())]) - } - - #[instrument(skip(self), fields(name = self.view.name))] - fn make_request_for_ranges(&mut self, ranges: Vec<(UInt, UInt)>) -> v4::SyncRequestList { - let sort = self.view.sort.clone(); - let required_state = self.view.required_state.clone(); - let timeline_limit = self.view.timeline_limit.get_cloned(); - let filters = self.view.filters.clone(); - - self.ranges = ranges - .iter() - .map(|(a, b)| { - ( - usize::try_from(*a).expect("range is a valid u32"), - usize::try_from(*b).expect("range is a valid u32"), - ) - }) - .collect(); - - assign!(v4::SyncRequestList::default(), { - ranges: ranges, - room_details: assign!(v4::RoomDetailsConfig::default(), { - required_state, - timeline_limit, - }), - sort, - filters, - }) - } - - // generate the next live request - fn live_request(&mut self) -> v4::SyncRequestList { - let ranges = self.view.ranges.read_only().get_cloned(); - self.make_request_for_ranges(ranges) - } - - #[instrument(skip_all, fields(name = self.view.name, rooms_count, has_ops = !ops.is_empty()))] - fn handle_response( - &mut self, - rooms_count: u32, - ops: &Vec, - rooms: &Vec, - ) -> Result { - let res = self.view.handle_response(rooms_count, ops, &self.ranges, rooms)?; - self.update_state(rooms_count.saturating_sub(1)); // index is 0 based, count is 1 based - Ok(res) - } - - fn update_state(&mut self, max_index: u32) { - let Some((_start, range_end)) = self.ranges.first() else { - error!("Why don't we have any ranges?"); - return - }; - - let end = if &(max_index as usize) < range_end { max_index } else { *range_end as u32 }; - - trace!(end, max_index, range_end, name = self.view.name, "updating state"); - - match &mut self.inner { - InnerSlidingSyncViewRequestGenerator::PagingFullSync { - position, live, limit, .. - } - | InnerSlidingSyncViewRequestGenerator::GrowingFullSync { - position, live, limit, .. - } => { - let max = limit.map(|limit| std::cmp::min(limit, max_index)).unwrap_or(max_index); - trace!(end, max, name = self.view.name, "updating state"); - if end >= max { - trace!(name = self.view.name, "going live"); - // we are switching to live mode - self.view.set_range(0, max); - *position = max; - *live = true; - - self.view.state.set_if(SlidingSyncState::Live, |before, _now| { - !matches!(before, SlidingSyncState::Live) - }); - } else { - *position = end; - *live = false; - self.view.set_range(0, end); - self.view.state.set_if(SlidingSyncState::CatchingUp, |before, _now| { - !matches!(before, SlidingSyncState::CatchingUp) - }); - } - } - InnerSlidingSyncViewRequestGenerator::Live => { - self.view.state.set_if(SlidingSyncState::Live, |before, _now| { - !matches!(before, SlidingSyncState::Live) - }); - } - } - } -} - -impl Iterator for SlidingSyncViewRequestGenerator { - type Item = v4::SyncRequestList; - - fn next(&mut self) -> Option { - match self.inner { - InnerSlidingSyncViewRequestGenerator::PagingFullSync { live, .. } - | InnerSlidingSyncViewRequestGenerator::GrowingFullSync { live, .. } - if live => - { - Some(self.live_request()) - } - InnerSlidingSyncViewRequestGenerator::PagingFullSync { - position, - batch_size, - limit, - .. - } => Some(self.prefetch_request(position, batch_size, limit)), - InnerSlidingSyncViewRequestGenerator::GrowingFullSync { - position, - batch_size, - limit, - .. - } => Some(self.prefetch_request(0, position + batch_size, limit)), - InnerSlidingSyncViewRequestGenerator::Live => Some(self.live_request()), - } - } -} - -#[instrument(skip(ops))] -fn room_ops( - rooms_list: &mut MutableVecLockMut<'_, RoomListEntry>, - ops: &Vec, - room_ranges: &Vec<(usize, usize)>, -) -> Result<(), Error> { - let index_in_range = |idx| room_ranges.iter().any(|(start, end)| idx >= *start && idx <= *end); - for op in ops { - match &op.op { - v4::SlidingOp::Sync => { - let start: u32 = op - .range - .ok_or_else(|| { - Error::BadResponse( - "`range` must be present for Sync and Update operation".to_owned(), - ) - })? - .0 - .try_into() - .map_err(|e| Error::BadResponse(format!("`range` not a valid int: {e:}")))?; - let room_ids = op.room_ids.clone(); - room_ids - .into_iter() - .enumerate() - .map(|(i, r)| { - let idx = start as usize + i; - if idx >= rooms_list.len() { - rooms_list.insert_cloned(idx, RoomListEntry::Filled(r)); - } else { - rooms_list.set_cloned(idx, RoomListEntry::Filled(r)); - } - }) - .count(); - } - v4::SlidingOp::Delete => { - let pos: u32 = op - .index - .ok_or_else(|| { - Error::BadResponse( - "`index` must be present for DELETE operation".to_owned(), - ) - })? - .try_into() - .map_err(|e| { - Error::BadResponse(format!("`index` not a valid int for DELETE: {e:}")) - })?; - rooms_list.set_cloned(pos as usize, RoomListEntry::Empty); - } - v4::SlidingOp::Insert => { - let pos: usize = op - .index - .ok_or_else(|| { - Error::BadResponse( - "`index` must be present for INSERT operation".to_owned(), - ) - })? - .try_into() - .map_err(|e| { - Error::BadResponse(format!("`index` not a valid int for INSERT: {e:}")) - })?; - let sliced = rooms_list.as_slice(); - let room = RoomListEntry::Filled(op.room_id.clone().ok_or_else(|| { - Error::BadResponse("`room_id` must be present for INSERT operation".to_owned()) - })?); - let mut dif = 0usize; - loop { - // find the next empty slot and drop it - let (prev_p, prev_overflow) = pos.overflowing_sub(dif); - let check_prev = !prev_overflow && index_in_range(prev_p); - let (next_p, overflown) = pos.overflowing_add(dif); - let check_after = !overflown && next_p < sliced.len() && index_in_range(next_p); - if !check_prev && !check_after { - return Err(Error::BadResponse( - "We were asked to insert but could not find any direction to shift to" - .to_owned(), - )); - } - - if check_prev && sliced[prev_p].empty_or_invalidated() { - // we only check for previous, if there are items left - rooms_list.remove(prev_p); - break; - } else if check_after && sliced[next_p].empty_or_invalidated() { - rooms_list.remove(next_p); - break; - } else { - // let's check the next position; - dif += 1; - } - } - rooms_list.insert_cloned(pos, room); - } - v4::SlidingOp::Invalidate => { - let max_len = rooms_list.len(); - let (mut pos, end): (u32, u32) = if let Some(range) = op.range { - ( - range.0.try_into().map_err(|e| { - Error::BadResponse(format!("`range.0` not a valid int: {e:}")) - })?, - range.1.try_into().map_err(|e| { - Error::BadResponse(format!("`range.1` not a valid int: {e:}")) - })?, - ) - } else { - return Err(Error::BadResponse( - "`range` must be given on `Invalidate` operation".to_owned(), - )); - }; - - if pos > end { - return Err(Error::BadResponse( - "Invalid invalidation, end smaller than start".to_owned(), - )); - } - - // ranges are inclusive up to the last index. e.g. `[0, 10]`; `[0, 0]`. - // ensure we pick them all up - while pos <= end { - if pos as usize >= max_len { - break; // how does this happen? - } - let idx = pos as usize; - let entry = if let Some(RoomListEntry::Filled(b)) = rooms_list.get(idx) { - Some(b.clone()) - } else { - None - }; - - if let Some(b) = entry { - rooms_list.set_cloned(pos as usize, RoomListEntry::Invalidated(b)); - } else { - rooms_list.set_cloned(pos as usize, RoomListEntry::Empty); - } - pos += 1; - } - } - s => { - warn!("Unknown operation occurred: {:?}", s); - } - } - } - - Ok(()) -} - -impl SlidingSyncView { - /// Return a builder with the same settings as before - pub fn new_builder(&self) -> SlidingSyncViewBuilder { - SlidingSyncViewBuilder::default() - .name(&self.name) - .sync_mode(self.sync_mode.lock_ref().clone()) - .sort(self.sort.clone()) - .required_state(self.required_state.clone()) - .batch_size(self.batch_size) - .ranges(self.ranges.read_only().get_cloned()) - } - - /// Set the ranges to fetch - /// - /// Remember to cancel the existing stream and fetch a new one as this will - /// only be applied on the next request. - pub fn set_ranges(&self, range: Vec<(u32, u32)>) -> &Self { - *self.ranges.lock_mut() = range.into_iter().map(|(a, b)| (a.into(), b.into())).collect(); - self - } - - /// Reset the ranges to a particular set - /// - /// Remember to cancel the existing stream and fetch a new one as this will - /// only be applied on the next request. - pub fn set_range(&self, start: u32, end: u32) -> &Self { - *self.ranges.lock_mut() = vec![(start.into(), end.into())]; - self - } - - /// Set the ranges to fetch - /// - /// Remember to cancel the existing stream and fetch a new one as this will - /// only be applied on the next request. - pub fn add_range(&self, start: u32, end: u32) -> &Self { - self.ranges.lock_mut().push((start.into(), end.into())); - self - } - - /// Set the ranges to fetch - /// - /// Note: sending an empty list of ranges is, according to the spec, to be - /// understood that the consumer doesn't care about changes of the room - /// order but you will only receive updates when for rooms entering or - /// leaving the set. - /// - /// Remember to cancel the existing stream and fetch a new one as this will - /// only be applied on the next request. - pub fn reset_ranges(&self) -> &Self { - self.ranges.lock_mut().clear(); - self - } - - /// Find the current valid position of the room in the view room_list. - /// - /// Only matches against the current ranges and only against filled items. - /// Invalid items are ignore. Return the total position the item was - /// found in the room_list, return None otherwise. - pub fn find_room_in_view(&self, room_id: &RoomId) -> Option { - let ranges = self.ranges.lock_ref(); - let listing = self.rooms_list.lock_ref(); - for (start_uint, end_uint) in ranges.iter() { - let mut cur_pos: usize = (*start_uint).try_into().unwrap(); - let end: usize = (*end_uint).try_into().unwrap(); - let iterator = listing.iter().skip(cur_pos); - for n in iterator { - if let RoomListEntry::Filled(r) = n { - if room_id == r { - return Some(cur_pos); - } - } - if cur_pos == end { - break; - } - cur_pos += 1; - } - } - None - } - - /// Find the current valid position of the rooms in the views room_list. - /// - /// Only matches against the current ranges and only against filled items. - /// Invalid items are ignore. Return the total position the items that were - /// found in the room_list, will skip any room not found in the rooms_list. - pub fn find_rooms_in_view(&self, room_ids: &[OwnedRoomId]) -> Vec<(usize, OwnedRoomId)> { - let ranges = self.ranges.lock_ref(); - let listing = self.rooms_list.lock_ref(); - let mut rooms_found = Vec::new(); - for (start_uint, end_uint) in ranges.iter() { - let mut cur_pos: usize = (*start_uint).try_into().unwrap(); - let end: usize = (*end_uint).try_into().unwrap(); - let iterator = listing.iter().skip(cur_pos); - for n in iterator { - if let RoomListEntry::Filled(r) = n { - if room_ids.contains(r) { - rooms_found.push((cur_pos, r.clone())); - } - } - if cur_pos == end { - break; - } - cur_pos += 1; - } - } - rooms_found - } - - /// Return the room_id at the given index - pub fn get_room_id(&self, index: usize) -> Option { - self.rooms_list.lock_ref().get(index).and_then(|e| e.as_room_id().map(ToOwned::to_owned)) - } - - #[instrument(skip(self, ops), fields(name = self.name, ops_count = ops.len()))] - fn handle_response( - &self, - rooms_count: u32, - ops: &Vec, - ranges: &Vec<(usize, usize)>, - rooms: &Vec, - ) -> Result { - let current_rooms_count = self.rooms_count.get(); - if current_rooms_count.is_none() - || current_rooms_count == Some(0) - || self.is_cold.load(Ordering::SeqCst) - { - debug!("first run, replacing rooms list"); - // first response, we do that slightly differently - let rooms_list = - MutableVec::new_with_values(vec![RoomListEntry::Empty; rooms_count as usize]); - // then we apply it - let mut locked = rooms_list.lock_mut(); - room_ops(&mut locked, ops, ranges)?; - self.rooms_list.lock_mut().replace_cloned(locked.as_slice().to_vec()); - self.rooms_count.set(Some(rooms_count)); - self.is_cold.store(false, Ordering::SeqCst); - return Ok(true); - } - - debug!("regular update"); - let mut missing = - rooms_count.checked_sub(self.rooms_list.lock_ref().len() as u32).unwrap_or_default(); - let mut changed = false; - if missing > 0 { - let mut list = self.rooms_list.lock_mut(); - list.reserve_exact(missing as usize); - while missing > 0 { - list.push_cloned(RoomListEntry::Empty); - missing -= 1; - } - changed = true; - } - - { - // keep the lock scoped so that the later find_rooms_in_view doesn't deadlock - let mut rooms_list = self.rooms_list.lock_mut(); - - if !ops.is_empty() { - room_ops(&mut rooms_list, ops, ranges)?; - changed = true; - } else { - debug!("no rooms operations found"); - } - } - - if self.rooms_count.get() != Some(rooms_count) { - self.rooms_count.set(Some(rooms_count)); - changed = true; - } - - if self.send_updates_for_items && !rooms.is_empty() { - let found_views = self.find_rooms_in_view(rooms); - if !found_views.is_empty() { - debug!("room details found"); - let mut rooms_list = self.rooms_list.lock_mut(); - for (pos, room_id) in found_views { - // trigger an `UpdatedAt` update - rooms_list.set_cloned(pos, RoomListEntry::Filled(room_id)); - changed = true; - } - } - } - - if changed { - if let Err(e) = self.rooms_updated_signal.send(()) { - warn!("Could not inform about rooms updated: {:?}", e); - } - } - - Ok(changed) - } - - fn request_generator(&self) -> SlidingSyncViewRequestGenerator { - match self.sync_mode.read_only().get_cloned() { - SlidingSyncMode::PagingFullSync => { - SlidingSyncViewRequestGenerator::new_with_paging_syncup(self.clone()) - } - SlidingSyncMode::GrowingFullSync => { - SlidingSyncViewRequestGenerator::new_with_growing_syncup(self.clone()) - } - SlidingSyncMode::Selective => SlidingSyncViewRequestGenerator::new_live(self.clone()), - } - } -} - -impl Client { - /// Create a SlidingSyncBuilder tied to this client - pub async fn sliding_sync(&self) -> SlidingSyncBuilder { - SlidingSyncBuilder::default().client(self.clone()) - } - - #[instrument(skip(self, response))] - pub(crate) async fn process_sliding_sync( - &self, - response: v4::Response, - ) -> Result { - let response = self.base_client().process_sliding_sync(response).await?; - debug!("done processing on base_client"); - self.handle_sync_response(&response).await?; - Ok(response) - } -} - #[cfg(test)] mod test { use ruma::room_id; diff --git a/crates/matrix-sdk/src/sliding_sync/room.rs b/crates/matrix-sdk/src/sliding_sync/room.rs new file mode 100644 index 000000000..3b0127833 --- /dev/null +++ b/crates/matrix-sdk/src/sliding_sync/room.rs @@ -0,0 +1,363 @@ +use std::{ + fmt::Debug, + ops::Not, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, +}; + +use futures_signals::{signal::Mutable, signal_vec::MutableVec}; +use matrix_sdk_base::deserialized_responses::SyncTimelineEvent; +use ruma::{ + api::client::sync::sync_events::{v4, UnreadNotificationsCount}, + events::AnySyncStateEvent, + serde::Raw, + OwnedRoomId, +}; +use serde::{Deserialize, Serialize}; +use tracing::{error, instrument}; + +use crate::{ + room::timeline::{EventTimelineItem, Timeline, TimelineBuilder}, + Client, +}; + +type AliveRoomTimeline = Arc>; + +/// Room info as giving by the SlidingSync Feature. +#[derive(Debug, Clone)] +pub struct SlidingSyncRoom { + client: Client, + room_id: OwnedRoomId, + inner: v4::SlidingSyncRoom, + is_loading_more: Mutable, + is_cold: Arc, + prev_batch: Mutable>, + timeline_queue: AliveRoomTimeline, +} + +#[derive(Serialize, Deserialize)] +pub(super) struct FrozenSlidingSyncRoom { + room_id: OwnedRoomId, + inner: v4::SlidingSyncRoom, + prev_batch: Option, + #[serde(rename = "timeline")] + timeline_queue: Vec, +} + +#[cfg(test)] +mod tests { + use matrix_sdk_base::deserialized_responses::TimelineEvent; + use ruma::{events::room::message::RoomMessageEventContent, RoomId}; + use serde_json::json; + + use super::*; + + #[test] + fn test_frozen_sliding_sync_room_serialize() { + let frozen_sliding_sync_room = FrozenSlidingSyncRoom { + room_id: <&RoomId>::try_from("!29fhd83h92h0:example.com").unwrap().to_owned(), + inner: v4::SlidingSyncRoom::default(), + prev_batch: Some("let it go!".to_owned()), + timeline_queue: vec![TimelineEvent { + event: Raw::new(&json! ({ + "content": RoomMessageEventContent::text_plain("let it gooo!"), + "type": "m.room.message", + "event_id": "$xxxxx:example.org", + "room_id": "!someroom:example.com", + "origin_server_ts": 2189, + "sender": "@bob:example.com", + })) + .unwrap() + .cast(), + encryption_info: None, + } + .into()], + }; + + assert_eq!( + serde_json::to_string(&frozen_sliding_sync_room).unwrap(), + "{\"room_id\":\"!29fhd83h92h0:example.com\",\"inner\":{},\"prev_batch\":\"let it go!\",\"timeline\":[{\"event\":{\"content\":{\"body\":\"let it gooo!\",\"msgtype\":\"m.text\"},\"event_id\":\"$xxxxx:example.org\",\"origin_server_ts\":2189,\"room_id\":\"!someroom:example.com\",\"sender\":\"@bob:example.com\",\"type\":\"m.room.message\"},\"encryption_info\":null}]}", + ); + } +} + +impl From<&SlidingSyncRoom> for FrozenSlidingSyncRoom { + fn from(value: &SlidingSyncRoom) -> Self { + let locked_tl = value.timeline_queue.lock_ref(); + let tl_len = locked_tl.len(); + // To not overflow the database, we only freeze the newest 10 items. On doing + // so, we must drop the `prev_batch` key however, as we'd otherwise + // create a gap between what we have loaded and where the + // prev_batch-key will start loading when paginating backwards. + let (prev_batch, timeline) = if tl_len > 10 { + let pos = tl_len - 10; + (None, locked_tl.iter().skip(pos).cloned().collect()) + } else { + (value.prev_batch.lock_ref().clone(), locked_tl.to_vec()) + }; + FrozenSlidingSyncRoom { + prev_batch, + timeline_queue: timeline, + room_id: value.room_id.clone(), + inner: value.inner.clone(), + } + } +} + +impl SlidingSyncRoom { + pub(super) fn from_frozen(val: FrozenSlidingSyncRoom, client: Client) -> Self { + let FrozenSlidingSyncRoom { room_id, inner, prev_batch, timeline_queue: timeline } = val; + SlidingSyncRoom { + client, + room_id, + inner, + is_loading_more: Mutable::new(false), + is_cold: Arc::new(AtomicBool::new(true)), + prev_batch: Mutable::new(prev_batch), + timeline_queue: Arc::new(MutableVec::new_with_values(timeline)), + } + } +} + +impl SlidingSyncRoom { + pub(crate) fn from( + client: Client, + room_id: OwnedRoomId, + mut inner: v4::SlidingSyncRoom, + timeline: Vec, + ) -> Self { + // we overwrite to only keep one copy + inner.timeline = vec![]; + Self { + client, + room_id, + is_loading_more: Mutable::new(false), + is_cold: Arc::new(AtomicBool::new(false)), + prev_batch: Mutable::new(inner.prev_batch.clone()), + timeline_queue: Arc::new(MutableVec::new_with_values(timeline)), + inner, + } + } + + /// RoomId of this SlidingSyncRoom + pub fn room_id(&self) -> &OwnedRoomId { + &self.room_id + } + + /// Are we currently fetching more timeline events in this room? + pub fn is_loading_more(&self) -> bool { + *self.is_loading_more.lock_ref() + } + + /// the `prev_batch` key to fetch more timeline events for this room + pub fn prev_batch(&self) -> Option { + self.prev_batch.lock_ref().clone() + } + + /// `Timeline` of this room + pub async fn timeline(&self) -> Option { + Some(self.timeline_builder()?.track_fully_read().build().await) + } + + fn timeline_builder(&self) -> Option { + if let Some(room) = self.client.get_room(&self.room_id) { + let timeline_queue = self.timeline_queue.lock_ref().to_vec(); + let prev_batch = self.prev_batch.lock_ref().clone(); + Some(Timeline::builder(&room).events(prev_batch, timeline_queue)) + } else if let Some(invited_room) = self.client.get_invited_room(&self.room_id) { + Some(Timeline::builder(&invited_room).events(None, vec![])) + } else { + error!( + room_id = ?self.room_id, + "Room not found in client. Can't provide a timeline for it" + ); + None + } + } + + /// The latest timeline item of this room. + /// + /// Use `Timeline::latest_event` instead if you already have a timeline for + /// this `SlidingSyncRoom`. + #[instrument(skip_all, parent = &self.client.root_span)] + pub async fn latest_event(&self) -> Option { + self.timeline_builder()?.build().await.latest_event() + } + + /// This rooms name as calculated by the server, if any + pub fn name(&self) -> Option<&str> { + self.inner.name.as_deref() + } + + /// Is this a direct message? + pub fn is_dm(&self) -> Option { + self.inner.is_dm + } + + /// Was this an initial response. + pub fn is_initial_response(&self) -> Option { + self.inner.initial + } + + /// Is there any unread notifications? + pub fn has_unread_notifications(&self) -> bool { + self.inner.unread_notifications.is_empty().not() + } + + /// Get unread notifications. + pub fn unread_notifications(&self) -> &UnreadNotificationsCount { + &self.inner.unread_notifications + } + + /// Get the required state. + pub fn required_state(&self) -> &Vec> { + &self.inner.required_state + } + + pub(super) fn update( + &mut self, + room_data: &v4::SlidingSyncRoom, + timeline_updates: Vec, + ) { + let v4::SlidingSyncRoom { + name, + initial, + limited, + is_dm, + invite_state, + unread_notifications, + required_state, + prev_batch, + .. + } = room_data; + + self.inner.unread_notifications = unread_notifications.clone(); + + if name.is_some() { + self.inner.name = name.clone(); + } + if initial.is_some() { + self.inner.initial = *initial; + } + if is_dm.is_some() { + self.inner.is_dm = *is_dm; + } + if !invite_state.is_empty() { + self.inner.invite_state = invite_state.clone(); + } + if !required_state.is_empty() { + self.inner.required_state = required_state.clone(); + } + + if let Some(batch) = prev_batch { + self.prev_batch.lock_mut().replace(batch.clone()); + } + + // There is timeline updates. + if !timeline_updates.is_empty() { + if self.is_cold.load(Ordering::SeqCst) { + // If we come from a cold storage, we overwrite the timeline queue with the + // timeline updates. + + self.timeline_queue.lock_mut().replace_cloned(timeline_updates); + self.is_cold.store(false, Ordering::SeqCst); + } else if *limited { + // The server alerted us that we missed items in between. + + self.timeline_queue.lock_mut().replace_cloned(timeline_updates); + } else { + // It's the hot path. We have new updates that must be added to the existing + // timeline queue. + + let mut timeline_queue = self.timeline_queue.lock_mut(); + + // If the `timeline_queue` contains: + // [D, E, F] + // and if the `timeline_updates` contains: + // [A, B, C, D, E, F] + // the resulting `timeline_queue` must be: + // [A, B, C, D, E, F] + // + // To do that, we find the longest suffix between `timeline_queue` and + // `timeline_updates`, in this case: + // [D, E, F] + // Remove the suffix from `timeline_updates`, we get `[A, B, C]` that is + // prepended to `timeline_queue`. + // + // If the `timeline_queue` contains: + // [A, B, C, D, E, F] + // and if the `timeline_updates` contains: + // [D, E, F] + // the resulting `timeline_queue` must be: + // [A, B, C, D, E, F] + // + // To do that, we continue with the longest suffix. In this case, it is: + // [D, E, F] + // Remove the suffix from `timeline_updates`, we get `[]`. It's empty, we don't + // touch at `timeline_queue`. + + { + let timeline_queue_len = timeline_queue.len(); + let timeline_updates_len = timeline_updates.len(); + + let position = match timeline_queue + .iter() + .rev() + .zip(timeline_updates.iter().rev()) + .position(|(queue, update)| queue.event_id() != update.event_id()) + { + // We have found a suffix that equals the size of `timeline_queue` or + // `timeline_update`, typically: + // timeline_queue = [D, E, F] + // timeline_update = [A, B, C, D, E, F] + // or + // timeline_queue = [A, B, C, D, E, F] + // timeline_update = [D, E, F] + // in both case, `position` will return `None` because we are looking for + // (from the end) an item that is different. + None => std::cmp::min(timeline_queue_len, timeline_updates_len), + + // We may have found a suffix. + // + // If we have `Some(0)`, it means we don't have found a suffix. That's the + // hot path, `timeline_updates` will just be appended to `timeline_queue`. + // + // If we have `Some(n)` with `n > 0`, it means we have a prefix but it + // doesn't cover all `timeline_queue` or `timeline_update`, typically: + // timeline_queue = [B, D, E, F] + // timeline_update = [A, B, C, D, E, F] + // in this case, `position` will return `Some(3)`. + // That's annoying because it means we have an invalid `timeline_queue` or + // `timeline_update`, but let's try to do our best. + Some(position) => position, + }; + + if position == 0 { + // No prefix found. + + timeline_queue.extend(timeline_updates); + } else { + // Prefix found. + + let new_timeline_updates = + &timeline_updates[..timeline_updates_len - position]; + + if !new_timeline_updates.is_empty() { + for (at, update) in new_timeline_updates.iter().cloned().enumerate() { + timeline_queue.insert_cloned(at, update); + } + } + } + } + } + } else if *limited { + // The timeline updates are empty. But `limited` is set to true. It's a way to + // alert that we are stale. In this case, we should just clear the + // existing timeline. + + self.timeline_queue.lock_mut().clear(); + } + } +} diff --git a/crates/matrix-sdk/src/sliding_sync/view.rs b/crates/matrix-sdk/src/sliding_sync/view.rs new file mode 100644 index 000000000..83923da17 --- /dev/null +++ b/crates/matrix-sdk/src/sliding_sync/view.rs @@ -0,0 +1,789 @@ +use std::{ + collections::BTreeMap, + fmt::Debug, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, +}; + +use derive_builder::Builder; +use futures_signals::{ + signal::Mutable, + signal_map::MutableBTreeMapLockRef, + signal_vec::{MutableVec, MutableVecLockMut}, +}; +use ruma::{ + api::client::sync::sync_events::v4, assign, events::StateEventType, OwnedRoomId, RoomId, UInt, +}; +use serde::{Deserialize, Serialize}; +use tracing::{debug, error, instrument, trace, warn}; + +use super::{ + Error, FrozenSlidingSyncRoom, RangeState, RoomListEntry, RoomsCount, RoomsList, + SlidingSyncMode, SlidingSyncRoom, SlidingSyncState, SyncMode, ViewState, +}; +use crate::Result; + +/// Holding a specific filtered view within the concept of sliding sync. +/// Main entrypoint to the SlidingSync +/// +/// +/// ```no_run +/// # use futures::executor::block_on; +/// # use matrix_sdk::Client; +/// # use url::Url; +/// # block_on(async { +/// # let homeserver = Url::parse("http://example.com")?; +/// let client = Client::new(homeserver).await?; +/// let sliding_sync = +/// client.sliding_sync().await.add_fullsync_view().build().await?; +/// +/// # anyhow::Ok(()) +/// # }); +/// ``` +#[derive(Clone, Debug, Builder)] +#[builder(build_fn(name = "finish_build"), pattern = "owned", derive(Clone, Debug))] +pub struct SlidingSyncView { + /// Which SyncMode to start this view under + #[builder(setter(custom), default)] + sync_mode: SyncMode, + + /// Sort the rooms list by this + #[builder(default = "SlidingSyncViewBuilder::default_sort()")] + sort: Vec, + + /// Required states to return per room + #[builder(default = "SlidingSyncViewBuilder::default_required_state()")] + required_state: Vec<(StateEventType, String)>, + + /// How many rooms request at a time when doing a full-sync catch up + #[builder(default = "20")] + batch_size: u32, + + /// Whether the view should send `UpdatedAt`-Diff signals for rooms + /// that have changed + #[builder(default = "false")] + send_updates_for_items: bool, + + /// How many rooms request a total hen doing a full-sync catch up + #[builder(setter(into), default)] + limit: Option, + + /// Any filters to apply to the query + #[builder(default)] + filters: Option, + + /// The maximum number of timeline events to query for + #[builder(setter(name = "timeline_limit_raw"), default)] + pub timeline_limit: Mutable>, + + // ----- Public state + /// Name of this view to easily recognize them + #[builder(setter(into))] + pub name: String, + + /// The state this view is in + #[builder(private, default)] + pub state: ViewState, + + /// The total known number of rooms, + #[builder(private, default)] + pub rooms_count: RoomsCount, + + /// The rooms in order + #[builder(private, default)] + pub rooms_list: RoomsList, + + /// The ranges windows of the view + #[builder(setter(name = "ranges_raw"), default)] + ranges: RangeState, + + /// Signaling updates on the room list after processing + #[builder(private)] + rooms_updated_signal: futures_signals::signal::Sender<()>, + + #[builder(private)] + is_cold: Arc, + + /// Get informed if anything in the room changed + /// + /// If you only care to know about changes once all of them have applied + /// (including the total) listen to a clone of this signal. + #[builder(private)] + pub rooms_updated_broadcaster: + futures_signals::signal::Broadcaster>, +} + +#[derive(Serialize, Deserialize)] +pub(super) struct FrozenSlidingSyncView { + #[serde(default, skip_serializing_if = "Option::is_none")] + pub(super) rooms_count: Option, + #[serde(default, skip_serializing_if = "Vec::is_empty")] + pub(super) rooms_list: Vec, + #[serde(default, skip_serializing_if = "BTreeMap::is_empty")] + pub(super) rooms: BTreeMap, +} + +impl FrozenSlidingSyncView { + pub(super) fn freeze( + source_view: &SlidingSyncView, + rooms_map: &MutableBTreeMapLockRef<'_, OwnedRoomId, SlidingSyncRoom>, + ) -> Self { + let mut rooms = BTreeMap::new(); + let mut rooms_list = Vec::new(); + for entry in source_view.rooms_list.lock_ref().iter() { + match entry { + RoomListEntry::Filled(o) | RoomListEntry::Invalidated(o) => { + rooms.insert(o.clone(), rooms_map.get(o).expect("rooms always exists").into()); + } + _ => {} + }; + + rooms_list.push(entry.freeze()); + } + FrozenSlidingSyncView { + rooms_count: *source_view.rooms_count.lock_ref(), + rooms_list, + rooms, + } + } +} + +impl SlidingSyncView { + pub(crate) fn set_from_cold( + &mut self, + rooms_count: Option, + rooms_list: Vec, + ) { + self.state.set(SlidingSyncState::Preload); + self.is_cold.store(true, Ordering::SeqCst); + self.rooms_count.replace(rooms_count); + self.rooms_list.lock_mut().replace_cloned(rooms_list); + } +} + +/// the default name for the full sync view +pub const FULL_SYNC_VIEW_NAME: &str = "full-sync"; + +impl SlidingSyncViewBuilder { + /// Create a Builder set up for full sync + pub fn default_with_fullsync() -> Self { + Self::default().name(FULL_SYNC_VIEW_NAME).sync_mode(SlidingSyncMode::PagingFullSync) + } + + /// Build the view + pub fn build(mut self) -> Result { + let (sender, receiver) = futures_signals::signal::channel(()); + self.is_cold = Some(Arc::new(AtomicBool::new(false))); + self.rooms_updated_signal = Some(sender); + self.rooms_updated_broadcaster = Some(futures_signals::signal::Broadcaster::new(receiver)); + self.finish_build() + } + + fn default_sort() -> Vec { + vec!["by_recency".to_owned(), "by_name".to_owned()] + } + + fn default_required_state() -> Vec<(StateEventType, String)> { + vec![ + (StateEventType::RoomEncryption, "".to_owned()), + (StateEventType::RoomTombstone, "".to_owned()), + ] + } + + /// Set the Syncing mode + pub fn sync_mode(mut self, sync_mode: SlidingSyncMode) -> Self { + self.sync_mode = Some(SyncMode::new(sync_mode)); + self + } + + /// Set the ranges to fetch + pub fn ranges>(mut self, range: Vec<(U, U)>) -> Self { + self.ranges = + Some(RangeState::new(range.into_iter().map(|(a, b)| (a.into(), b.into())).collect())); + self + } + + /// Set a single range fetch + pub fn set_range>(mut self, from: U, to: U) -> Self { + self.ranges = Some(RangeState::new(vec![(from.into(), to.into())])); + self + } + + /// Set the ranges to fetch + pub fn add_range>(mut self, from: U, to: U) -> Self { + let r = self.ranges.get_or_insert_with(|| RangeState::new(Vec::new())); + r.lock_mut().push((from.into(), to.into())); + self + } + + /// Set the ranges to fetch + pub fn reset_ranges(mut self) -> Self { + self.ranges = None; + self + } + + /// Set the limit of regular events to fetch for the timeline. + pub fn timeline_limit>(mut self, timeline_limit: U) -> Self { + self.timeline_limit = Some(Mutable::new(Some(timeline_limit.into()))); + self + } + + /// Reset the limit of regular events to fetch for the timeline. It is left + /// to the server to decide how many to send back + pub fn no_timeline_limit(mut self) -> Self { + self.timeline_limit = None; + self + } +} + +enum InnerSlidingSyncViewRequestGenerator { + GrowingFullSync { position: u32, batch_size: u32, limit: Option, live: bool }, + PagingFullSync { position: u32, batch_size: u32, limit: Option, live: bool }, + Live, +} + +pub(super) struct SlidingSyncViewRequestGenerator { + view: SlidingSyncView, + ranges: Vec<(usize, usize)>, + inner: InnerSlidingSyncViewRequestGenerator, +} + +impl SlidingSyncViewRequestGenerator { + fn new_with_paging_syncup(view: SlidingSyncView) -> Self { + let batch_size = view.batch_size; + let limit = view.limit; + let position = view + .ranges + .get_cloned() + .first() + .map(|(_start, end)| u32::try_from(*end).unwrap()) + .unwrap_or_default(); + + SlidingSyncViewRequestGenerator { + view, + ranges: Default::default(), + inner: InnerSlidingSyncViewRequestGenerator::PagingFullSync { + position, + batch_size, + limit, + live: false, + }, + } + } + + fn new_with_growing_syncup(view: SlidingSyncView) -> Self { + let batch_size = view.batch_size; + let limit = view.limit; + let position = view + .ranges + .get_cloned() + .first() + .map(|(_start, end)| u32::try_from(*end).unwrap()) + .unwrap_or_default(); + + SlidingSyncViewRequestGenerator { + view, + ranges: Default::default(), + inner: InnerSlidingSyncViewRequestGenerator::GrowingFullSync { + position, + batch_size, + limit, + live: false, + }, + } + } + + fn new_live(view: SlidingSyncView) -> Self { + SlidingSyncViewRequestGenerator { + view, + ranges: Default::default(), + inner: InnerSlidingSyncViewRequestGenerator::Live, + } + } + + fn prefetch_request( + &mut self, + start: u32, + batch_size: u32, + limit: Option, + ) -> v4::SyncRequestList { + let calc_end = start + batch_size; + let end = match limit { + Some(l) => std::cmp::min(l, calc_end), + _ => calc_end, + }; + self.make_request_for_ranges(vec![(start.into(), end.into())]) + } + + #[instrument(skip(self), fields(name = self.view.name))] + fn make_request_for_ranges(&mut self, ranges: Vec<(UInt, UInt)>) -> v4::SyncRequestList { + let sort = self.view.sort.clone(); + let required_state = self.view.required_state.clone(); + let timeline_limit = self.view.timeline_limit.get_cloned(); + let filters = self.view.filters.clone(); + + self.ranges = ranges + .iter() + .map(|(a, b)| { + ( + usize::try_from(*a).expect("range is a valid u32"), + usize::try_from(*b).expect("range is a valid u32"), + ) + }) + .collect(); + + assign!(v4::SyncRequestList::default(), { + ranges: ranges, + room_details: assign!(v4::RoomDetailsConfig::default(), { + required_state, + timeline_limit, + }), + sort, + filters, + }) + } + + // generate the next live request + fn live_request(&mut self) -> v4::SyncRequestList { + let ranges = self.view.ranges.read_only().get_cloned(); + self.make_request_for_ranges(ranges) + } + + #[instrument(skip_all, fields(name = self.view.name, rooms_count, has_ops = !ops.is_empty()))] + pub(super) fn handle_response( + &mut self, + rooms_count: u32, + ops: &Vec, + rooms: &Vec, + ) -> Result { + let res = self.view.handle_response(rooms_count, ops, &self.ranges, rooms)?; + self.update_state(rooms_count.saturating_sub(1)); // index is 0 based, count is 1 based + Ok(res) + } + + fn update_state(&mut self, max_index: u32) { + let Some((_start, range_end)) = self.ranges.first() else { + error!("Why don't we have any ranges?"); + return + }; + + let end = if &(max_index as usize) < range_end { max_index } else { *range_end as u32 }; + + trace!(end, max_index, range_end, name = self.view.name, "updating state"); + + match &mut self.inner { + InnerSlidingSyncViewRequestGenerator::PagingFullSync { + position, live, limit, .. + } + | InnerSlidingSyncViewRequestGenerator::GrowingFullSync { + position, live, limit, .. + } => { + let max = limit.map(|limit| std::cmp::min(limit, max_index)).unwrap_or(max_index); + trace!(end, max, name = self.view.name, "updating state"); + if end >= max { + trace!(name = self.view.name, "going live"); + // we are switching to live mode + self.view.set_range(0, max); + *position = max; + *live = true; + + self.view.state.set_if(SlidingSyncState::Live, |before, _now| { + !matches!(before, SlidingSyncState::Live) + }); + } else { + *position = end; + *live = false; + self.view.set_range(0, end); + self.view.state.set_if(SlidingSyncState::CatchingUp, |before, _now| { + !matches!(before, SlidingSyncState::CatchingUp) + }); + } + } + InnerSlidingSyncViewRequestGenerator::Live => { + self.view.state.set_if(SlidingSyncState::Live, |before, _now| { + !matches!(before, SlidingSyncState::Live) + }); + } + } + } +} + +impl Iterator for SlidingSyncViewRequestGenerator { + type Item = v4::SyncRequestList; + + fn next(&mut self) -> Option { + match self.inner { + InnerSlidingSyncViewRequestGenerator::PagingFullSync { live, .. } + | InnerSlidingSyncViewRequestGenerator::GrowingFullSync { live, .. } + if live => + { + Some(self.live_request()) + } + InnerSlidingSyncViewRequestGenerator::PagingFullSync { + position, + batch_size, + limit, + .. + } => Some(self.prefetch_request(position, batch_size, limit)), + InnerSlidingSyncViewRequestGenerator::GrowingFullSync { + position, + batch_size, + limit, + .. + } => Some(self.prefetch_request(0, position + batch_size, limit)), + InnerSlidingSyncViewRequestGenerator::Live => Some(self.live_request()), + } + } +} + +#[instrument(skip(ops))] +fn room_ops( + rooms_list: &mut MutableVecLockMut<'_, RoomListEntry>, + ops: &Vec, + room_ranges: &Vec<(usize, usize)>, +) -> Result<(), Error> { + let index_in_range = |idx| room_ranges.iter().any(|(start, end)| idx >= *start && idx <= *end); + for op in ops { + match &op.op { + v4::SlidingOp::Sync => { + let start: u32 = op + .range + .ok_or_else(|| { + Error::BadResponse( + "`range` must be present for Sync and Update operation".to_owned(), + ) + })? + .0 + .try_into() + .map_err(|e| Error::BadResponse(format!("`range` not a valid int: {e:}")))?; + let room_ids = op.room_ids.clone(); + room_ids + .into_iter() + .enumerate() + .map(|(i, r)| { + let idx = start as usize + i; + if idx >= rooms_list.len() { + rooms_list.insert_cloned(idx, RoomListEntry::Filled(r)); + } else { + rooms_list.set_cloned(idx, RoomListEntry::Filled(r)); + } + }) + .count(); + } + v4::SlidingOp::Delete => { + let pos: u32 = op + .index + .ok_or_else(|| { + Error::BadResponse( + "`index` must be present for DELETE operation".to_owned(), + ) + })? + .try_into() + .map_err(|e| { + Error::BadResponse(format!("`index` not a valid int for DELETE: {e:}")) + })?; + rooms_list.set_cloned(pos as usize, RoomListEntry::Empty); + } + v4::SlidingOp::Insert => { + let pos: usize = op + .index + .ok_or_else(|| { + Error::BadResponse( + "`index` must be present for INSERT operation".to_owned(), + ) + })? + .try_into() + .map_err(|e| { + Error::BadResponse(format!("`index` not a valid int for INSERT: {e:}")) + })?; + let sliced = rooms_list.as_slice(); + let room = RoomListEntry::Filled(op.room_id.clone().ok_or_else(|| { + Error::BadResponse("`room_id` must be present for INSERT operation".to_owned()) + })?); + let mut dif = 0usize; + loop { + // find the next empty slot and drop it + let (prev_p, prev_overflow) = pos.overflowing_sub(dif); + let check_prev = !prev_overflow && index_in_range(prev_p); + let (next_p, overflown) = pos.overflowing_add(dif); + let check_after = !overflown && next_p < sliced.len() && index_in_range(next_p); + if !check_prev && !check_after { + return Err(Error::BadResponse( + "We were asked to insert but could not find any direction to shift to" + .to_owned(), + )); + } + + if check_prev && sliced[prev_p].empty_or_invalidated() { + // we only check for previous, if there are items left + rooms_list.remove(prev_p); + break; + } else if check_after && sliced[next_p].empty_or_invalidated() { + rooms_list.remove(next_p); + break; + } else { + // let's check the next position; + dif += 1; + } + } + rooms_list.insert_cloned(pos, room); + } + v4::SlidingOp::Invalidate => { + let max_len = rooms_list.len(); + let (mut pos, end): (u32, u32) = if let Some(range) = op.range { + ( + range.0.try_into().map_err(|e| { + Error::BadResponse(format!("`range.0` not a valid int: {e:}")) + })?, + range.1.try_into().map_err(|e| { + Error::BadResponse(format!("`range.1` not a valid int: {e:}")) + })?, + ) + } else { + return Err(Error::BadResponse( + "`range` must be given on `Invalidate` operation".to_owned(), + )); + }; + + if pos > end { + return Err(Error::BadResponse( + "Invalid invalidation, end smaller than start".to_owned(), + )); + } + + // ranges are inclusive up to the last index. e.g. `[0, 10]`; `[0, 0]`. + // ensure we pick them all up + while pos <= end { + if pos as usize >= max_len { + break; // how does this happen? + } + let idx = pos as usize; + let entry = if let Some(RoomListEntry::Filled(b)) = rooms_list.get(idx) { + Some(b.clone()) + } else { + None + }; + + if let Some(b) = entry { + rooms_list.set_cloned(pos as usize, RoomListEntry::Invalidated(b)); + } else { + rooms_list.set_cloned(pos as usize, RoomListEntry::Empty); + } + pos += 1; + } + } + s => { + warn!("Unknown operation occurred: {:?}", s); + } + } + } + + Ok(()) +} + +impl SlidingSyncView { + /// Return a builder with the same settings as before + pub fn new_builder(&self) -> SlidingSyncViewBuilder { + SlidingSyncViewBuilder::default() + .name(&self.name) + .sync_mode(self.sync_mode.lock_ref().clone()) + .sort(self.sort.clone()) + .required_state(self.required_state.clone()) + .batch_size(self.batch_size) + .ranges(self.ranges.read_only().get_cloned()) + } + + /// Set the ranges to fetch + /// + /// Remember to cancel the existing stream and fetch a new one as this will + /// only be applied on the next request. + pub fn set_ranges(&self, range: Vec<(u32, u32)>) -> &Self { + *self.ranges.lock_mut() = range.into_iter().map(|(a, b)| (a.into(), b.into())).collect(); + self + } + + /// Reset the ranges to a particular set + /// + /// Remember to cancel the existing stream and fetch a new one as this will + /// only be applied on the next request. + pub fn set_range(&self, start: u32, end: u32) -> &Self { + *self.ranges.lock_mut() = vec![(start.into(), end.into())]; + self + } + + /// Set the ranges to fetch + /// + /// Remember to cancel the existing stream and fetch a new one as this will + /// only be applied on the next request. + pub fn add_range(&self, start: u32, end: u32) -> &Self { + self.ranges.lock_mut().push((start.into(), end.into())); + self + } + + /// Set the ranges to fetch + /// + /// Note: sending an empty list of ranges is, according to the spec, to be + /// understood that the consumer doesn't care about changes of the room + /// order but you will only receive updates when for rooms entering or + /// leaving the set. + /// + /// Remember to cancel the existing stream and fetch a new one as this will + /// only be applied on the next request. + pub fn reset_ranges(&self) -> &Self { + self.ranges.lock_mut().clear(); + self + } + + /// Find the current valid position of the room in the view room_list. + /// + /// Only matches against the current ranges and only against filled items. + /// Invalid items are ignore. Return the total position the item was + /// found in the room_list, return None otherwise. + pub fn find_room_in_view(&self, room_id: &RoomId) -> Option { + let ranges = self.ranges.lock_ref(); + let listing = self.rooms_list.lock_ref(); + for (start_uint, end_uint) in ranges.iter() { + let mut cur_pos: usize = (*start_uint).try_into().unwrap(); + let end: usize = (*end_uint).try_into().unwrap(); + let iterator = listing.iter().skip(cur_pos); + for n in iterator { + if let RoomListEntry::Filled(r) = n { + if room_id == r { + return Some(cur_pos); + } + } + if cur_pos == end { + break; + } + cur_pos += 1; + } + } + None + } + + /// Find the current valid position of the rooms in the views room_list. + /// + /// Only matches against the current ranges and only against filled items. + /// Invalid items are ignore. Return the total position the items that were + /// found in the room_list, will skip any room not found in the rooms_list. + pub fn find_rooms_in_view(&self, room_ids: &[OwnedRoomId]) -> Vec<(usize, OwnedRoomId)> { + let ranges = self.ranges.lock_ref(); + let listing = self.rooms_list.lock_ref(); + let mut rooms_found = Vec::new(); + for (start_uint, end_uint) in ranges.iter() { + let mut cur_pos: usize = (*start_uint).try_into().unwrap(); + let end: usize = (*end_uint).try_into().unwrap(); + let iterator = listing.iter().skip(cur_pos); + for n in iterator { + if let RoomListEntry::Filled(r) = n { + if room_ids.contains(r) { + rooms_found.push((cur_pos, r.clone())); + } + } + if cur_pos == end { + break; + } + cur_pos += 1; + } + } + rooms_found + } + + /// Return the room_id at the given index + pub fn get_room_id(&self, index: usize) -> Option { + self.rooms_list.lock_ref().get(index).and_then(|e| e.as_room_id().map(ToOwned::to_owned)) + } + + #[instrument(skip(self, ops), fields(name = self.name, ops_count = ops.len()))] + pub(super) fn handle_response( + &self, + rooms_count: u32, + ops: &Vec, + ranges: &Vec<(usize, usize)>, + rooms: &Vec, + ) -> Result { + let current_rooms_count = self.rooms_count.get(); + if current_rooms_count.is_none() + || current_rooms_count == Some(0) + || self.is_cold.load(Ordering::SeqCst) + { + debug!("first run, replacing rooms list"); + // first response, we do that slightly differently + let rooms_list = + MutableVec::new_with_values(vec![RoomListEntry::Empty; rooms_count as usize]); + // then we apply it + let mut locked = rooms_list.lock_mut(); + room_ops(&mut locked, ops, ranges)?; + self.rooms_list.lock_mut().replace_cloned(locked.as_slice().to_vec()); + self.rooms_count.set(Some(rooms_count)); + self.is_cold.store(false, Ordering::SeqCst); + return Ok(true); + } + + debug!("regular update"); + let mut missing = + rooms_count.checked_sub(self.rooms_list.lock_ref().len() as u32).unwrap_or_default(); + let mut changed = false; + if missing > 0 { + let mut list = self.rooms_list.lock_mut(); + list.reserve_exact(missing as usize); + while missing > 0 { + list.push_cloned(RoomListEntry::Empty); + missing -= 1; + } + changed = true; + } + + { + // keep the lock scoped so that the later find_rooms_in_view doesn't deadlock + let mut rooms_list = self.rooms_list.lock_mut(); + + if !ops.is_empty() { + room_ops(&mut rooms_list, ops, ranges)?; + changed = true; + } else { + debug!("no rooms operations found"); + } + } + + if self.rooms_count.get() != Some(rooms_count) { + self.rooms_count.set(Some(rooms_count)); + changed = true; + } + + if self.send_updates_for_items && !rooms.is_empty() { + let found_views = self.find_rooms_in_view(rooms); + if !found_views.is_empty() { + debug!("room details found"); + let mut rooms_list = self.rooms_list.lock_mut(); + for (pos, room_id) in found_views { + // trigger an `UpdatedAt` update + rooms_list.set_cloned(pos, RoomListEntry::Filled(room_id)); + changed = true; + } + } + } + + if changed { + if let Err(e) = self.rooms_updated_signal.send(()) { + warn!("Could not inform about rooms updated: {:?}", e); + } + } + + Ok(changed) + } + + pub(super) fn request_generator(&self) -> SlidingSyncViewRequestGenerator { + match self.sync_mode.read_only().get_cloned() { + SlidingSyncMode::PagingFullSync => { + SlidingSyncViewRequestGenerator::new_with_paging_syncup(self.clone()) + } + SlidingSyncMode::GrowingFullSync => { + SlidingSyncViewRequestGenerator::new_with_growing_syncup(self.clone()) + } + SlidingSyncMode::Selective => SlidingSyncViewRequestGenerator::new_live(self.clone()), + } + } +}