Merge branch 'main' into feat-sliding-sync-room

This commit is contained in:
Ivan Enderlin
2023-05-11 17:47:08 +02:00
18 changed files with 426 additions and 205 deletions

1
Cargo.lock generated
View File

@@ -538,6 +538,7 @@ version = "1.0.0"
dependencies = [
"criterion",
"matrix-sdk-crypto",
"matrix-sdk-sled",
"matrix-sdk-sqlite",
"matrix-sdk-test",
"pprof",

View File

@@ -11,6 +11,7 @@ publish = false
criterion = { version = "0.4.0", features = ["async", "async_tokio", "html_reports"] }
matrix-sdk-crypto = { path = "../crates/matrix-sdk-crypto", version = "0.6.0"}
matrix-sdk-sqlite = { path = "../crates/matrix-sdk-sqlite", version = "0.1.0", default-features = false, features = ["crypto-store"] }
matrix-sdk-sled = { path = "../crates/matrix-sdk-sled", version = "0.2.0", default-features = false, features = ["crypto-store"] }
matrix-sdk-test = { path = "../testing/matrix-sdk-test", version = "0.6.0"}
ruma = { workspace = true }
serde_json = { workspace = true }

View File

@@ -2,6 +2,7 @@ use std::{ops::Deref, sync::Arc};
use criterion::*;
use matrix_sdk_crypto::{EncryptionSettings, OlmMachine};
use matrix_sdk_sled::SledCryptoStore;
use matrix_sdk_sqlite::SqliteCryptoStore;
use matrix_sdk_test::response_from_file;
use ruma::{
@@ -65,11 +66,15 @@ pub fn keys_query(c: &mut Criterion) {
let name = format!("{count} device and cross signing keys");
// Benchmark memory store.
group.bench_with_input(BenchmarkId::new("memory store", &name), &response, |b, response| {
b.to_async(&runtime)
.iter(|| async { machine.mark_request_as_sent(&txn_id, response).await.unwrap() })
});
// Benchmark sqlite store.
let dir = tempfile::tempdir().unwrap();
let store = Arc::new(runtime.block_on(SqliteCryptoStore::open(dir.path(), None)).unwrap());
let machine =
@@ -80,6 +85,23 @@ pub fn keys_query(c: &mut Criterion) {
.iter(|| async { machine.mark_request_as_sent(&txn_id, response).await.unwrap() })
});
{
let _guard = runtime.enter();
drop(machine);
}
// Benchmark (deprecated) sled store.
let dir = tempfile::tempdir().unwrap();
let store = Arc::new(runtime.block_on(SledCryptoStore::open(dir.path(), None)).unwrap());
let machine =
runtime.block_on(OlmMachine::with_store(alice_id(), alice_device_id(), store)).unwrap();
group.bench_with_input(BenchmarkId::new("sled store", &name), &response, |b, response| {
b.to_async(&runtime)
.iter(|| async { machine.mark_request_as_sent(&txn_id, response).await.unwrap() })
});
group.finish()
}
@@ -108,7 +130,10 @@ pub fn keys_claiming(c: &mut Criterion) {
(machine, &runtime, &txn_id)
},
move |(machine, runtime, txn_id)| {
runtime.block_on(machine.mark_request_as_sent(txn_id, response)).unwrap()
runtime.block_on(async {
machine.mark_request_as_sent(txn_id, response).await.unwrap();
drop(machine);
})
},
BatchSize::SmallInput,
)
@@ -129,6 +154,31 @@ pub fn keys_claiming(c: &mut Criterion) {
.unwrap();
(machine, &runtime, &txn_id)
},
move |(machine, runtime, txn_id)| {
runtime.block_on(async {
machine.mark_request_as_sent(txn_id, response).await.unwrap();
drop(machine)
})
},
BatchSize::SmallInput,
)
});
group.bench_with_input(BenchmarkId::new("sled store", &name), &response, |b, response| {
b.iter_batched(
|| {
let dir = tempfile::tempdir().unwrap();
let store =
Arc::new(runtime.block_on(SledCryptoStore::open(dir.path(), None)).unwrap());
let machine = runtime
.block_on(OlmMachine::with_store(alice_id(), alice_device_id(), store))
.unwrap();
runtime
.block_on(machine.mark_request_as_sent(&txn_id, &keys_query_response))
.unwrap();
(machine, &runtime, &txn_id)
},
move |(machine, runtime, txn_id)| {
runtime.block_on(machine.mark_request_as_sent(txn_id, response)).unwrap()
},
@@ -160,6 +210,8 @@ pub fn room_key_sharing(c: &mut Criterion) {
group.throughput(Throughput::Elements(count as u64));
let name = format!("{count} devices");
// Benchmark memory store.
group.bench_function(BenchmarkId::new("memory store", &name), |b| {
b.to_async(&runtime).iter(|| async {
let requests = machine
@@ -180,6 +232,9 @@ pub fn room_key_sharing(c: &mut Criterion) {
machine.invalidate_group_session(room_id).await.unwrap();
})
});
// Benchmark sqlite store.
let dir = tempfile::tempdir().unwrap();
let store = Arc::new(runtime.block_on(SqliteCryptoStore::open(dir.path(), None)).unwrap());
@@ -209,6 +264,42 @@ pub fn room_key_sharing(c: &mut Criterion) {
})
});
{
let _guard = runtime.enter();
drop(machine);
}
// Benchmark (deprecated) sled store.
let dir = tempfile::tempdir().unwrap();
let store = Arc::new(runtime.block_on(SledCryptoStore::open(dir.path(), None)).unwrap());
let machine =
runtime.block_on(OlmMachine::with_store(alice_id(), alice_device_id(), store)).unwrap();
runtime.block_on(machine.mark_request_as_sent(&txn_id, &keys_query_response)).unwrap();
runtime.block_on(machine.mark_request_as_sent(&txn_id, &response)).unwrap();
group.bench_function(BenchmarkId::new("sled store", &name), |b| {
b.to_async(&runtime).iter(|| async {
let requests = machine
.share_room_key(
room_id,
users.iter().map(Deref::deref),
EncryptionSettings::default(),
)
.await
.unwrap();
assert!(!requests.is_empty());
for request in requests {
machine.mark_request_as_sent(&request.txn_id, &to_device_response).await.unwrap();
}
machine.invalidate_group_session(room_id).await.unwrap();
})
});
group.finish()
}
@@ -229,12 +320,16 @@ pub fn devices_missing_sessions_collecting(c: &mut Criterion) {
runtime.block_on(machine.mark_request_as_sent(&txn_id, &response)).unwrap();
// Benchmark memory store.
group.bench_function(BenchmarkId::new("memory store", &name), |b| {
b.to_async(&runtime).iter_with_large_drop(|| async {
machine.get_missing_sessions(users.iter().map(Deref::deref)).await.unwrap()
})
});
// Benchmark sqlite store.
let dir = tempfile::tempdir().unwrap();
let store = Arc::new(runtime.block_on(SqliteCryptoStore::open(dir.path(), None)).unwrap());
@@ -249,6 +344,27 @@ pub fn devices_missing_sessions_collecting(c: &mut Criterion) {
})
});
{
let _guard = runtime.enter();
drop(machine);
}
// Benchmark (deprecated) sled store.
let dir = tempfile::tempdir().unwrap();
let store = Arc::new(runtime.block_on(SledCryptoStore::open(dir.path(), None)).unwrap());
let machine =
runtime.block_on(OlmMachine::with_store(alice_id(), alice_device_id(), store)).unwrap();
runtime.block_on(machine.mark_request_as_sent(&txn_id, &response)).unwrap();
group.bench_function(BenchmarkId::new("sled store", &name), |b| {
b.to_async(&runtime).iter(|| async {
machine.get_missing_sessions(users.iter().map(Deref::deref)).await.unwrap()
})
});
group.finish()
}

View File

@@ -18,7 +18,7 @@ dictionary NotificationItem {
string? room_canonical_alias;
boolean is_noisy;
boolean is_direct;
boolean is_encrypted;
boolean? is_encrypted;
};
interface TimelineEvent {};
@@ -154,7 +154,15 @@ interface PaginationOptions {
interface RoomMessageEventContent {};
[Error]
interface ClientError {
Generic(string msg);
};
interface MediaSource {
[Name=from_json, Throws=ClientError]
constructor(string json);
string to_json();
string url();
};

View File

@@ -1,6 +1,6 @@
use matrix_sdk::{self, encryption::CryptoStoreError, HttpError, IdParseError, StoreError};
#[derive(Debug, thiserror::Error, uniffi::Error)]
#[derive(Debug, thiserror::Error)]
pub enum ClientError {
#[error("client error: {msg}")]
Generic { msg: String },

View File

@@ -21,7 +21,7 @@ pub struct NotificationItem {
pub is_noisy: bool,
pub is_direct: bool,
pub is_encrypted: bool,
pub is_encrypted: Option<bool>,
}
impl NotificationItem {
@@ -46,7 +46,10 @@ impl NotificationItem {
room: Room,
actions: Vec<Action>,
) -> anyhow::Result<Self> {
let sender = room.get_member(event.sender()).await?;
let sender = match &room {
Room::Invited(invited) => invited.invite_details().await?.inviter,
_ => room.get_member(event.sender()).await?,
};
let mut sender_display_name = None;
let mut sender_avatar_url = None;
if let Some(sender) = sender {
@@ -65,7 +68,7 @@ impl NotificationItem {
room_canonical_alias: room.canonical_alias().map(|c| c.to_string()),
is_noisy,
is_direct: room.is_direct().await?,
is_encrypted: room.is_encrypted().await?,
is_encrypted: room.is_encrypted().await.ok(),
};
Ok(item)
}

View File

@@ -8,7 +8,7 @@ use matrix_sdk::ruma::{
v4::RoomSubscription as RumaRoomSubscription,
UnreadNotificationsCount as RumaUnreadNotificationsCount,
},
assign, IdParseError, OwnedRoomId, RoomId, UInt,
assign, IdParseError, OwnedRoomId, RoomId,
};
pub use matrix_sdk::{
room::timeline::Timeline, ruma::api::client::sync::sync_events::v4::SyncRequestListFilters,
@@ -529,9 +529,9 @@ impl SlidingSyncListBuilder {
Arc::new(builder)
}
pub fn add_range(self: Arc<Self>, from: u32, to: u32) -> Arc<Self> {
pub fn add_range(self: Arc<Self>, from: u32, to_included: u32) -> Arc<Self> {
let mut builder = unwrap_or_clone_arc(self);
builder.inner = builder.inner.add_range(from, to);
builder.inner = builder.inner.add_range(from..=to_included);
Arc::new(builder)
}
@@ -627,7 +627,7 @@ impl SlidingSyncList {
/// Remember to cancel the existing stream and fetch a new one as this will
/// only be applied on the next request.
pub fn set_range(&self, start: u32, end: u32) -> Result<(), SlidingSyncError> {
self.inner.set_range(start, end).map_err(Into::into)
self.inner.set_range(start..=end).map_err(Into::into)
}
/// Set the ranges to fetch
@@ -635,7 +635,7 @@ impl SlidingSyncList {
/// Remember to cancel the existing stream and fetch a new one as this will
/// only be applied on the next request.
pub fn add_range(&self, start: u32, end: u32) -> Result<(), SlidingSyncError> {
self.inner.add_range((start, end)).map_err(Into::into)
self.inner.add_range(start..=end).map_err(Into::into)
}
/// Reset the ranges
@@ -650,7 +650,7 @@ impl SlidingSyncList {
/// The current timeline limit
pub fn get_timeline_limit(&self) -> Option<u32> {
self.inner.timeline_limit().map(|limit| u32::try_from(limit).unwrap_or_default())
self.inner.timeline_limit()
}
/// The current timeline limit
@@ -660,7 +660,7 @@ impl SlidingSyncList {
/// Unset the current timeline limit
pub fn unset_timeline_limit(&self) {
self.inner.set_timeline_limit::<UInt>(None)
self.inner.set_timeline_limit(None)
}
}

View File

@@ -11,7 +11,10 @@ use matrix_sdk::{
use ruma::UInt;
use tracing::warn;
use crate::{error::TimelineError, helpers::unwrap_or_clone_arc};
use crate::{
error::{ClientError, TimelineError},
helpers::unwrap_or_clone_arc,
};
#[uniffi::export]
pub fn media_source_from_url(url: String) -> Arc<MediaSource> {
@@ -934,6 +937,15 @@ pub enum VirtualTimelineItem {
#[extension_trait]
pub impl MediaSourceExt for MediaSource {
fn from_json(json: String) -> Result<MediaSource, ClientError> {
let res = serde_json::from_str(&json)?;
Ok(res)
}
/// Serialize this media source to a JSON string.
///
/// Serializing a `MediaSource` is infallible in practice, hence the
/// `expect` on the result.
fn to_json(&self) -> String {
    let serialized = serde_json::to_string(self);
    serialized.expect("Media source should always be serializable ")
}
fn url(&self) -> String {
match self {
MediaSource::Plain(url) => url.to_string(),

View File

@@ -61,7 +61,7 @@ struct DebugNotification<'a>(&'a Notification);
#[cfg(not(tarpaulin_include))]
impl<'a> fmt::Debug for DebugNotification<'a> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("DebugNotification")
f.debug_struct("Notification")
.field("actions", &self.0.actions)
.field("event", &DebugRawEvent(&self.0.event))
.field("profile_tag", &self.0.profile_tag)

View File

@@ -16,7 +16,7 @@
use std::{collections::BTreeMap, fmt};
use matrix_sdk_common::deserialized_responses::SyncTimelineEvent;
use matrix_sdk_common::{debug::DebugRawEvent, deserialized_responses::SyncTimelineEvent};
use ruma::{
api::client::{
push::get_notifications::v3::Notification,
@@ -81,7 +81,7 @@ impl fmt::Debug for SyncResponse {
}
/// Updates to rooms in a [`SyncResponse`].
#[derive(Clone, Debug, Default)]
#[derive(Clone, Default)]
pub struct Rooms {
/// The rooms that the user has left or been banned from.
pub leave: BTreeMap<OwnedRoomId, LeftRoom>,
@@ -91,8 +91,19 @@ pub struct Rooms {
pub invite: BTreeMap<OwnedRoomId, InvitedRoom>,
}
#[cfg(not(tarpaulin_include))]
impl fmt::Debug for Rooms {
    /// Hand-written `Debug` so invited rooms are rendered through the
    /// compact `DebugInvitedRooms` wrapper instead of dumping raw JSON.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut dbg = f.debug_struct("Rooms");
        dbg.field("leave", &self.leave);
        dbg.field("join", &self.join);
        dbg.field("invite", &DebugInvitedRooms(&self.invite));
        dbg.finish()
    }
}
/// Updates to joined rooms.
#[derive(Clone, Debug)]
#[derive(Clone)]
pub struct JoinedRoom {
/// Counts of unread notifications for this room.
pub unread_notifications: UnreadNotificationsCount,
@@ -110,6 +121,18 @@ pub struct JoinedRoom {
pub ephemeral: Vec<Raw<AnySyncEphemeralRoomEvent>>,
}
impl fmt::Debug for JoinedRoom {
    /// Hand-written `Debug` so raw state and account-data events go through
    /// the compact `DebugListOfRawEvents`/`DebugListOfRawEventsNoId`
    /// wrappers rather than printing full raw JSON.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut dbg = f.debug_struct("JoinedRoom");
        dbg.field("unread_notifications", &self.unread_notifications);
        dbg.field("timeline", &self.timeline);
        dbg.field("state", &DebugListOfRawEvents(&self.state));
        dbg.field("account_data", &DebugListOfRawEventsNoId(&self.account_data));
        dbg.field("ephemeral", &self.ephemeral);
        dbg.finish()
    }
}
impl JoinedRoom {
pub(crate) fn new(
timeline: Timeline,
@@ -142,7 +165,7 @@ impl From<RumaUnreadNotificationsCount> for UnreadNotificationsCount {
}
/// Updates to left rooms.
#[derive(Clone, Debug)]
#[derive(Clone)]
pub struct LeftRoom {
/// The timeline of messages and state changes in the room up to the point
/// when the user left.
@@ -166,6 +189,16 @@ impl LeftRoom {
}
}
impl fmt::Debug for LeftRoom {
    /// Hand-written `Debug` so raw state and account-data events are printed
    /// compactly via the `DebugListOfRawEvents*` wrappers.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Fixed: the struct name printed here was "JoinedRoom", a copy-paste
        // error from the `JoinedRoom` impl; it must identify this type.
        f.debug_struct("LeftRoom")
            .field("timeline", &self.timeline)
            .field("state", &DebugListOfRawEvents(&self.state))
            .field("account_data", &DebugListOfRawEventsNoId(&self.account_data))
            .finish()
    }
}
/// Events in the room.
#[derive(Clone, Debug, Default)]
pub struct Timeline {
@@ -186,3 +219,34 @@ impl Timeline {
Self { limited, prev_batch, ..Default::default() }
}
}
/// `Debug` wrapper for a map of invited rooms that renders each value through
/// `DebugInvitedRoom`, keeping the output compact.
struct DebugInvitedRooms<'a>(&'a BTreeMap<OwnedRoomId, InvitedRoom>);

#[cfg(not(tarpaulin_include))]
impl<'a> fmt::Debug for DebugInvitedRooms<'a> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut map = f.debug_map();
        for (room_id, room) in self.0 {
            map.entry(room_id, &DebugInvitedRoom(room));
        }
        map.finish()
    }
}
/// `Debug` wrapper for a single `InvitedRoom` that prints its invite-state
/// events through the compact raw-event formatter.
struct DebugInvitedRoom<'a>(&'a InvitedRoom);

#[cfg(not(tarpaulin_include))]
impl<'a> fmt::Debug for DebugInvitedRoom<'a> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut dbg = f.debug_struct("InvitedRoom");
        dbg.field("invite_state", &DebugListOfRawEvents(&self.0.invite_state.events));
        dbg.finish()
    }
}
/// `Debug` wrapper for a slice of `Raw` events; every element is rendered via
/// `DebugRawEvent` (event ID and type only) instead of the full raw JSON.
struct DebugListOfRawEvents<'a, T>(&'a [Raw<T>]);

#[cfg(not(tarpaulin_include))]
impl<'a, T> fmt::Debug for DebugListOfRawEvents<'a, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_list().entries(self.0.iter().map(DebugRawEvent)).finish()
    }
}

View File

@@ -16,7 +16,7 @@
use std::fmt;
use ruma::{serde::Raw, OwnedEventId};
use ruma::serde::Raw;
/// A wrapper around `Raw` that implements `Debug` in a way that only prints the
/// event ID and event type.
@@ -26,8 +26,8 @@ pub struct DebugRawEvent<'a, T>(pub &'a Raw<T>);
impl<T> fmt::Debug for DebugRawEvent<'_, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RawEvent")
.field("event_id", &DebugEventId(self.0.get_field("event_id")))
.field("event_type", &DebugEventType(self.0.get_field("event_type")))
.field("event_id", &DebugStringField(self.0.get_field("event_id")))
.field("event_type", &DebugStringField(self.0.get_field("type")))
.finish_non_exhaustive()
}
}
@@ -40,28 +40,15 @@ pub struct DebugRawEventNoId<'a, T>(pub &'a Raw<T>);
impl<T> fmt::Debug for DebugRawEventNoId<'_, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("RawEvent")
.field("event_type", &DebugEventType(self.0.get_field("event_type")))
.field("event_type", &DebugStringField(self.0.get_field("type")))
.finish_non_exhaustive()
}
}
struct DebugEventId(serde_json::Result<Option<OwnedEventId>>);
struct DebugStringField(serde_json::Result<Option<String>>);
#[cfg(not(tarpaulin_include))]
impl fmt::Debug for DebugEventId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self.0 {
Ok(Some(id)) => id.fmt(f),
Ok(None) => f.write_str("Missing"),
Err(e) => f.debug_tuple("Invalid").field(&e).finish(),
}
}
}
struct DebugEventType(serde_json::Result<Option<String>>);
#[cfg(not(tarpaulin_include))]
impl fmt::Debug for DebugEventType {
impl fmt::Debug for DebugStringField {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match &self.0 {
Ok(Some(id)) => id.fmt(f),

View File

@@ -13,6 +13,7 @@
// limitations under the License.
use std::{
collections::HashMap,
ops::Deref,
sync::{
atomic::{AtomicBool, Ordering},
@@ -25,7 +26,7 @@ use ruma::{
events::{
key::verification::VerificationMethod, room::message::KeyVerificationRequestEventContent,
},
EventId, OwnedDeviceId, OwnedUserId, RoomId, UserId,
DeviceId, EventId, OwnedDeviceId, OwnedUserId, RoomId, UserId,
};
use serde::{Deserialize, Serialize};
use tracing::error;
@@ -161,14 +162,10 @@ impl OwnUserIdentity {
&self,
methods: Option<Vec<VerificationMethod>>,
) -> Result<(VerificationRequest, OutgoingVerificationRequest), CryptoStoreError> {
let devices: Vec<OwnedDeviceId> = self
.verification_machine
.store
.get_user_devices(self.user_id())
.await?
.into_keys()
.filter(|d| &**d != self.verification_machine.own_device_id())
.collect();
let all_devices = self.verification_machine.store.get_user_devices(self.user_id()).await?;
let devices = self
.inner
.filter_devices_to_request(all_devices, self.verification_machine.own_device_id());
Ok(self
.verification_machine
@@ -622,6 +619,20 @@ impl ReadOnlyOwnUserIdentity {
Ok(())
}
/// Select which of the user's devices should receive a verification request.
///
/// Keeps the ID of every device in `devices` except our own
/// (`own_device_id`) and except devices for which `is_device_signed` fails.
fn filter_devices_to_request(
    &self,
    devices: HashMap<OwnedDeviceId, ReadOnlyDevice>,
    own_device_id: &DeviceId,
) -> Vec<OwnedDeviceId> {
    let mut device_ids = Vec::new();

    for (device_id, device) in devices {
        // Skip our own device, and any device this identity hasn't signed.
        if device_id == own_device_id || self.is_device_signed(&device).is_err() {
            continue;
        }

        device_ids.push(device_id);
    }

    device_ids
}
}
#[cfg(any(test, feature = "testing"))]
@@ -699,11 +710,11 @@ pub(crate) mod testing {
#[cfg(test)]
pub(crate) mod tests {
use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};
use assert_matches::assert_matches;
use matrix_sdk_test::async_test;
use ruma::user_id;
use ruma::{device_id, user_id};
use serde_json::{json, Value};
use tokio::sync::Mutex;
@@ -875,4 +886,27 @@ pub(crate) mod tests {
Err(_)
);
}
#[test]
fn filter_devices_to_request() {
    let response = own_key_query();
    let identity = get_own_identity();
    let (first, second) = device(&response);

    let second_device_id = second.device_id().to_owned();
    let unknown_device_id = device_id!("UNKNOWN");

    let mut devices = HashMap::new();
    devices.insert(first.device_id().to_owned(), first);
    devices.insert(second.device_id().to_owned(), second);

    // Our own device and devices that aren't verified are filtered out,
    // leaving nothing.
    let filtered = identity.filter_devices_to_request(devices.clone(), &second_device_id);
    assert!(filtered.is_empty());

    // Signed devices that are not our own are kept.
    assert_eq!(identity.filter_devices_to_request(devices, unknown_device_id), [second_device_id]);
}
}

View File

@@ -513,6 +513,11 @@ impl Device {
pub async fn set_local_trust(&self, trust_state: LocalTrust) -> Result<(), CryptoStoreError> {
self.inner.set_local_trust(trust_state).await
}
/// Is the device cross-signed by its own user.
///
/// Thin wrapper that delegates directly to the inner crypto device's
/// `is_cross_signed_by_owner`.
pub fn is_cross_signed_by_owner(&self) -> bool {
    self.inner.is_cross_signed_by_owner()
}
}
/// The collection of all the [`Device`]s a user has.

View File

@@ -79,7 +79,7 @@ let list_builder = SlidingSyncList::builder("main_list")
v4::SyncRequestListFilters::default(), { is_dm: Some(true)}
)))
.sort(vec!["by_recency".to_owned()])
.set_range(0u32, 9u32);
.set_range(0u32..=9);
```
Please refer to the [specification][MSC], the [Ruma types][ruma-types],
@@ -438,7 +438,7 @@ let full_sync_list = SlidingSyncList::builder(&full_sync_list_name)
let active_list = SlidingSyncList::builder(&active_list_name) // the active window
.sync_mode(SlidingSyncMode::Selective) // sync up the specific range only
.set_range(0u32, 9u32) // only the top 10 items
.set_range(0u32..=9) // only the top 10 items
.sort(vec!["by_recency".to_owned()]) // last active
.timeline_limit(5u32) // add the last 5 timeline items for room preview and faster timeline loading
.required_state(vec![ // we want to know immediately:

View File

@@ -3,16 +3,17 @@
use std::{
convert::identity,
fmt,
ops::RangeInclusive,
sync::{Arc, RwLock as StdRwLock},
};
use eyeball::unique::Observable;
use eyeball_im::ObservableVector;
use ruma::{api::client::sync::sync_events::v4, events::StateEventType, UInt};
use ruma::{api::client::sync::sync_events::v4, events::StateEventType};
use tokio::sync::mpsc::Sender;
use super::{
super::SlidingSyncInternalMessage, SlidingSyncList, SlidingSyncListInner,
super::SlidingSyncInternalMessage, Bound, SlidingSyncList, SlidingSyncListInner,
SlidingSyncListRequestGenerator, SlidingSyncMode, SlidingSyncState,
};
@@ -28,9 +29,9 @@ pub struct SlidingSyncListBuilder {
full_sync_batch_size: u32,
full_sync_maximum_number_of_rooms_to_fetch: Option<u32>,
filters: Option<v4::SyncRequestListFilters>,
timeline_limit: Option<UInt>,
timeline_limit: Option<Bound>,
name: String,
ranges: Vec<(UInt, UInt)>,
ranges: Vec<RangeInclusive<Bound>>,
once_built: Arc<Box<dyn Fn(SlidingSyncList) -> SlidingSyncList + Send + Sync>>,
}
@@ -130,8 +131,8 @@ impl SlidingSyncListBuilder {
}
/// Set the limit of regular events to fetch for the timeline.
pub fn timeline_limit<U: Into<UInt>>(mut self, timeline_limit: U) -> Self {
self.timeline_limit = Some(timeline_limit.into());
pub fn timeline_limit(mut self, timeline_limit: Bound) -> Self {
self.timeline_limit = Some(timeline_limit);
self
}
@@ -143,24 +144,24 @@ impl SlidingSyncListBuilder {
}
/// Set the ranges to fetch.
pub fn ranges<U: Into<UInt>>(mut self, range: Vec<(U, U)>) -> Self {
self.ranges = range.into_iter().map(|(a, b)| (a.into(), b.into())).collect();
pub fn ranges(mut self, ranges: Vec<RangeInclusive<Bound>>) -> Self {
self.ranges = ranges;
self
}
/// Set a single range fetch.
pub fn set_range<U: Into<UInt>>(mut self, from: U, to: U) -> Self {
self.ranges = vec![(from.into(), to.into())];
/// Set a single range to fetch.
pub fn set_range(mut self, range: RangeInclusive<Bound>) -> Self {
self.ranges = vec![range];
self
}
/// Set the ranges to fetch.
pub fn add_range<U: Into<UInt>>(mut self, from: U, to: U) -> Self {
self.ranges.push((from.into(), to.into()));
pub fn add_range(mut self, range: RangeInclusive<Bound>) -> Self {
self.ranges.push(range);
self
}
/// Set the ranges to fetch.
/// Reset the ranges to fetch.
pub fn reset_ranges(mut self) -> Self {
self.ranges.clear();
self

View File

@@ -8,7 +8,7 @@ use std::{
collections::HashSet,
fmt::Debug,
iter,
ops::Not,
ops::{Not, RangeInclusive},
sync::{Arc, RwLock as StdRwLock},
};
@@ -20,7 +20,7 @@ use futures_core::Stream;
use imbl::Vector;
pub(super) use request_generator::*;
pub use room_list_entry::RoomListEntry;
use ruma::{api::client::sync::sync_events::v4, assign, events::StateEventType, OwnedRoomId, UInt};
use ruma::{api::client::sync::sync_events::v4, assign, events::StateEventType, OwnedRoomId};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::Sender;
use tracing::{instrument, warn};
@@ -28,7 +28,11 @@ use tracing::{instrument, warn};
use super::{Error, SlidingSyncInternalMessage};
use crate::Result;
/// Holding a specific filtered list within the concept of Sliding Sync.
/// The type used to express natural bounds (including but not limited to:
/// ranges, timeline limit) in the sliding sync SDK.
pub type Bound = u32;
/// Holding a specific filtered list within the concept of sliding sync.
///
/// It is OK to clone this type as much as you need: cloning it is cheap.
#[derive(Clone, Debug)]
@@ -64,10 +68,7 @@ impl SlidingSyncList {
}
/// Set the ranges to fetch.
pub fn set_ranges<U>(&self, ranges: &[(U, U)]) -> Result<(), Error>
where
U: Into<UInt> + Copy,
{
pub fn set_ranges(&self, ranges: &[RangeInclusive<Bound>]) -> Result<(), Error> {
if self.inner.sync_mode.ranges_can_be_modified_by_user().not() {
return Err(Error::CannotModifyRanges(self.name().to_owned()));
}
@@ -79,30 +80,24 @@ impl SlidingSyncList {
}
/// Reset the ranges to a particular set.
pub fn set_range<U>(&self, start: U, end: U) -> Result<(), Error>
where
U: Into<UInt> + Copy,
{
pub fn set_range(&self, range: RangeInclusive<Bound>) -> Result<(), Error> {
if self.inner.sync_mode.ranges_can_be_modified_by_user().not() {
return Err(Error::CannotModifyRanges(self.name().to_owned()));
}
self.inner.set_ranges(&[(start, end)]);
self.inner.set_ranges(&[range]);
self.reset()?;
Ok(())
}
/// Set the ranges to fetch.
pub fn add_range<U>(&self, (start, end): (U, U)) -> Result<(), Error>
where
U: Into<UInt>,
{
pub fn add_range(&self, range: RangeInclusive<Bound>) -> Result<(), Error> {
if self.inner.sync_mode.ranges_can_be_modified_by_user().not() {
return Err(Error::CannotModifyRanges(self.name().to_owned()));
}
self.inner.add_range((start, end));
self.inner.add_range(range);
self.reset()?;
Ok(())
@@ -119,7 +114,7 @@ impl SlidingSyncList {
return Err(Error::CannotModifyRanges(self.name().to_owned()));
}
self.inner.set_ranges::<UInt>(&[]);
self.inner.set_ranges(&[]);
self.reset()?;
Ok(())
@@ -136,17 +131,12 @@ impl SlidingSyncList {
}
/// Get the timeline limit.
pub fn timeline_limit(&self) -> Option<UInt> {
pub fn timeline_limit(&self) -> Option<Bound> {
**self.inner.timeline_limit.read().unwrap()
}
/// Set timeline limit.
pub fn set_timeline_limit<U>(&self, timeline: Option<U>)
where
U: Into<UInt>,
{
let timeline = timeline.map(Into::into);
pub fn set_timeline_limit(&self, timeline: Option<Bound>) {
Observable::set(&mut self.inner.timeline_limit.write().unwrap(), timeline);
}
@@ -264,7 +254,7 @@ pub(super) struct SlidingSyncListInner {
filters: Option<v4::SyncRequestListFilters>,
/// The maximum number of timeline events to query for.
timeline_limit: StdRwLock<Observable<Option<UInt>>>,
timeline_limit: StdRwLock<Observable<Option<Bound>>>,
/// The total number of rooms that is possible to interact with for the
/// given list.
@@ -279,7 +269,7 @@ pub(super) struct SlidingSyncListInner {
room_list: StdRwLock<ObservableVector<RoomListEntry>>,
/// The ranges windows of the list.
ranges: StdRwLock<Observable<Vec<(UInt, UInt)>>>,
ranges: StdRwLock<Observable<Vec<RangeInclusive<Bound>>>>,
/// The request generator, i.e. a type that yields the appropriate list
/// request. See [`SlidingSyncListRequestGenerator`] to learn more.
@@ -290,28 +280,33 @@ pub(super) struct SlidingSyncListInner {
impl SlidingSyncListInner {
/// Reset and add new ranges.
fn set_ranges<U>(&self, ranges: &[(U, U)])
where
U: Into<UInt> + Copy,
{
let ranges = ranges.iter().map(|(start, end)| ((*start).into(), (*end).into())).collect();
Observable::set(&mut self.ranges.write().unwrap(), ranges);
fn set_ranges(&self, ranges: &[RangeInclusive<Bound>]) {
Observable::set(&mut self.ranges.write().unwrap(), ranges.to_vec());
}
/// Add a new range.
fn add_range<U>(&self, (start, end): (U, U))
where
U: Into<UInt>,
{
fn add_range(&self, range: RangeInclusive<Bound>) {
Observable::update(&mut self.ranges.write().unwrap(), |ranges| {
ranges.push((start.into(), end.into()));
ranges.push(RangeInclusive::new(*range.start(), *range.end()));
});
}
/// Call this method when it's necessary to reset `Self`.
fn reset(&self) {
self.request_generator.write().unwrap().reset();
Observable::set(&mut self.state.write().unwrap(), SlidingSyncState::NotLoaded);
{
let mut state = self.state.write().unwrap();
let next_state = match **state {
SlidingSyncState::NotLoaded => SlidingSyncState::NotLoaded,
SlidingSyncState::Preloaded => SlidingSyncState::Preloaded,
SlidingSyncState::PartiallyLoaded | SlidingSyncState::FullyLoaded => {
SlidingSyncState::PartiallyLoaded
}
};
Observable::set(&mut state, next_state);
}
}
// Update the state to the next request, and return it.
@@ -380,10 +375,18 @@ impl SlidingSyncListInner {
/// state of the request generator.
#[instrument(skip(self), fields(name = self.name, ranges = ?&self.ranges))]
fn request(&self) -> v4::SyncRequestList {
let ranges = self.request_generator.read().unwrap().ranges.clone();
use ruma::UInt;
let ranges = self
.request_generator
.read()
.unwrap()
.ranges
.iter()
.map(|r| (UInt::from(*r.start()), UInt::from(*r.end())))
.collect();
let sort = self.sort.clone();
let required_state = self.required_state.clone();
let timeline_limit = **self.timeline_limit.read().unwrap();
let timeline_limit = self.timeline_limit.read().unwrap().map(UInt::from);
let filters = self.filters.clone();
assign!(v4::SyncRequestList::default(), {
@@ -469,9 +472,10 @@ impl SlidingSyncListInner {
// message). Let's trigger those.
let ranges = self.ranges.read().unwrap();
for (start, end) in ranges.iter().map(|(start, end)| {
(usize::try_from(*start).unwrap(), usize::try_from(*end).unwrap())
}) {
for (start, end) in ranges
.iter()
.map(|r| (usize::try_from(*r.start()).unwrap(), usize::try_from(*r.end()).unwrap()))
{
let mut rooms_to_update =
Vec::with_capacity(rooms_that_have_received_an_update.len());
@@ -508,11 +512,10 @@ impl SlidingSyncListInner {
fn update_request_generator_state(&self, maximum_number_of_rooms: u32) -> Result<(), Error> {
let mut request_generator = self.request_generator.write().unwrap();
let range_end: u32 = request_generator
.ranges
.first()
.map(|(_start, end)| u32::try_from(*end).unwrap())
.ok_or_else(|| Error::RequestGeneratorHasNotBeenInitialized(self.name.to_owned()))?;
let range_end: u32 =
request_generator.ranges.first().map(|r| *r.end()).ok_or_else(|| {
Error::RequestGeneratorHasNotBeenInitialized(self.name.to_owned())
})?;
match &mut request_generator.kind {
SlidingSyncListRequestGeneratorKind::Paging {
@@ -554,7 +557,7 @@ impl SlidingSyncListInner {
// Update the _list range_ to cover from 0 to `range_end`.
// The list's range is different from the request generator (this) range.
self.set_ranges(&[(0, range_end)]);
self.set_ranges(&[0..=range_end]);
// Finally, let's update the list' state.
Observable::set_if_not_eq(
@@ -572,7 +575,7 @@ impl SlidingSyncListInner {
*fully_loaded = true;
// The range is covering the entire list, from 0 to its maximum.
self.set_ranges(&[(0, range_maximum)]);
self.set_ranges(&[0..=range_maximum]);
// Finally, let's update the list' state.
Observable::set_if_not_eq(
@@ -908,40 +911,29 @@ mod tests {
};
}
macro_rules! ranges {
( $( ( $start:literal, $end:literal ) ),* $(,)* ) => {
&[$(
(
uint!($start),
uint!($end),
)
),+]
}
}
#[test]
fn test_sliding_sync_list_set_ranges() {
let (sender, _receiver) = channel(1);
let list = SlidingSyncList::builder("foo")
.sync_mode(SlidingSyncMode::Selective)
.ranges(ranges![(0, 1), (2, 3)].to_vec())
.ranges(vec![0..=1, 2..=3])
.build(sender);
{
let lock = list.inner.ranges.read().unwrap();
let ranges = Observable::get(&lock);
assert_eq!(ranges, &ranges![(0, 1), (2, 3)]);
assert_eq!(ranges, &[0..=1, 2..=3]);
}
list.set_ranges(ranges![(4, 5), (6, 7)]).unwrap();
list.set_ranges(&[4..=5, 6..=7]).unwrap();
{
let lock = list.inner.ranges.read().unwrap();
let ranges = Observable::get(&lock);
assert_eq!(ranges, &ranges![(4, 5), (6, 7)]);
assert_eq!(ranges, &[4..=5, 6..=7]);
}
}
@@ -953,23 +945,23 @@ mod tests {
{
let list = SlidingSyncList::builder("foo")
.sync_mode(SlidingSyncMode::Selective)
.ranges(ranges![(0, 1), (2, 3)].to_vec())
.ranges(vec![0..=1, 2..=3])
.build(sender.clone());
{
let lock = list.inner.ranges.read().unwrap();
let ranges = Observable::get(&lock);
assert_eq!(ranges, &ranges![(0, 1), (2, 3)]);
assert_eq!(ranges, &[0..=1, 2..=3]);
}
list.set_range(4u32, 5).unwrap();
list.set_range(4..=5).unwrap();
{
let lock = list.inner.ranges.read().unwrap();
let ranges = Observable::get(&lock);
assert_eq!(ranges, &ranges![(4, 5)]);
assert_eq!(ranges, &[4..=5]);
}
}
@@ -979,7 +971,7 @@ mod tests {
.sync_mode(SlidingSyncMode::Growing)
.build(sender.clone());
assert!(list.set_range(4u32, 5).is_err());
assert!(list.set_range(4..=5).is_err());
}
// Set range on `Paging`.
@@ -987,7 +979,7 @@ mod tests {
let list =
SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::Paging).build(sender);
assert!(list.set_range(4u32, 5).is_err());
assert!(list.set_range(4..=5).is_err());
}
}
@@ -999,23 +991,23 @@ mod tests {
{
let list = SlidingSyncList::builder("foo")
.sync_mode(SlidingSyncMode::Selective)
.ranges(ranges![(0, 1)].to_vec())
.ranges(vec![0..=1])
.build(sender.clone());
{
let lock = list.inner.ranges.read().unwrap();
let ranges = Observable::get(&lock);
assert_eq!(ranges, &ranges![(0, 1)]);
assert_eq!(ranges, &[0..=1]);
}
list.add_range((2u32, 3)).unwrap();
list.add_range(2..=3).unwrap();
{
let lock = list.inner.ranges.read().unwrap();
let ranges = Observable::get(&lock);
assert_eq!(ranges, &ranges![(0, 1), (2, 3)]);
assert_eq!(ranges, &[0..=1, 2..=3]);
}
}
@@ -1025,7 +1017,7 @@ mod tests {
.sync_mode(SlidingSyncMode::Growing)
.build(sender.clone());
assert!(list.add_range((2u32, 3)).is_err());
assert!(list.add_range(2..=3).is_err());
}
// Add range on `Paging`.
@@ -1033,7 +1025,7 @@ mod tests {
let list =
SlidingSyncList::builder("foo").sync_mode(SlidingSyncMode::Paging).build(sender);
assert!(list.add_range((2u32, 3)).is_err());
assert!(list.add_range(2..=3).is_err());
}
}
@@ -1045,14 +1037,14 @@ mod tests {
{
let list = SlidingSyncList::builder("foo")
.sync_mode(SlidingSyncMode::Selective)
.ranges(ranges![(0, 1)].to_vec())
.ranges(vec![0..=1])
.build(sender.clone());
{
let lock = list.inner.ranges.read().unwrap();
let ranges = Observable::get(&lock);
assert_eq!(ranges, &ranges![(0, 1)]);
assert_eq!(ranges, &[0..=1]);
}
list.reset_ranges().unwrap();
@@ -1089,27 +1081,27 @@ mod tests {
let list = SlidingSyncList::builder("foo")
.sync_mode(SlidingSyncMode::Selective)
.ranges(ranges![(0, 1)].to_vec())
.timeline_limit(7u32)
.ranges(vec![0..=1])
.timeline_limit(7)
.build(sender);
{
let lock = list.inner.timeline_limit.read().unwrap();
let timeline_limit = Observable::get(&lock);
assert_eq!(timeline_limit, &Some(uint!(7)));
assert_eq!(timeline_limit, &Some(7));
}
list.set_timeline_limit(Some(42u32));
list.set_timeline_limit(Some(42));
{
let lock = list.inner.timeline_limit.read().unwrap();
let timeline_limit = Observable::get(&lock);
assert_eq!(timeline_limit, &Some(uint!(42)));
assert_eq!(timeline_limit, &Some(42));
}
list.set_timeline_limit::<UInt>(None);
list.set_timeline_limit(None);
{
let lock = list.inner.timeline_limit.read().unwrap();
@@ -1125,7 +1117,7 @@ mod tests {
let mut list = SlidingSyncList::builder("foo")
.sync_mode(SlidingSyncMode::Selective)
.add_range(0u32, 1)
.add_range(0..=1)
.build(sender);
let room0 = room_id!("!room0:bar.org");
@@ -1376,7 +1368,7 @@ mod tests {
let mut list = SlidingSyncList::builder("testing")
.sync_mode(crate::SlidingSyncMode::Selective)
.ranges(ranges![(0, 10), (42, 153)].to_vec())
.ranges(vec![0..=10, 42..=153])
.build(sender);
assert_ranges! {
@@ -1409,7 +1401,7 @@ mod tests {
let mut list = SlidingSyncList::builder("testing")
.sync_mode(crate::SlidingSyncMode::Selective)
.ranges(ranges![(0, 10), (42, 153)].to_vec())
.ranges(vec![0..=10, 42..=153].to_vec())
.build(sender);
assert_ranges! {
@@ -1435,11 +1427,11 @@ mod tests {
}
};
list.set_ranges(&[(3u32, 7)]).unwrap();
list.set_ranges(&[3..=7]).unwrap();
assert_ranges! {
list = list,
list_state = NotLoaded,
list_state = PartiallyLoaded,
maximum_number_of_rooms = 25,
next => {
ranges = [3; 7],
@@ -1448,11 +1440,11 @@ mod tests {
},
};
list.add_range((10u32, 23)).unwrap();
list.add_range(10..=23).unwrap();
assert_ranges! {
list = list,
list_state = NotLoaded,
list_state = PartiallyLoaded,
maximum_number_of_rooms = 25,
next => {
ranges = [3; 7], [10; 23],
@@ -1461,11 +1453,11 @@ mod tests {
},
};
list.set_range(42u32, 77).unwrap();
list.set_range(42..=77).unwrap();
assert_ranges! {
list = list,
list_state = NotLoaded,
list_state = PartiallyLoaded,
maximum_number_of_rooms = 25,
next => {
ranges = [42; 77],
@@ -1478,12 +1470,12 @@ mod tests {
assert_ranges! {
list = list,
list_state = NotLoaded,
list_state = PartiallyLoaded,
maximum_number_of_rooms = 25,
next => {
ranges = ,
is_fully_loaded = true,
list_state = NotLoaded, // Is this correct?
list_state = PartiallyLoaded,
},
};
}
@@ -1495,7 +1487,7 @@ mod tests {
let mut list = SlidingSyncList::builder("foo")
.sync_mode(SlidingSyncMode::Selective)
.add_range(0u32, 3)
.add_range(0..=3)
.build(sender);
assert_eq!(**list.inner.maximum_number_of_rooms.read().unwrap(), None);

View File

@@ -29,10 +29,9 @@
//! user-specified limit representing the maximum number of rooms the user
//! actually wants to load.
use std::cmp::min;
use ruma::UInt;
use std::{cmp::min, ops::RangeInclusive};
use super::Bound;
use crate::sliding_sync::Error;
/// The kind of request generator.
@@ -72,7 +71,7 @@ pub(super) enum SlidingSyncListRequestGeneratorKind {
#[derive(Debug)]
pub(in super::super) struct SlidingSyncListRequestGenerator {
/// The current ranges used by this request generator.
pub(super) ranges: Vec<(UInt, UInt)>,
pub(super) ranges: Vec<RangeInclusive<u32>>,
/// The kind of request generator.
pub(super) kind: SlidingSyncListRequestGeneratorKind,
}
@@ -154,7 +153,7 @@ pub(super) fn create_range(
desired_size: u32,
maximum_number_of_rooms_to_fetch: Option<u32>,
maximum_number_of_rooms: Option<u32>,
) -> Result<(UInt, UInt), Error> {
) -> Result<RangeInclusive<Bound>, Error> {
// Calculate the range.
// The `start` bound is given. Let's calculate the `end` bound.
@@ -185,30 +184,28 @@ pub(super) fn create_range(
return Err(Error::InvalidRange { start, end });
}
Ok((start.into(), end.into()))
Ok(RangeInclusive::new(start, end))
}
#[cfg(test)]
mod tests {
use ruma::uint;
use super::*;
#[test]
fn test_create_range_from() {
// From 0, we want 100 items.
assert_eq!(create_range(0, 100, None, None), Ok((uint!(0), uint!(99))));
assert_eq!(create_range(0, 100, None, None), Ok(0..=99));
// From 100, we want 100 items.
assert_eq!(create_range(100, 100, None, None), Ok((uint!(100), uint!(199))));
assert_eq!(create_range(100, 100, None, None), Ok(100..=199));
// From 0, we want 100 items, but there is a maximum number of rooms to fetch
// defined at 50.
assert_eq!(create_range(0, 100, Some(50), None), Ok((uint!(0), uint!(49))));
assert_eq!(create_range(0, 100, Some(50), None), Ok(0..=49));
// From 49, we want 100 items, but there is a maximum number of rooms to fetch
// defined at 50. There is 1 item to load.
assert_eq!(create_range(49, 100, Some(50), None), Ok((uint!(49), uint!(49))));
assert_eq!(create_range(49, 100, Some(50), None), Ok(49..=49));
// From 50, we want 100 items, but there is a maximum number of rooms to fetch
// defined at 50.
@@ -219,11 +216,11 @@ mod tests {
// From 0, we want 100 items, but there is a maximum number of rooms defined at
// 50.
assert_eq!(create_range(0, 100, None, Some(50)), Ok((uint!(0), uint!(49))));
assert_eq!(create_range(0, 100, None, Some(50)), Ok(0..=49));
// From 49, we want 100 items, but there is a maximum number of rooms defined at
// 50. There is 1 item to load.
assert_eq!(create_range(49, 100, None, Some(50)), Ok((uint!(49), uint!(49))));
assert_eq!(create_range(49, 100, None, Some(50)), Ok(49..=49));
// From 50, we want 100 items, but there is a maximum number of rooms defined at
// 50.
@@ -234,10 +231,10 @@ mod tests {
// From 0, we want 100 items, but there is a maximum number of rooms to fetch
// defined at 75, and a maximum number of rooms defined at 50.
assert_eq!(create_range(0, 100, Some(75), Some(50)), Ok((uint!(0), uint!(49))));
assert_eq!(create_range(0, 100, Some(75), Some(50)), Ok(0..=49));
// From 0, we want 100 items, but there is a maximum number of rooms to fetch
// defined at 50, and a maximum number of rooms defined at 75.
assert_eq!(create_range(0, 100, Some(50), Some(75)), Ok((uint!(0), uint!(49))));
assert_eq!(create_range(0, 100, Some(50), Some(75)), Ok(0..=49));
}
}

View File

@@ -44,8 +44,8 @@ async fn it_works_smoke_test() -> anyhow::Result<()> {
.add_list(
SlidingSyncList::builder("foo")
.sync_mode(SlidingSyncMode::Selective)
.add_range(0u32, 10)
.timeline_limit(0u32),
.add_range(0..=10)
.timeline_limit(0),
)
.build()
.await?;
@@ -70,8 +70,8 @@ async fn modifying_timeline_limit() -> anyhow::Result<()> {
.add_list(
SlidingSyncList::builder("init_list")
.sync_mode(SlidingSyncMode::Selective)
.add_range(0u32, 1)
.timeline_limit(0u32)
.add_range(0..=1)
.timeline_limit(0)
.build(),
)
.build()
@@ -127,8 +127,8 @@ async fn modifying_timeline_limit() -> anyhow::Result<()> {
.add_list(
SlidingSyncList::builder("visible_room_list")
.sync_mode(SlidingSyncMode::Selective)
.add_range(0u32, 1)
.timeline_limit(1u32)
.add_range(0..=1)
.timeline_limit(1)
.build(),
)
.build()
@@ -191,7 +191,7 @@ async fn modifying_timeline_limit() -> anyhow::Result<()> {
// Sync to receive messages with a `timeline_limit` set to 20.
{
list.set_timeline_limit(Some(uint!(20)));
list.set_timeline_limit(Some(20));
let mut update_summary;
@@ -282,7 +282,7 @@ async fn adding_list_later() -> anyhow::Result<()> {
let build_list = |name| {
SlidingSyncList::builder(name)
.sync_mode(SlidingSyncMode::Selective)
.set_range(0u32, 10u32)
.set_range(0..=10)
.sort(vec!["by_recency".to_owned(), "by_name".to_owned()])
.build()
};
@@ -367,7 +367,7 @@ async fn live_lists() -> anyhow::Result<()> {
let build_list = |name| {
SlidingSyncList::builder(name)
.sync_mode(SlidingSyncMode::Selective)
.set_range(0u32, 10u32)
.set_range(0..=10)
.sort(vec!["by_recency".to_owned(), "by_name".to_owned()])
.build()
};
@@ -477,13 +477,13 @@ async fn list_goes_live() -> anyhow::Result<()> {
let (_client, sync_proxy_builder) = random_setup_with_rooms(21).await?;
let sliding_window_list = SlidingSyncList::builder("sliding")
.sync_mode(SlidingSyncMode::Selective)
.set_range(0u32, 10u32)
.set_range(0..=10)
.sort(vec!["by_recency".to_owned(), "by_name".to_owned()])
.build();
let full = SlidingSyncList::builder("full")
.sync_mode(SlidingSyncMode::Growing)
.full_sync_batch_size(10u32)
.full_sync_batch_size(10)
.sort(vec!["by_recency".to_owned(), "by_name".to_owned()])
.build();
let sync_proxy =
@@ -540,7 +540,7 @@ async fn resizing_sliding_window() -> anyhow::Result<()> {
let (_client, sync_proxy_builder) = random_setup_with_rooms(20).await?;
let sliding_window_list = SlidingSyncList::builder("sliding")
.sync_mode(SlidingSyncMode::Selective)
.set_range(0u32, 10u32)
.set_range(0..=10)
.sort(vec!["by_recency".to_owned(), "by_name".to_owned()])
.build();
let sync_proxy = sync_proxy_builder.add_list(sliding_window_list).build().await?;
@@ -567,7 +567,7 @@ async fn resizing_sliding_window() -> anyhow::Result<()> {
// let's move the window
list.set_range(1u32, 10).unwrap();
list.set_range(1..=10).unwrap();
// Ensure 0-0 invalidation ranges work.
for _n in 0..2 {
@@ -590,7 +590,7 @@ async fn resizing_sliding_window() -> anyhow::Result<()> {
.collect::<Vec<_>>()
);
list.set_range(5u32, 10).unwrap();
list.set_range(5..=10).unwrap();
for _n in 0..2 {
let room_summary = stream.next().await.context("sync has closed unexpectedly")?;
@@ -614,7 +614,7 @@ async fn resizing_sliding_window() -> anyhow::Result<()> {
// let's move the window
list.set_range(5u32, 15).unwrap();
list.set_range(5..=15).unwrap();
for _n in 0..2 {
let room_summary = stream.next().await.context("sync has closed unexpectedly")?;
@@ -643,7 +643,7 @@ async fn moving_out_of_sliding_window() -> anyhow::Result<()> {
let (client, sync_proxy_builder) = random_setup_with_rooms(20).await?;
let sliding_window_list = SlidingSyncList::builder("sliding")
.sync_mode(SlidingSyncMode::Selective)
.set_range(1u32, 10u32)
.set_range(1..=10)
.sort(vec!["by_recency".to_owned(), "by_name".to_owned()])
.build();
let sync_proxy = sync_proxy_builder.add_list(sliding_window_list).build().await?;
@@ -670,7 +670,7 @@ async fn moving_out_of_sliding_window() -> anyhow::Result<()> {
// let's move the window
list.set_range(0u32, 10).unwrap();
list.set_range(0..=10).unwrap();
for _n in 0..2 {
let room_summary = stream.next().await.context("sync has closed unexpectedly")?;
@@ -693,7 +693,7 @@ async fn moving_out_of_sliding_window() -> anyhow::Result<()> {
// let's move the window again
list.set_range(2u32, 12).unwrap();
list.set_range(2..=12).unwrap();
for _n in 0..2 {
let room_summary = stream.next().await.context("sync has closed unexpectedly")?;
@@ -751,7 +751,7 @@ async fn moving_out_of_sliding_window() -> anyhow::Result<()> {
// let's move the window again
list.set_range(0u32, 10).unwrap();
list.set_range(0..=10).unwrap();
for _n in 0..2 {
let room_summary = stream.next().await.context("sync has closed unexpectedly")?;
@@ -787,7 +787,7 @@ async fn fast_unfreeze() -> anyhow::Result<()> {
let build_lists = || {
let sliding_window_list = SlidingSyncList::builder("sliding")
.sync_mode(SlidingSyncMode::Selective)
.set_range(1u32, 10u32)
.set_range(1..=10)
.sort(vec!["by_recency".to_owned(), "by_name".to_owned()])
.build();
let growing_sync = SlidingSyncList::builder("growing")
@@ -848,7 +848,7 @@ async fn growing_sync_keeps_going() -> anyhow::Result<()> {
let (_client, sync_proxy_builder) = random_setup_with_rooms(20).await?;
let growing_sync = SlidingSyncList::builder("growing")
.sync_mode(SlidingSyncMode::Growing)
.full_sync_batch_size(5u32)
.full_sync_batch_size(5)
.sort(vec!["by_recency".to_owned(), "by_name".to_owned()])
.build();
@@ -892,7 +892,7 @@ async fn continue_on_reset() -> anyhow::Result<()> {
print!("setup took its time");
let growing_sync = SlidingSyncList::builder("growing")
.sync_mode(SlidingSyncMode::Growing)
.full_sync_batch_size(5u32)
.full_sync_batch_size(5)
.full_sync_maximum_number_of_rooms_to_fetch(100)
.sort(vec!["by_recency".to_owned(), "by_name".to_owned()])
.build();
@@ -976,7 +976,7 @@ async fn noticing_new_rooms_in_growing() -> anyhow::Result<()> {
print!("setup took its time");
let growing_sync = SlidingSyncList::builder("growing")
.sync_mode(SlidingSyncMode::Growing)
.full_sync_batch_size(10u32)
.full_sync_batch_size(10)
.full_sync_maximum_number_of_rooms_to_fetch(100)
.sort(vec!["by_recency".to_owned(), "by_name".to_owned()])
.build();
@@ -1053,7 +1053,7 @@ async fn restart_room_resubscription() -> anyhow::Result<()> {
.add_list(
SlidingSyncList::builder("sliding_list")
.sync_mode(SlidingSyncMode::Selective)
.set_range(0u32, 2u32)
.set_range(0..=2)
.sort(vec!["by_recency".to_owned(), "by_name".to_owned()])
.build(),
)
@@ -1079,7 +1079,7 @@ async fn restart_room_resubscription() -> anyhow::Result<()> {
// let's move the window
list.set_range(1u32, 2).unwrap();
list.set_range(1..=2).unwrap();
for _n in 0..2 {
let room_summary = stream.next().await.context("sync has closed unexpectedly")??;
@@ -1211,7 +1211,7 @@ async fn receipts_extension_works() -> anyhow::Result<()> {
let (client, sync_proxy_builder) = random_setup_with_rooms(1).await?;
let list = SlidingSyncList::builder("a")
.sync_mode(SlidingSyncMode::Selective)
.ranges(vec![(0u32, 1u32)])
.ranges(vec![(0..=1)])
.sort(vec!["by_recency".to_owned()])
.build();