fix(sdk): Prevent bugs, remove expensive clones, and simplify SlidingSyncList

fix(sdk): Prevent bugs, remove expensive clones, and simplify `SlidingSyncList`
This commit is contained in:
Ivan Enderlin
2023-03-22 16:39:30 +01:00
committed by GitHub
7 changed files with 886 additions and 835 deletions

View File

@@ -1,7 +1,6 @@
use std::sync::{Arc, RwLock};
use anyhow::Context;
use eyeball::unique::Observable;
use eyeball_im::VectorDiff;
use futures_util::{future::join, pin_mut, StreamExt};
use matrix_sdk::ruma::{
@@ -540,8 +539,8 @@ impl SlidingSyncList {
&self,
observer: Box<dyn SlidingSyncListRoomItemsObserver>,
) -> Arc<TaskHandle> {
let mut rooms_updated =
Observable::subscribe(&self.inner.rooms_updated_broadcast.read().unwrap());
let mut rooms_updated = self.inner.rooms_updated_broadcast_stream();
Arc::new(TaskHandle::new(RUNTIME.spawn(async move {
loop {
if rooms_updated.next().await.is_some() {
@@ -602,19 +601,17 @@ impl SlidingSyncList {
/// The current timeline limit
pub fn get_timeline_limit(&self) -> Option<u32> {
(**self.inner.timeline_limit.read().unwrap())
.map(|limit| u32::try_from(limit).unwrap_or_default())
self.inner.timeline_limit().map(|limit| u32::try_from(limit).unwrap_or_default())
}
/// The current timeline limit
pub fn set_timeline_limit(&self, value: u32) {
let value = Some(UInt::try_from(value).unwrap());
Observable::set(&mut self.inner.timeline_limit.write().unwrap(), value);
self.inner.set_timeline_limit(Some(value))
}
/// Unset the current timeline limit
pub fn unset_timeline_limit(&self) {
Observable::set(&mut self.inner.timeline_limit.write().unwrap(), None);
self.inner.set_timeline_limit::<UInt>(None)
}
}

View File

@@ -104,7 +104,7 @@ impl SlidingSyncBuilder {
///
/// Replace any list with the name.
pub fn add_list(mut self, list: SlidingSyncList) -> Self {
self.lists.insert(list.name.clone(), list);
self.lists.insert(list.name().to_owned(), list);
self
}

View File

@@ -9,7 +9,10 @@ use eyeball::unique::Observable;
use eyeball_im::ObservableVector;
use ruma::{api::client::sync::sync_events::v4, events::StateEventType, UInt};
use super::{Error, SlidingSyncList, SlidingSyncMode, SlidingSyncState};
use super::{
Error, SlidingSyncList, SlidingSyncListInner, SlidingSyncListRequestGenerator, SlidingSyncMode,
SlidingSyncState,
};
use crate::Result;
/// The default name for the full sync list.
@@ -147,28 +150,49 @@ impl SlidingSyncListBuilder {
/// Build the list.
pub fn build(self) -> Result<SlidingSyncList> {
Ok(SlidingSyncList {
//
// From the builder
sync_mode: self.sync_mode,
sort: self.sort,
required_state: self.required_state,
full_sync_batch_size: self.full_sync_batch_size,
full_sync_maximum_number_of_rooms_to_fetch: self
.full_sync_maximum_number_of_rooms_to_fetch,
send_updates_for_items: self.send_updates_for_items,
filters: self.filters,
timeline_limit: Arc::new(StdRwLock::new(Observable::new(self.timeline_limit))),
name: self.name.ok_or(Error::BuildMissingField("name"))?,
ranges: Arc::new(StdRwLock::new(Observable::new(self.ranges))),
let request_generator = match &self.sync_mode {
SlidingSyncMode::PagingFullSync => {
SlidingSyncListRequestGenerator::new_paging_full_sync(
self.full_sync_batch_size,
self.full_sync_maximum_number_of_rooms_to_fetch,
)
}
//
// Default values for the type we are building.
state: Arc::new(StdRwLock::new(Observable::new(SlidingSyncState::default()))),
maximum_number_of_rooms: Arc::new(StdRwLock::new(Observable::new(None))),
rooms_list: Arc::new(StdRwLock::new(ObservableVector::new())),
is_cold: Arc::new(AtomicBool::new(false)),
rooms_updated_broadcast: Arc::new(StdRwLock::new(Observable::new(()))),
SlidingSyncMode::GrowingFullSync => {
SlidingSyncListRequestGenerator::new_growing_full_sync(
self.full_sync_batch_size,
self.full_sync_maximum_number_of_rooms_to_fetch,
)
}
SlidingSyncMode::Selective => SlidingSyncListRequestGenerator::new_selective(),
};
Ok(SlidingSyncList {
inner: Arc::new(SlidingSyncListInner {
// From the builder
sync_mode: self.sync_mode,
sort: self.sort,
required_state: self.required_state,
full_sync_batch_size: self.full_sync_batch_size,
full_sync_maximum_number_of_rooms_to_fetch: self
.full_sync_maximum_number_of_rooms_to_fetch,
send_updates_for_items: self.send_updates_for_items,
filters: self.filters,
timeline_limit: StdRwLock::new(Observable::new(self.timeline_limit)),
name: self.name.ok_or(Error::BuildMissingField("name"))?,
ranges: StdRwLock::new(Observable::new(self.ranges)),
// Computed from the builder.
request_generator: StdRwLock::new(request_generator),
// Default values for the type we are building.
state: StdRwLock::new(Observable::new(SlidingSyncState::default())),
maximum_number_of_rooms: StdRwLock::new(Observable::new(None)),
rooms_list: StdRwLock::new(ObservableVector::new()),
is_cold: AtomicBool::new(false),
rooms_updated_broadcast: StdRwLock::new(Observable::new(())),
}),
})
}
}

View File

File diff suppressed because it is too large Load Diff

View File

@@ -31,246 +31,101 @@
use std::cmp::min;
use eyeball::unique::Observable;
use ruma::{api::client::sync::sync_events::v4, assign, OwnedRoomId, UInt};
use tracing::{error, instrument};
use super::{Error, SlidingSyncList, SlidingSyncState};
use ruma::UInt;
/// The kind of request generator.
#[derive(Debug)]
enum GeneratorKind {
// Growing-mode (see [`SlidingSyncMode`]).
pub(super) enum SlidingSyncListRequestGeneratorKind {
/// Growing-mode (see [`SlidingSyncMode`]).
GrowingFullSync {
// Number of fetched rooms.
number_of_fetched_rooms: u32,
// Size of the batch, used to grow the range to fetch more rooms.
/// Size of the batch, used to grow the range to fetch more rooms.
batch_size: u32,
// Maximum number of rooms to fetch (see
// [`SlidingSyncList::full_sync_maximum_number_of_rooms_to_fetch`]).
/// Maximum number of rooms to fetch (see
/// [`SlidingSyncList::full_sync_maximum_number_of_rooms_to_fetch`]).
maximum_number_of_rooms_to_fetch: Option<u32>,
// Whether all rooms have been loaded.
/// Number of rooms that have been already fetched.
number_of_fetched_rooms: u32,
/// Whether all rooms have been loaded.
fully_loaded: bool,
},
// Paging-mode (see [`SlidingSyncMode`]).
/// Paging-mode (see [`SlidingSyncMode`]).
PagingFullSync {
// Number of fetched rooms.
number_of_fetched_rooms: u32,
// Size of the batch, used to grow the range to fetch more rooms.
/// Size of the batch, used to grow the range to fetch more rooms.
batch_size: u32,
// Maximum number of rooms to fetch (see
// [`SlidingSyncList::full_sync_maximum_number_of_rooms_to_fetch`]).
/// Maximum number of rooms to fetch (see
/// [`SlidingSyncList::full_sync_maximum_number_of_rooms_to_fetch`]).
maximum_number_of_rooms_to_fetch: Option<u32>,
// Whether all rooms have been loaded.
/// Number of rooms that have been already fetched.
number_of_fetched_rooms: u32,
/// Whether all rooms have been loaded.
fully_loaded: bool,
},
// Selective-mode (see [`SlidingSyncMode`]).
/// Selective-mode (see [`SlidingSyncMode`]).
Selective,
}
/// A request generator for [`SlidingSyncList`].
#[derive(Debug)]
pub(in super::super) struct SlidingSyncListRequestGenerator {
/// The parent [`SlidingSyncList`] object that has created this request
/// generator.
list: SlidingSyncList,
/// The current range used by this request generator.
ranges: Vec<(UInt, UInt)>,
pub(super) ranges: Vec<(UInt, UInt)>,
/// The kind of request generator.
kind: GeneratorKind,
pub(super) kind: SlidingSyncListRequestGeneratorKind,
}
impl SlidingSyncListRequestGenerator {
/// Create a new request generator configured for paging-mode.
pub(super) fn new_with_paging_full_sync(list: SlidingSyncList) -> Self {
let batch_size = list.full_sync_batch_size;
let maximum_number_of_rooms_to_fetch = list.full_sync_maximum_number_of_rooms_to_fetch;
// If a range exists, let's consider it's been used to load existing rooms. So
// let's start from the end of the range. It can be useful when we resume a sync
// for example. Otherwise let's use the default value.
let number_of_fetched_rooms = list
.ranges
.read()
.unwrap()
.first()
.map(|(_start, end)| u32::try_from(*end).unwrap().saturating_add(1))
.unwrap_or_default();
pub(super) fn new_paging_full_sync(
batch_size: u32,
maximum_number_of_rooms_to_fetch: Option<u32>,
) -> Self {
Self {
list,
ranges: Vec::new(),
kind: GeneratorKind::PagingFullSync {
number_of_fetched_rooms,
kind: SlidingSyncListRequestGeneratorKind::PagingFullSync {
batch_size,
maximum_number_of_rooms_to_fetch,
number_of_fetched_rooms: 0,
fully_loaded: false,
},
}
}
/// Create a new request generator configured for growing-mode.
pub(super) fn new_with_growing_full_sync(list: SlidingSyncList) -> Self {
let batch_size = list.full_sync_batch_size;
let maximum_number_of_rooms_to_fetch = list.full_sync_maximum_number_of_rooms_to_fetch;
// If a range exists, let's consider it's been used to load existing rooms. So
// let's start from the end of the range. It can be useful when we resume a sync
// for example. Otherwise let's use the default value.
let number_of_fetched_rooms = list
.ranges
.read()
.unwrap()
.first()
.map(|(_start, end)| u32::try_from(*end).unwrap().saturating_add(1))
.unwrap_or_default();
pub(super) fn new_growing_full_sync(
batch_size: u32,
maximum_number_of_rooms_to_fetch: Option<u32>,
) -> Self {
Self {
list,
ranges: Vec::new(),
kind: GeneratorKind::GrowingFullSync {
number_of_fetched_rooms,
kind: SlidingSyncListRequestGeneratorKind::GrowingFullSync {
batch_size,
maximum_number_of_rooms_to_fetch,
number_of_fetched_rooms: 0,
fully_loaded: false,
},
}
}
/// Create a new request generator configured for selective-mode.
pub(super) fn new_selective(list: SlidingSyncList) -> Self {
Self { list, ranges: Vec::new(), kind: GeneratorKind::Selective }
}
/// Build a [`SyncRequestList`][v4::SyncRequestList].
#[instrument(skip(self), fields(name = self.list.name, ranges = ?&self.ranges))]
fn build_request(&self) -> v4::SyncRequestList {
let sort = self.list.sort.clone();
let required_state = self.list.required_state.clone();
let timeline_limit = **self.list.timeline_limit.read().unwrap();
let filters = self.list.filters.clone();
assign!(v4::SyncRequestList::default(), {
ranges: self.ranges.clone(),
room_details: assign!(v4::RoomDetailsConfig::default(), {
required_state,
timeline_limit,
}),
sort,
filters,
})
}
// Handle the response from the server.
#[instrument(skip_all, fields(name = self.list.name, rooms_count, has_ops = !ops.is_empty()))]
pub(in super::super) fn handle_response(
&mut self,
maximum_number_of_rooms: u32,
ops: &Vec<v4::SyncOp>,
updated_rooms: &Vec<OwnedRoomId>,
) -> Result<bool, Error> {
let response =
self.list.handle_response(maximum_number_of_rooms, ops, &self.ranges, updated_rooms)?;
self.update_state(maximum_number_of_rooms);
Ok(response)
}
/// Update the state of the generator.
fn update_state(&mut self, maximum_number_of_rooms: u32) {
let Some(range_end) = self.ranges.first().map(|(_start, end)| u32::try_from(*end).unwrap()) else {
error!(name = self.list.name, "The request generator must have a range.");
return;
};
match &mut self.kind {
GeneratorKind::PagingFullSync {
number_of_fetched_rooms,
fully_loaded,
maximum_number_of_rooms_to_fetch,
..
}
| GeneratorKind::GrowingFullSync {
number_of_fetched_rooms,
fully_loaded,
maximum_number_of_rooms_to_fetch,
..
} => {
// Calculate the maximum bound for the range.
// At this step, the server has given us a maximum number of rooms for this
// list. That's our `range_maximum`.
let mut range_maximum = maximum_number_of_rooms;
// But maybe the user has defined a maximum number of rooms to fetch? In this
// case, let's take the minimum of the two.
if let Some(maximum_number_of_rooms_to_fetch) = maximum_number_of_rooms_to_fetch {
range_maximum = min(range_maximum, *maximum_number_of_rooms_to_fetch);
}
// Finally, ranges are inclusive!
range_maximum = range_maximum.saturating_sub(1);
// Now, we know what the maximum bound for the range is.
// The current range hasn't reached its maximum, let's continue.
if range_end < range_maximum {
// Update the _list range_ to cover from 0 to `range_end`.
// The list range is different from the request generator (this) range.
self.list.set_range(0, range_end);
// Update the number of fetched rooms forward. Do not forget that ranges are
// inclusive, so let's add 1.
*number_of_fetched_rooms = range_end.saturating_add(1);
// The list is still not fully loaded.
*fully_loaded = false;
// Finally, let's update the list's state.
Observable::update_eq(&mut self.list.state.write().unwrap(), |state| {
*state = SlidingSyncState::PartiallyLoaded;
});
}
// Otherwise the current range has reached its maximum, we switched to `FullyLoaded`
// mode.
else {
// The range is covering the entire list, from 0 to its maximum.
self.list.set_range(0, range_maximum);
// The number of fetched rooms is set to the maximum too.
*number_of_fetched_rooms = range_maximum;
// And we update the `fully_loaded` marker.
*fully_loaded = true;
// Finally, let's update the list's state.
Observable::update_eq(&mut self.list.state.write().unwrap(), |state| {
*state = SlidingSyncState::FullyLoaded;
});
}
}
GeneratorKind::Selective => {
// Selective mode always loads everything.
Observable::update_eq(&mut self.list.state.write().unwrap(), |state| {
*state = SlidingSyncState::FullyLoaded;
});
}
}
pub(super) fn new_selective() -> Self {
Self { ranges: Vec::new(), kind: SlidingSyncListRequestGeneratorKind::Selective }
}
#[cfg(test)]
fn is_fully_loaded(&self) -> bool {
pub(super) fn is_fully_loaded(&self) -> bool {
match self.kind {
GeneratorKind::PagingFullSync { fully_loaded, .. }
| GeneratorKind::GrowingFullSync { fully_loaded, .. } => fully_loaded,
GeneratorKind::Selective => true,
SlidingSyncListRequestGeneratorKind::PagingFullSync { fully_loaded, .. }
| SlidingSyncListRequestGeneratorKind::GrowingFullSync { fully_loaded, .. } => {
fully_loaded
}
SlidingSyncListRequestGeneratorKind::Selective => true,
}
}
}
fn create_range(
pub(super) fn create_range(
start: u32,
desired_size: u32,
maximum_number_of_rooms_to_fetch: Option<u32>,
@@ -309,72 +164,6 @@ fn create_range(
Some((start.into(), end.into()))
}
impl Iterator for SlidingSyncListRequestGenerator {
type Item = v4::SyncRequestList;
fn next(&mut self) -> Option<Self::Item> {
match self.kind {
// Cases where all rooms have been fully loaded.
GeneratorKind::PagingFullSync { fully_loaded: true, .. }
| GeneratorKind::GrowingFullSync { fully_loaded: true, .. }
| GeneratorKind::Selective => {
// Let's copy all the ranges from the parent `SlidingSyncList`, and build a
// request for them.
self.ranges = self.list.ranges.read().unwrap().clone();
// Here we go.
Some(self.build_request())
}
GeneratorKind::PagingFullSync {
number_of_fetched_rooms,
batch_size,
maximum_number_of_rooms_to_fetch,
..
} => {
// In paging-mode, range starts at the number of fetched rooms. Since ranges are
// inclusive, and since the number of fetched rooms starts at 1,
// not at 0, there is no need to add 1 here.
let range_start = number_of_fetched_rooms;
let range_desired_size = batch_size;
// Create a new range, and use it as the current set of ranges.
self.ranges = vec![create_range(
range_start,
range_desired_size,
maximum_number_of_rooms_to_fetch,
self.list.maximum_number_of_rooms(),
)?];
// Here we go.
Some(self.build_request())
}
GeneratorKind::GrowingFullSync {
number_of_fetched_rooms,
batch_size,
maximum_number_of_rooms_to_fetch,
..
} => {
// In growing-mode, range always starts from 0. However, the end is growing by
// adding `batch_size` to the previous number of fetched rooms.
let range_start = 0;
let range_desired_size = number_of_fetched_rooms.saturating_add(batch_size);
self.ranges = vec![create_range(
range_start,
range_desired_size,
maximum_number_of_rooms_to_fetch,
self.list.maximum_number_of_rooms(),
)?];
// Here we go.
Some(self.build_request())
}
}
}
}
#[cfg(test)]
mod tests {
use ruma::uint;
@@ -421,250 +210,4 @@ mod tests {
// defined at 50, and a maximum number of rooms defined at 75.
assert_eq!(create_range(0, 100, Some(50), Some(75)), Some((uint!(0), uint!(49))));
}
macro_rules! assert_request_and_response {
(
list = $list:ident,
generator = $generator:ident,
maximum_number_of_rooms = $maximum_number_of_rooms:expr,
$(
next => {
ranges = $( [ $range_start:literal ; $range_end:literal ] ),+ ,
is_fully_loaded = $is_fully_loaded:expr,
list_state = $list_state:ident,
}
),*
$(,)*
) => {
// That's the initial state.
assert_eq!($list.state(), SlidingSyncState::NotLoaded);
$(
{
// Generate a new request.
let request = $generator.next().unwrap();
assert_eq!(request.ranges, [ $( (uint!( $range_start ), uint!( $range_end )) ),* ]);
// Fake a response.
let _ = $generator.handle_response($maximum_number_of_rooms, &vec![], &vec![]);
assert_eq!($generator.is_fully_loaded(), $is_fully_loaded);
assert_eq!($list.state(), SlidingSyncState::$list_state);
}
)*
};
}
#[test]
fn test_generator_paging_full_sync() {
let list = SlidingSyncList::builder()
.sync_mode(crate::SlidingSyncMode::PagingFullSync)
.name("testing")
.full_sync_batch_size(10)
.build()
.unwrap();
let mut generator = list.request_generator();
assert_request_and_response! {
list = list,
generator = generator,
maximum_number_of_rooms = 25,
next => {
ranges = [0; 9],
is_fully_loaded = false,
list_state = PartiallyLoaded,
},
next => {
ranges = [10; 19],
is_fully_loaded = false,
list_state = PartiallyLoaded,
},
// The maximum number of rooms is reached!
next => {
ranges = [20; 24],
is_fully_loaded = true,
list_state = FullyLoaded,
},
// Now it's fully loaded, so the same request must be produced every time.
next => {
ranges = [0; 24], // the range starts at 0 now!
is_fully_loaded = true,
list_state = FullyLoaded,
},
next => {
ranges = [0; 24],
is_fully_loaded = true,
list_state = FullyLoaded,
},
};
}
#[test]
fn test_generator_paging_full_sync_with_a_maximum_number_of_rooms_to_fetch() {
let list = SlidingSyncList::builder()
.sync_mode(crate::SlidingSyncMode::PagingFullSync)
.name("testing")
.full_sync_batch_size(10)
.full_sync_maximum_number_of_rooms_to_fetch(22)
.build()
.unwrap();
let mut generator = list.request_generator();
assert_request_and_response! {
list = list,
generator = generator,
maximum_number_of_rooms = 25,
next => {
ranges = [0; 9],
is_fully_loaded = false,
list_state = PartiallyLoaded,
},
next => {
ranges = [10; 19],
is_fully_loaded = false,
list_state = PartiallyLoaded,
},
// The maximum number of rooms to fetch is reached!
next => {
ranges = [20; 21],
is_fully_loaded = true,
list_state = FullyLoaded,
},
// Now it's fully loaded, so the same request must be produced every time.
next => {
ranges = [0; 21], // the range starts at 0 now!
is_fully_loaded = true,
list_state = FullyLoaded,
},
next => {
ranges = [0; 21],
is_fully_loaded = true,
list_state = FullyLoaded,
},
};
}
#[test]
fn test_generator_growing_full_sync() {
let list = SlidingSyncList::builder()
.sync_mode(crate::SlidingSyncMode::GrowingFullSync)
.name("testing")
.full_sync_batch_size(10)
.build()
.unwrap();
let mut generator = list.request_generator();
assert_request_and_response! {
list = list,
generator = generator,
maximum_number_of_rooms = 25,
next => {
ranges = [0; 9],
is_fully_loaded = false,
list_state = PartiallyLoaded,
},
next => {
ranges = [0; 19],
is_fully_loaded = false,
list_state = PartiallyLoaded,
},
// The maximum number of rooms is reached!
next => {
ranges = [0; 24],
is_fully_loaded = true,
list_state = FullyLoaded,
},
// Now it's fully loaded, so the same request must be produced every time.
next => {
ranges = [0; 24],
is_fully_loaded = true,
list_state = FullyLoaded,
},
next => {
ranges = [0; 24],
is_fully_loaded = true,
list_state = FullyLoaded,
},
};
}
#[test]
fn test_generator_growing_full_sync_with_a_maximum_number_of_rooms_to_fetch() {
let list = SlidingSyncList::builder()
.sync_mode(crate::SlidingSyncMode::GrowingFullSync)
.name("testing")
.full_sync_batch_size(10)
.full_sync_maximum_number_of_rooms_to_fetch(22)
.build()
.unwrap();
let mut generator = list.request_generator();
assert_request_and_response! {
list = list,
generator = generator,
maximum_number_of_rooms = 25,
next => {
ranges = [0; 9],
is_fully_loaded = false,
list_state = PartiallyLoaded,
},
next => {
ranges = [0; 19],
is_fully_loaded = false,
list_state = PartiallyLoaded,
},
// The maximum number of rooms is reached!
next => {
ranges = [0; 21],
is_fully_loaded = true,
list_state = FullyLoaded,
},
// Now it's fully loaded, so the same request must be produced every time.
next => {
ranges = [0; 21],
is_fully_loaded = true,
list_state = FullyLoaded,
},
next => {
ranges = [0; 21],
is_fully_loaded = true,
list_state = FullyLoaded,
},
};
}
#[test]
fn test_generator_selective() {
let list = SlidingSyncList::builder()
.sync_mode(crate::SlidingSyncMode::Selective)
.name("testing")
.ranges(vec![(0u32, 10), (42, 153)])
.build()
.unwrap();
let mut generator = list.request_generator();
assert_request_and_response! {
list = list,
generator = generator,
maximum_number_of_rooms = 25,
// The maximum number of rooms is reached directly!
next => {
ranges = [0; 10], [42; 153],
is_fully_loaded = true,
list_state = FullyLoaded,
},
// Now it's fully loaded, so the same request must be produced every time.
next => {
ranges = [0; 10], [42; 153],
is_fully_loaded = true,
list_state = FullyLoaded,
},
next => {
ranges = [0; 10], [42; 153],
is_fully_loaded = true,
list_state = FullyLoaded,
}
};
}
}

View File

@@ -792,7 +792,7 @@ impl SlidingSync {
/// stream created after this. The old stream will still continue to use the
/// previous set of lists.
pub fn add_list(&self, list: SlidingSyncList) -> Option<SlidingSyncList> {
self.inner.lists.write().unwrap().insert(list.name.clone(), list)
self.inner.lists.write().unwrap().insert(list.name().to_owned(), list)
}
/// Lookup a set of rooms
@@ -853,12 +853,12 @@ impl SlidingSync {
}
/// Handle the HTTP response.
#[instrument(skip_all, fields(lists = list_generators.len()))]
#[instrument(skip_all, fields(lists = lists.len()))]
fn handle_response(
&self,
sliding_sync_response: v4::Response,
mut sync_response: SyncResponse,
list_generators: &mut BTreeMap<String, SlidingSyncListRequestGenerator>,
lists: &mut BTreeMap<String, SlidingSyncList>,
) -> Result<UpdateSummary, crate::Error> {
{
debug!(
@@ -912,7 +912,7 @@ impl SlidingSync {
let mut updated_lists = Vec::new();
for (name, updates) in sliding_sync_response.lists {
let Some(list_generator) = list_generators.get_mut(&name) else {
let Some(list) = lists.get_mut(&name) else {
error!("Response for list `{name}` - unknown to us; skipping");
continue
@@ -921,11 +921,7 @@ impl SlidingSync {
let maximum_number_of_rooms: u32 =
updates.count.try_into().expect("the list total count convertible into u32");
if list_generator.handle_response(
maximum_number_of_rooms,
&updates.ops,
&updated_rooms,
)? {
if list.handle_response(maximum_number_of_rooms, &updates.ops, &updated_rooms)? {
updated_lists.push(name.clone());
}
}
@@ -944,28 +940,28 @@ impl SlidingSync {
async fn sync_once(
&self,
stream_id: &str,
list_generators: Arc<Mutex<BTreeMap<String, SlidingSyncListRequestGenerator>>>,
lists: Arc<Mutex<BTreeMap<String, SlidingSyncList>>>,
) -> Result<Option<UpdateSummary>> {
let mut lists = BTreeMap::new();
let mut requests_lists = BTreeMap::new();
{
let mut list_generators_lock = list_generators.lock().unwrap();
let list_generators = list_generators_lock.borrow_mut();
let mut lists_lock = lists.lock().unwrap();
let lists = lists_lock.borrow_mut();
let mut lists_to_remove = Vec::new();
for (name, generator) in list_generators.iter_mut() {
if let Some(request) = generator.next() {
lists.insert(name.clone(), request);
for (name, list) in lists.iter_mut() {
if let Some(list_request) = list.next_request() {
requests_lists.insert(name.clone(), list_request);
} else {
lists_to_remove.push(name.clone());
}
}
for list_name in lists_to_remove {
list_generators.remove(&list_name);
lists.remove(&list_name);
}
if list_generators.is_empty() {
if lists.is_empty() {
return Ok(None);
}
}
@@ -996,7 +992,7 @@ impl SlidingSync {
// request. We use the (optional) `txn_id` field for that.
txn_id: Some(stream_id.to_owned()),
timeout: Some(timeout),
lists,
lists: requests_lists,
room_subscriptions,
unsubscribe_rooms,
extensions,
@@ -1083,7 +1079,7 @@ impl SlidingSync {
debug!(?sync_response, "Sliding Sync response has been handled by the client");
let updates = this.handle_response(response, sync_response, list_generators.lock().unwrap().borrow_mut())?;
let updates = this.handle_response(response, sync_response, lists.lock().unwrap().borrow_mut())?;
this.cache_to_storage().await?;
@@ -1103,24 +1099,15 @@ impl SlidingSync {
#[allow(unknown_lints, clippy::let_with_type_underscore)] // triggered by instrument macro
#[instrument(name = "sync_stream", skip_all, parent = &self.inner.client.inner.root_span)]
pub fn stream<'a>(&'a self) -> impl Stream<Item = Result<UpdateSummary, crate::Error>> + 'a {
// Collect all the lists that need to be updated.
let list_generators = {
let mut list_generators = BTreeMap::new();
let lock = self.inner.lists.read().unwrap();
for (name, lists) in lock.iter() {
list_generators.insert(name.clone(), lists.request_generator());
}
list_generators
};
// Copy all the lists.
let lists = Arc::new(Mutex::new(self.inner.lists.read().unwrap().clone()));
// Define a stream ID.
let stream_id = Uuid::new_v4().to_string();
debug!(?self.inner.extensions, stream_id, "About to run the sync stream");
let instrument_span = Span::current();
let list_generators = Arc::new(Mutex::new(list_generators));
async_stream::stream! {
loop {
@@ -1130,7 +1117,7 @@ impl SlidingSync {
debug!(?self.inner.extensions, "Sync stream loop is running");
});
match self.sync_once(&stream_id, list_generators.clone()).instrument(sync_span.clone()).await {
match self.sync_once(&stream_id, lists.clone()).instrument(sync_span.clone()).await {
Ok(Some(updates)) => {
self.inner.reset_counter.store(0, Ordering::SeqCst);

View File

@@ -7,7 +7,6 @@ use std::{
use anyhow::{bail, Context};
use assert_matches::assert_matches;
use eyeball::unique::Observable;
use eyeball_im::VectorDiff;
use futures::{pin_mut, stream::StreamExt};
use matrix_sdk::{
@@ -232,7 +231,7 @@ async fn modifying_timeline_limit() -> anyhow::Result<()> {
// Sync to receive messages with a `timeline_limit` set to 20.
{
Observable::set(&mut list.timeline_limit.write().unwrap(), Some(uint!(20)));
list.set_timeline_limit(Some(uint!(20)));
let mut update_summary;