diff --git a/crates/matrix-sdk/src/event_cache/linked_chunk/as_vector.rs b/crates/matrix-sdk/src/event_cache/linked_chunk/as_vector.rs new file mode 100644 index 000000000..39e0508e1 --- /dev/null +++ b/crates/matrix-sdk/src/event_cache/linked_chunk/as_vector.rs @@ -0,0 +1,239 @@ +// Copyright 2024 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{ + collections::VecDeque, + ops::ControlFlow, + pin::Pin, + task::{ready, Context, Poll}, +}; + +use eyeball_im::VectorDiff; +use futures_core::Stream; +use pin_project_lite::pin_project; + +use super::{ + updates::{Update, UpdatesSubscriber}, + ChunkIdentifier, ChunkIdentifierGenerator, +}; + +type Offset = usize; +type ChunkLength = usize; + +pin_project! { + pub struct AsVectorSubscriber { + #[pin] + updates_subscriber: UpdatesSubscriber, + + chunks: VecDeque<(ChunkIdentifier, ChunkLength)>, + } +} + +impl AsVectorSubscriber { + pub(super) fn new(updates_subscriber: UpdatesSubscriber) -> Self { + Self { + updates_subscriber, + chunks: { + let mut chunks = VecDeque::new(); + chunks.insert(0, (ChunkIdentifierGenerator::FIRST_IDENTIFIER, 0)); + + chunks + }, + } + } +} + +impl Stream for AsVectorSubscriber +where + Item: Clone + std::fmt::Debug, + Gap: Clone + std::fmt::Debug, +{ + type Item = Vec>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + + let Some(updates) = ready!(this.updates_subscriber.as_mut().poll_next(cx)) else { + return Poll::Ready(None); + }; + + let mut diffs = Vec::with_capacity(updates.len()); + + for update in updates { + match update { + Update::NewItemsChunk { previous, new, next } + | Update::NewGapChunk { previous, new, next, .. } => { + match (previous, next) { + // New chunk at the end. + (Some(_previous), None) => { + // TODO: chec `previous` is correct + this.chunks.push_back((new, 0)); + } + + // New chunk at the beginning. + (None, Some(_next)) => { + // TODO: check `next` is correct + this.chunks.push_front((new, 0)); + } + + // New chunk is inserted between 2 chunks. + (Some(_previous), Some(next)) => { + // TODO: check `previous` is correct + let next_chunk_index = this + .chunks + .iter() + .position(|(chunk_identifier, _)| *chunk_identifier == next) + .expect("next chunk not found"); + + this.chunks.insert(next_chunk_index, (new, 0)); + } + + (None, None) => { + unreachable!("?"); + } + } + } + + Update::RemoveChunk(expected_chunk_identifier) => { + let chunk_index = this + .chunks + .iter() + .position(|(chunk_identifier, _)| { + *chunk_identifier == expected_chunk_identifier + }) + .expect("oops 4"); + + this.chunks.remove(chunk_index).unwrap(); + } + + Update::InsertItems { at: position, items } => { + let expected_chunk_identifier = position.chunk_identifier(); + + let (chunk_index, offset, chunk_length) = { + let control_flow = this.chunks.iter_mut().enumerate().try_fold( + position.index(), + |offset, (chunk_index, (chunk_identifier, chunk_length))| { + if chunk_identifier == &expected_chunk_identifier { + ControlFlow::Break((chunk_index, offset, chunk_length)) + } else { + ControlFlow::Continue(offset + *chunk_length) + } + }, + ); + + match control_flow { + ControlFlow::Break(value) => Some(value), + ControlFlow::Continue(..) => None, + } + } + .expect("`ChunkIdentifier` must exist"); + + *chunk_length += items.len(); + + // Optimisation: we can emit a `VectorDiff::Append` in this particular case. + if chunk_index == this.chunks.len() - 1 { + diffs.push(VectorDiff::Append { values: items.into() }); + } + // No optimisation: let's emit `VectorDiff::Insert`. + else { + diffs.extend(items.into_iter().enumerate().map(|(nth, item)| { + VectorDiff::Insert { index: offset + nth, value: item } + })); + } + } + + Update::TruncateItems { chunk: expected_chunk_identifier, length: new_length } => { + let length = this + .chunks + .iter_mut() + .find_map(|(chunk_identifier, length)| { + (*chunk_identifier == expected_chunk_identifier).then(|| length) + }) + .expect("oops 3"); + + let old_length = *length; + *length = new_length; + + diffs.extend( + (new_length..old_length) + .into_iter() + .map(|index| VectorDiff::Remove { index }), + ); + } + } + } + + Poll::Ready(Some(diffs)) + } +} + +#[cfg(test)] +mod tests { + use futures_util::pin_mut; + use imbl::vector; + use stream_assert::{assert_closed, assert_next_eq, assert_pending}; + + use super::{super::LinkedChunk, VectorDiff}; + + #[test] + fn test_as_vector() { + let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history(); + let as_vector = linked_chunk.subscribe_as_vector().unwrap(); + pin_mut!(as_vector); + + assert_pending!(as_vector); + + linked_chunk.push_items_back(['a', 'b']); + linked_chunk.push_items_back(['c', 'd', 'e']); + + assert_next_eq!( + as_vector, + &[ + VectorDiff::Append { values: vector!['a', 'b'] }, + VectorDiff::Append { values: vector!['c'] }, + VectorDiff::Append { values: vector!['d', 'e'] }, + ] + ); + + linked_chunk + .insert_items_at(['f', 'g'], linked_chunk.item_position(|item| *item == 'b').unwrap()) + .unwrap(); + + assert_next_eq!( + as_vector, + &[ + VectorDiff::Remove { index: 1 }, + VectorDiff::Remove { index: 2 }, + VectorDiff::Insert { index: 1, value: 'f' }, + VectorDiff::Insert { index: 2, value: 'g' }, + VectorDiff::Insert { index: 3, value: 'b' }, + VectorDiff::Insert { index: 4, value: 'c' }, + ] + ); + + linked_chunk.push_gap_back(()); + linked_chunk.push_items_back(['h', 'i']); + + assert_next_eq!(as_vector, &[VectorDiff::Append { values: vector!['h', 'i'] }]); + + linked_chunk + .replace_gap_at(['j'], linked_chunk.chunk_identifier(|chunk| chunk.is_gap()).unwrap()) + .unwrap(); + + assert_next_eq!(as_vector, &[VectorDiff::Insert { index: 7, value: 'j' }]); + + drop(linked_chunk); + assert_closed!(as_vector); + } +} diff --git a/crates/matrix-sdk/src/event_cache/linked_chunk/mod.rs b/crates/matrix-sdk/src/event_cache/linked_chunk/mod.rs index 71a35d53d..524b4066b 100644 --- a/crates/matrix-sdk/src/event_cache/linked_chunk/mod.rs +++ b/crates/matrix-sdk/src/event_cache/linked_chunk/mod.rs @@ -14,6 +14,7 @@ #![allow(dead_code)] +mod as_vector; mod updates; use std::{ @@ -24,6 +25,7 @@ use std::{ sync::atomic::{AtomicU64, Ordering}, }; +use as_vector::*; use updates::*; /// Errors of [`LinkedChunk`]. @@ -611,9 +613,17 @@ impl LinkedChunk { /// If the `Option` becomes `None`, it will disable update history. Thus, be /// careful when you want to empty the update history: do not use /// `Option::take()` directly but rather [`Updates::take`] for example. + /// + /// It returns `None` if updates are disabled, i.e. if this linked chunk has + /// been constructed with [`Self::new`], otherwise, if it's been constructed + /// with [`Self::new_with_update_history`], it returns `Some(…)`. pub fn updates(&mut self) -> Option<&mut Updates> { self.updates.as_mut() } + + pub fn subscribe_as_vector(&mut self) -> Option> { + self.updates.as_mut().map(|updates| AsVectorSubscriber::new(updates.subscribe())) + } } impl Drop for LinkedChunk { diff --git a/crates/matrix-sdk/src/event_cache/linked_chunk/updates.rs b/crates/matrix-sdk/src/event_cache/linked_chunk/updates.rs index 2112a9edf..ab8732590 100644 --- a/crates/matrix-sdk/src/event_cache/linked_chunk/updates.rs +++ b/crates/matrix-sdk/src/event_cache/linked_chunk/updates.rs @@ -258,7 +258,7 @@ impl Updates { } /// Subscribe to updates by using a [`Stream`]. - fn subscribe(&mut self) -> UpdatesSubscriber { + pub(super) fn subscribe(&mut self) -> UpdatesSubscriber { // A subscriber is a new update reader, it needs its own token. let token = { let mut inner = self.inner.write().unwrap(); @@ -276,7 +276,7 @@ impl Updates { /// A subscriber to [`Updates`]. It is helpful to receive updates via a /// [`Stream`]. -struct UpdatesSubscriber { +pub(super) struct UpdatesSubscriber { /// Weak reference to [`UpdatesInner`]. /// /// Using a weak reference allows [`Updates`] to be dropped