Use ObservableVectorTransaction for room list

This commit is contained in:
Jonas Platte
2023-09-07 15:04:53 +02:00
committed by Jonas Platte
parent 71cc7318ca
commit 619085a190
2 changed files with 44 additions and 62 deletions

View File

@@ -32,7 +32,6 @@ use super::{Error, State};
#[derive(Debug)]
pub struct RoomList {
sliding_sync_list: SlidingSyncList,
room_list_service_state: Subscriber<State>,
loading_state: SharedObservable<RoomListLoadingState>,
loading_state_task: JoinHandle<()>,
}
@@ -58,7 +57,6 @@ impl RoomList {
Ok(Self {
sliding_sync_list: sliding_sync_list.clone(),
room_list_service_state: room_list_service_state.clone(),
loading_state: loading_state.clone(),
loading_state_task: spawn(async move {
pin_mut!(room_list_service_state);
@@ -103,14 +101,7 @@ impl RoomList {
pub fn entries(
&self,
) -> (Vector<RoomListEntry>, impl Stream<Item = Vec<VectorDiff<RoomListEntry>>>) {
let (entries, entries_stream) = self.sliding_sync_list.room_list_stream();
(
entries,
// Batch the entries stream. Batch is drained every time the `room_list_service_state`
// is changed.
entries_stream.batch_with(self.room_list_service_state.clone().map(|_| ())),
)
self.sliding_sync_list.room_list_stream()
}
/// Similar to [`Self::entries`] except that it's possible to provide a
@@ -122,14 +113,7 @@ impl RoomList {
where
F: Fn(&RoomListEntry) -> bool,
{
let (entries, entries_stream) = self.sliding_sync_list.room_list_filtered_stream(filter);
(
entries,
// Batch the entries stream. Batch is drained every time the `room_list_service_state`
// is changed.
entries_stream.batch_with(self.room_list_service_state.clone().map(|_| ())),
)
self.sliding_sync_list.room_list_filtered_stream(filter)
}
/// Similar to [`Self::entries_with_static_filter`] except that it's
@@ -147,21 +131,14 @@ impl RoomList {
let dynamic_filter = DynamicRoomListFilter::new(filter_fn_cell.clone());
let list = self.sliding_sync_list.clone();
let room_list_service_state = self.room_list_service_state.clone();
let stream = stream! {
loop {
let filter_fn = filter_fn_cell.take().await;
let (items, stream) = list.room_list_filtered_stream(filter_fn);
yield stream::once(
// Reset the stream with all its items.
ready(vec![VectorDiff::Reset { values: items }]),
)
.chain(
// Batch the entries stream. Batch is drained every time the
// `room_list_service_state` is changed.
stream.batch_with(room_list_service_state.clone().map(|_| ())),
)
// Reset the stream with all its items.
yield stream::once(ready(vec![VectorDiff::Reset { values: items }]))
.chain(stream);
}
}
.switch();

View File

@@ -13,7 +13,7 @@ use std::{
};
use eyeball::Observable;
use eyeball_im::{ObservableVector, VectorDiff};
use eyeball_im::{ObservableVector, ObservableVectorTransaction, VectorDiff};
use eyeball_im_util::vector;
use futures_core::Stream;
use imbl::Vector;
@@ -150,12 +150,12 @@ impl SlidingSyncList {
/// entries.
pub fn room_list_stream(
&self,
) -> (Vector<RoomListEntry>, impl Stream<Item = VectorDiff<RoomListEntry>>) {
) -> (Vector<RoomListEntry>, impl Stream<Item = Vec<VectorDiff<RoomListEntry>>>) {
let read_lock = self.inner.room_list.read().unwrap();
let values = (*read_lock).clone();
let subscriber = ObservableVector::subscribe(&read_lock);
(values, subscriber.into_stream())
(values, subscriber.into_batched_stream())
}
/// Get a stream of room list, but filtered.
@@ -165,7 +165,7 @@ impl SlidingSyncList {
pub fn room_list_filtered_stream<F>(
&self,
filter: F,
) -> (Vector<RoomListEntry>, impl Stream<Item = VectorDiff<RoomListEntry>>)
) -> (Vector<RoomListEntry>, impl Stream<Item = Vec<VectorDiff<RoomListEntry>>>)
where
F: Fn(&RoomListEntry) -> bool,
{
@@ -173,7 +173,7 @@ impl SlidingSyncList {
let values = (*read_lock).clone();
let subscriber = ObservableVector::subscribe(&read_lock);
vector::Filter::new(values, subscriber.into_stream(), filter)
vector::Filter::new(values, subscriber.into_batched_stream(), filter)
}
/// Get the maximum number of rooms. See [`Self::maximum_number_of_rooms`]
@@ -435,6 +435,7 @@ impl SlidingSyncListInner {
{
let mut room_list = self.room_list.write().unwrap();
let mut room_list_txn = room_list.transaction();
// Run the sync operations.
let mut rooms_that_have_received_an_update =
@@ -443,7 +444,7 @@ impl SlidingSyncListInner {
if !list_sync_operations.is_empty() {
apply_sync_operations(
list_sync_operations,
&mut room_list,
&mut room_list_txn,
&mut rooms_that_have_received_an_update,
)?;
@@ -458,7 +459,7 @@ impl SlidingSyncListInner {
// message). Let's trigger those.
let mut rooms_to_update = Vec::with_capacity(rooms_that_have_received_an_update.len());
for (position, room_list_entry) in room_list.iter().enumerate() {
for (position, room_list_entry) in room_list_txn.iter().enumerate() {
// Invalidated rooms must be considered as empty rooms, so let's just filter by
// filled rooms.
if let RoomListEntry::Filled(room_id) = room_list_entry {
@@ -474,11 +475,13 @@ impl SlidingSyncListInner {
for (position, room_list_entry) in rooms_to_update {
// Setting to `room_list`'s item to the same value, just
// to generate an “diff update”.
room_list.set(position, room_list_entry);
room_list_txn.set(position, room_list_entry);
}
new_changes = true;
}
room_list_txn.commit();
}
Ok(new_changes)
@@ -504,7 +507,7 @@ impl SlidingSyncListInner {
fn apply_sync_operations(
operations: &[v4::SyncOp],
room_list: &mut ObservableVector<RoomListEntry>,
room_list_txn: &mut ObservableVectorTransaction<'_, RoomListEntry>,
rooms_that_have_received_an_update: &mut HashSet<OwnedRoomId>,
) -> Result<(), Error> {
for operation in operations {
@@ -538,11 +541,11 @@ fn apply_sync_operations(
}
// Range is too big.
if end > room_list.len() {
if end > room_list_txn.len() {
return Err(Error::BadResponse(format!(
"`range` is out of the `rooms_list`'s bounds ({} > {})",
end,
room_list.len(),
room_list_txn.len(),
)));
}
@@ -574,7 +577,7 @@ fn apply_sync_operations(
// The room ID is given by the `room_ids`.
for (room_entry_index, room_id) in room_entry_range.zip(room_ids) {
// Syncing means updating the room list to `Filled`.
room_list.set(room_entry_index, RoomListEntry::Filled(room_id.clone()));
room_list_txn.set(room_entry_index, RoomListEntry::Filled(room_id.clone()));
// This `room_id` has been handled, let's remove it from the rooms to handle
// later.
@@ -598,7 +601,7 @@ fn apply_sync_operations(
.unwrap();
// Index is out of bounds.
if index >= room_list.len() {
if index >= room_list_txn.len() {
// OK, so, normally, we should raise an error. But the server sometimes sends a
// `DELETE` for an index that doesn't exist. It happens with the existing
// Sliding Sync Proxy (at the time of writing). It may be a bug or something
@@ -607,7 +610,7 @@ fn apply_sync_operations(
}
// Removing the entry in the room list.
let room_entry = room_list.remove(index);
let room_entry = room_list_txn.remove(index);
// This `room_id` has been handled, let's remove it from the rooms to handle
// later.
@@ -638,11 +641,11 @@ fn apply_sync_operations(
})?;
// Index is out of bounds.
if index > room_list.len() {
if index > room_list_txn.len() {
return Err(Error::BadResponse(format!(
"`index` is out of the `room_list`' bounds ({} > {})",
index,
room_list.len(),
room_list_txn.len(),
)));
}
@@ -651,7 +654,7 @@ fn apply_sync_operations(
rooms_that_have_received_an_update.remove(&room_id);
// Inserting a `Filled` entry in the room list .
room_list.insert(index, RoomListEntry::Filled(room_id));
room_list_txn.insert(index, RoomListEntry::Filled(room_id));
}
// Specification says:
@@ -685,11 +688,11 @@ fn apply_sync_operations(
}
// Range is too big.
if end > room_list.len() {
if end > room_list_txn.len() {
return Err(Error::BadResponse(format!(
"`range` is out of the `room_list`' bounds ({} > {})",
end,
room_list.len(),
room_list_txn.len(),
)));
}
@@ -703,13 +706,13 @@ fn apply_sync_operations(
//
// If the previous room list entry is `Filled`, it becomes `Invalidated`.
// Otherwise, for `Empty` or `Invalidated`, it stays as is.
match room_list.get(room_entry_index) {
match room_list_txn.get(room_entry_index) {
Some(RoomListEntry::Filled(room_id)) => {
// This `room_id` is being handled, let's remove it from the rooms to
// handle later.
rooms_that_have_received_an_update.remove(room_id);
room_list.set(
room_list_txn.set(
room_entry_index,
RoomListEntry::Invalidated(room_id.to_owned()),
);
@@ -1580,20 +1583,16 @@ mod tests {
// forever.
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
// `room3` has been modified by a `SYNC` operation.
assert_eq!(
room_list_stream_receiver.try_recv(),
Ok(VectorDiff::Set { index: 3, value: RoomListEntry::Filled(room3.to_owned()) })
);
// `room4` has been modified by a `SYNC` operation.
assert_eq!(
room_list_stream_receiver.try_recv(),
Ok(VectorDiff::Set { index: 4, value: RoomListEntry::Filled(room4.to_owned()) })
);
// `room2` has been modified by another update (like a new event).
assert_eq!(
room_list_stream_receiver.try_recv(),
Ok(VectorDiff::Set { index: 2, value: RoomListEntry::Filled(room2.to_owned()) })
Ok(vec![
// `room3` has been modified by a `SYNC` operation.
VectorDiff::Set { index: 3, value: RoomListEntry::Filled(room3.to_owned()) },
// `room4` has been modified by a `SYNC` operation.
VectorDiff::Set { index: 4, value: RoomListEntry::Filled(room4.to_owned()) },
// `room2` has been modified by another update (like a new event).
VectorDiff::Set { index: 2, value: RoomListEntry::Filled(room2.to_owned()) },
])
);
}
@@ -1690,7 +1689,13 @@ mod tests {
}
)?
let result = apply_sync_operations(operations, &mut room_list, &mut rooms_that_have_received_an_update);
let mut txn = room_list.transaction();
let result = apply_sync_operations(
operations,
&mut txn,
&mut rooms_that_have_received_an_update,
);
txn.commit();
assert!(result.$result(), "{}; assert the `Result`", $assert_description);
assert_eq!(