From 619085a19025e4b4ce5d23d36da8102eeb5ac58f Mon Sep 17 00:00:00 2001 From: Jonas Platte Date: Thu, 7 Sep 2023 15:04:53 +0200 Subject: [PATCH] Use ObservableVectorTransaction for room list --- .../src/room_list_service/room_list.rs | 33 ++------- .../matrix-sdk/src/sliding_sync/list/mod.rs | 73 ++++++++++--------- 2 files changed, 44 insertions(+), 62 deletions(-) diff --git a/crates/matrix-sdk-ui/src/room_list_service/room_list.rs b/crates/matrix-sdk-ui/src/room_list_service/room_list.rs index 3ac1fd865..b526893ee 100644 --- a/crates/matrix-sdk-ui/src/room_list_service/room_list.rs +++ b/crates/matrix-sdk-ui/src/room_list_service/room_list.rs @@ -32,7 +32,6 @@ use super::{Error, State}; #[derive(Debug)] pub struct RoomList { sliding_sync_list: SlidingSyncList, - room_list_service_state: Subscriber, loading_state: SharedObservable, loading_state_task: JoinHandle<()>, } @@ -58,7 +57,6 @@ impl RoomList { Ok(Self { sliding_sync_list: sliding_sync_list.clone(), - room_list_service_state: room_list_service_state.clone(), loading_state: loading_state.clone(), loading_state_task: spawn(async move { pin_mut!(room_list_service_state); @@ -103,14 +101,7 @@ impl RoomList { pub fn entries( &self, ) -> (Vector, impl Stream>>) { - let (entries, entries_stream) = self.sliding_sync_list.room_list_stream(); - - ( - entries, - // Batch the entries stream. Batch is drained every time the `room_list_service_state` - // is changed. - entries_stream.batch_with(self.room_list_service_state.clone().map(|_| ())), - ) + self.sliding_sync_list.room_list_stream() } /// Similar to [`Self::entries`] except that it's possible to provide a @@ -122,14 +113,7 @@ impl RoomList { where F: Fn(&RoomListEntry) -> bool, { - let (entries, entries_stream) = self.sliding_sync_list.room_list_filtered_stream(filter); - - ( - entries, - // Batch the entries stream. Batch is drained every time the `room_list_service_state` - // is changed. - entries_stream.batch_with(self.room_list_service_state.clone().map(|_| ())), - ) + self.sliding_sync_list.room_list_filtered_stream(filter) } /// Similar to [`Self::entries_with_static_filter`] except that it's @@ -147,21 +131,14 @@ impl RoomList { let dynamic_filter = DynamicRoomListFilter::new(filter_fn_cell.clone()); let list = self.sliding_sync_list.clone(); - let room_list_service_state = self.room_list_service_state.clone(); let stream = stream! { loop { let filter_fn = filter_fn_cell.take().await; let (items, stream) = list.room_list_filtered_stream(filter_fn); - yield stream::once( - // Reset the stream with all its items. - ready(vec![VectorDiff::Reset { values: items }]), - ) - .chain( - // Batch the entries stream. Batch is drained every time the - // `room_list_service_state` is changed. - stream.batch_with(room_list_service_state.clone().map(|_| ())), - ) + // Reset the stream with all its items. + yield stream::once(ready(vec![VectorDiff::Reset { values: items }])) + .chain(stream); } } .switch(); diff --git a/crates/matrix-sdk/src/sliding_sync/list/mod.rs b/crates/matrix-sdk/src/sliding_sync/list/mod.rs index e92acd04f..f62073cec 100644 --- a/crates/matrix-sdk/src/sliding_sync/list/mod.rs +++ b/crates/matrix-sdk/src/sliding_sync/list/mod.rs @@ -13,7 +13,7 @@ use std::{ }; use eyeball::Observable; -use eyeball_im::{ObservableVector, VectorDiff}; +use eyeball_im::{ObservableVector, ObservableVectorTransaction, VectorDiff}; use eyeball_im_util::vector; use futures_core::Stream; use imbl::Vector; @@ -150,12 +150,12 @@ impl SlidingSyncList { /// entries. pub fn room_list_stream( &self, - ) -> (Vector, impl Stream>) { + ) -> (Vector, impl Stream>>) { let read_lock = self.inner.room_list.read().unwrap(); let values = (*read_lock).clone(); let subscriber = ObservableVector::subscribe(&read_lock); - (values, subscriber.into_stream()) + (values, subscriber.into_batched_stream()) } /// Get a stream of room list, but filtered. @@ -165,7 +165,7 @@ impl SlidingSyncList { pub fn room_list_filtered_stream( &self, filter: F, - ) -> (Vector, impl Stream>) + ) -> (Vector, impl Stream>>) where F: Fn(&RoomListEntry) -> bool, { @@ -173,7 +173,7 @@ impl SlidingSyncList { let values = (*read_lock).clone(); let subscriber = ObservableVector::subscribe(&read_lock); - vector::Filter::new(values, subscriber.into_stream(), filter) + vector::Filter::new(values, subscriber.into_batched_stream(), filter) } /// Get the maximum number of rooms. See [`Self::maximum_number_of_rooms`] @@ -435,6 +435,7 @@ impl SlidingSyncListInner { { let mut room_list = self.room_list.write().unwrap(); + let mut room_list_txn = room_list.transaction(); // Run the sync operations. let mut rooms_that_have_received_an_update = @@ -443,7 +444,7 @@ impl SlidingSyncListInner { if !list_sync_operations.is_empty() { apply_sync_operations( list_sync_operations, - &mut room_list, + &mut room_list_txn, &mut rooms_that_have_received_an_update, )?; @@ -458,7 +459,7 @@ impl SlidingSyncListInner { // message). Let's trigger those. let mut rooms_to_update = Vec::with_capacity(rooms_that_have_received_an_update.len()); - for (position, room_list_entry) in room_list.iter().enumerate() { + for (position, room_list_entry) in room_list_txn.iter().enumerate() { // Invalidated rooms must be considered as empty rooms, so let's just filter by // filled rooms. if let RoomListEntry::Filled(room_id) = room_list_entry { @@ -474,11 +475,13 @@ impl SlidingSyncListInner { for (position, room_list_entry) in rooms_to_update { // Setting to `room_list`'s item to the same value, just // to generate an “diff update”. - room_list.set(position, room_list_entry); + room_list_txn.set(position, room_list_entry); } new_changes = true; } + + room_list_txn.commit(); } Ok(new_changes) @@ -504,7 +507,7 @@ impl SlidingSyncListInner { fn apply_sync_operations( operations: &[v4::SyncOp], - room_list: &mut ObservableVector, + room_list_txn: &mut ObservableVectorTransaction<'_, RoomListEntry>, rooms_that_have_received_an_update: &mut HashSet, ) -> Result<(), Error> { for operation in operations { @@ -538,11 +541,11 @@ fn apply_sync_operations( } // Range is too big. - if end > room_list.len() { + if end > room_list_txn.len() { return Err(Error::BadResponse(format!( "`range` is out of the `rooms_list`'s bounds ({} > {})", end, - room_list.len(), + room_list_txn.len(), ))); } @@ -574,7 +577,7 @@ fn apply_sync_operations( // The room ID is given by the `room_ids`. for (room_entry_index, room_id) in room_entry_range.zip(room_ids) { // Syncing means updating the room list to `Filled`. - room_list.set(room_entry_index, RoomListEntry::Filled(room_id.clone())); + room_list_txn.set(room_entry_index, RoomListEntry::Filled(room_id.clone())); // This `room_id` has been handled, let's remove it from the rooms to handle // later. @@ -598,7 +601,7 @@ fn apply_sync_operations( .unwrap(); // Index is out of bounds. - if index >= room_list.len() { + if index >= room_list_txn.len() { // OK, so, normally, we should raise an error. But the server sometimes sends a // `DELETE` for an index that doesn't exist. It happens with the existing // Sliding Sync Proxy (at the time of writing). It may be a bug or something @@ -607,7 +610,7 @@ fn apply_sync_operations( } // Removing the entry in the room list. - let room_entry = room_list.remove(index); + let room_entry = room_list_txn.remove(index); // This `room_id` has been handled, let's remove it from the rooms to handle // later. @@ -638,11 +641,11 @@ fn apply_sync_operations( })?; // Index is out of bounds. - if index > room_list.len() { + if index > room_list_txn.len() { return Err(Error::BadResponse(format!( "`index` is out of the `room_list`' bounds ({} > {})", index, - room_list.len(), + room_list_txn.len(), ))); } @@ -651,7 +654,7 @@ fn apply_sync_operations( rooms_that_have_received_an_update.remove(&room_id); // Inserting a `Filled` entry in the room list . - room_list.insert(index, RoomListEntry::Filled(room_id)); + room_list_txn.insert(index, RoomListEntry::Filled(room_id)); } // Specification says: @@ -685,11 +688,11 @@ fn apply_sync_operations( } // Range is too big. - if end > room_list.len() { + if end > room_list_txn.len() { return Err(Error::BadResponse(format!( "`range` is out of the `room_list`' bounds ({} > {})", end, - room_list.len(), + room_list_txn.len(), ))); } @@ -703,13 +706,13 @@ fn apply_sync_operations( // // If the previous room list entry is `Filled`, it becomes `Invalidated`. // Otherwise, for `Empty` or `Invalidated`, it stays as is. - match room_list.get(room_entry_index) { + match room_list_txn.get(room_entry_index) { Some(RoomListEntry::Filled(room_id)) => { // This `room_id` is being handled, let's remove it from the rooms to // handle later. rooms_that_have_received_an_update.remove(room_id); - room_list.set( + room_list_txn.set( room_entry_index, RoomListEntry::Invalidated(room_id.to_owned()), ); @@ -1580,20 +1583,16 @@ mod tests { // forever. tokio::time::sleep(std::time::Duration::from_millis(100)).await; - // `room3` has been modified by a `SYNC` operation. assert_eq!( room_list_stream_receiver.try_recv(), - Ok(VectorDiff::Set { index: 3, value: RoomListEntry::Filled(room3.to_owned()) }) - ); - // `room4` has been modified by a `SYNC` operation. - assert_eq!( - room_list_stream_receiver.try_recv(), - Ok(VectorDiff::Set { index: 4, value: RoomListEntry::Filled(room4.to_owned()) }) - ); - // `room2` has been modified by another update (like a new event). - assert_eq!( - room_list_stream_receiver.try_recv(), - Ok(VectorDiff::Set { index: 2, value: RoomListEntry::Filled(room2.to_owned()) }) + Ok(vec![ + // `room3` has been modified by a `SYNC` operation. + VectorDiff::Set { index: 3, value: RoomListEntry::Filled(room3.to_owned()) }, + // `room4` has been modified by a `SYNC` operation. + VectorDiff::Set { index: 4, value: RoomListEntry::Filled(room4.to_owned()) }, + // `room2` has been modified by another update (like a new event). + VectorDiff::Set { index: 2, value: RoomListEntry::Filled(room2.to_owned()) }, + ]) ); } @@ -1690,7 +1689,13 @@ mod tests { } )? - let result = apply_sync_operations(operations, &mut room_list, &mut rooms_that_have_received_an_update); + let mut txn = room_list.transaction(); + let result = apply_sync_operations( + operations, + &mut txn, + &mut rooms_that_have_received_an_update, + ); + txn.commit(); assert!(result.$result(), "{}; assert the `Result`", $assert_description); assert_eq!(