feat(sdk): Introduce linked_chunk::Updates

feat(sdk): Introduce `linked_chunk::Updates`
This commit is contained in:
Ivan Enderlin
2024-05-12 17:42:29 +02:00
committed by GitHub
3 changed files with 788 additions and 220 deletions

View File

@@ -14,6 +14,8 @@
#![allow(dead_code)]
mod updates;
use std::{
fmt,
marker::PhantomData,
@@ -22,9 +24,11 @@ use std::{
sync::atomic::{AtomicU64, Ordering},
};
use updates::*;
/// Errors of [`LinkedChunk`].
#[derive(thiserror::Error, Debug)]
pub enum LinkedChunkError {
pub enum Error {
#[error("The chunk identifier is invalid: `{identifier:?}`")]
InvalidChunkIdentifier { identifier: ChunkIdentifier },
@@ -38,64 +42,18 @@ pub enum LinkedChunkError {
InvalidItemIndex { index: usize },
}
/// Represent the updates that have happened inside a [`LinkedChunk`].
///
/// To retrieve the updates, use [`LinkedChunk::updates`].
///
/// These updates are useful to store a `LinkedChunk` in another form of
/// storage, like a database or something similar.
#[derive(Debug, PartialEq)]
pub enum LinkedChunkUpdate<Item, Gap> {
/// A new chunk of kind `Items` has been created.
NewItemsChunk {
/// The identifier of the previous chunk of this new chunk.
previous: Option<ChunkIdentifier>,
/// The identifier of the new chunk.
new: ChunkIdentifier,
/// The identifier of the next chunk of this new chunk.
next: Option<ChunkIdentifier>,
},
/// A new chunk of kind `Gap` has been created.
NewGapChunk {
/// The identifier of the previous chunk of this new chunk.
previous: Option<ChunkIdentifier>,
/// The identifier of the new chunk.
new: ChunkIdentifier,
/// The identifier of the next chunk of this new chunk.
next: Option<ChunkIdentifier>,
/// The content of the chunk, i.e. the gap value itself.
gap: Gap,
},
/// A chunk has been removed, identified by its [`ChunkIdentifier`].
RemoveChunk(ChunkIdentifier),
/// Items are inserted inside a chunk of kind `Items`.
InsertItems {
/// [`Position`] of the items, i.e. the chunk and the index where
/// insertion starts.
at: Position,
/// The items that were inserted.
items: Vec<Item>,
},
/// A chunk of kind `Items` has been truncated.
TruncateItems {
/// The identifier of the chunk.
chunk: ChunkIdentifier,
/// The new length of the chunk, i.e. items past this index were
/// removed.
length: usize,
},
}
/// Links of a `LinkedChunk`, i.e. the first and last [`Chunk`].
///
/// This type was introduced to avoid borrow checking errors when mutably
/// referencing a subset of fields of a `LinkedChunk`.
struct LinkedChunkEnds<const CHUNK_CAPACITY: usize, Item, Gap> {
struct Ends<const CHUNK_CAPACITY: usize, Item, Gap> {
/// The first chunk.
first: NonNull<Chunk<CHUNK_CAPACITY, Item, Gap>>,
/// The last chunk.
last: Option<NonNull<Chunk<CHUNK_CAPACITY, Item, Gap>>>,
}
impl<const CAP: usize, Item, Gap> LinkedChunkEnds<CAP, Item, Gap> {
impl<const CAP: usize, Item, Gap> Ends<CAP, Item, Gap> {
/// Get the first chunk, as an immutable reference.
fn first_chunk(&self) -> &Chunk<CAP, Item, Gap> {
unsafe { self.first.as_ref() }
@@ -146,15 +104,19 @@ impl<const CAP: usize, Item, Gap> LinkedChunkEnds<CAP, Item, Gap> {
/// entirely full. A chunk can represent a `Gap` between other chunks.
pub struct LinkedChunk<const CHUNK_CAPACITY: usize, Item, Gap> {
/// The links to the chunks, i.e. the first and the last chunk.
links: LinkedChunkEnds<CHUNK_CAPACITY, Item, Gap>,
links: Ends<CHUNK_CAPACITY, Item, Gap>,
/// The number of items hold by this linked chunk.
length: usize,
/// The generator of chunk identifiers.
chunk_identifier_generator: ChunkIdentifierGenerator,
/// All updates that have been made on this `LinkedChunk`. If this field is
/// `Some(…)`, update history is enabled, otherwise, if it's `None`, update
/// history is disabled.
update_history: Option<Vec<LinkedChunkUpdate<Item, Gap>>>,
updates: Option<Updates<Item, Gap>>,
/// Marker.
marker: PhantomData<Box<Chunk<CHUNK_CAPACITY, Item, Gap>>>,
}
@@ -163,29 +125,32 @@ impl<const CAP: usize, Item, Gap> LinkedChunk<CAP, Item, Gap> {
/// Create a new [`Self`].
pub fn new() -> Self {
Self {
links: LinkedChunkEnds {
links: Ends {
// INVARIANT: The first chunk must always be an Items, not a Gap.
first: Chunk::new_items_leaked(ChunkIdentifierGenerator::FIRST_IDENTIFIER),
last: None,
},
length: 0,
chunk_identifier_generator: ChunkIdentifierGenerator::new_from_scratch(),
update_history: None,
updates: None,
marker: PhantomData,
}
}
/// Create a new [`Self`] with a history of updates.
///
/// When [`Self`] is built with update history, the [`Updates::take`] method
/// must be called to consume and clean the updates. See [`Self::updates`].
pub fn new_with_update_history() -> Self {
Self {
links: LinkedChunkEnds {
links: Ends {
// INVARIANT: The first chunk must always be an Items, not a Gap.
first: Chunk::new_items_leaked(ChunkIdentifierGenerator::FIRST_IDENTIFIER),
last: None,
},
length: 0,
chunk_identifier_generator: ChunkIdentifierGenerator::new_from_scratch(),
update_history: Some(Vec::new()),
updates: Some(Updates::new()),
marker: PhantomData,
}
}
@@ -213,11 +178,8 @@ impl<const CAP: usize, Item, Gap> LinkedChunk<CAP, Item, Gap> {
let last_chunk = self.links.latest_chunk_mut();
// Push the items.
let last_chunk = last_chunk.push_items(
items,
&self.chunk_identifier_generator,
&mut self.update_history,
);
let last_chunk =
last_chunk.push_items(items, &self.chunk_identifier_generator, &mut self.updates);
debug_assert!(last_chunk.is_last_chunk(), "`last_chunk` must be… the last chunk");
@@ -242,7 +204,7 @@ impl<const CAP: usize, Item, Gap> LinkedChunk<CAP, Item, Gap> {
let last_chunk = self.links.latest_chunk_mut();
last_chunk.insert_next(
Chunk::new_gap_leaked(self.chunk_identifier_generator.next(), content),
&mut self.update_history,
&mut self.updates,
);
self.links.last = last_chunk.next;
@@ -252,11 +214,7 @@ impl<const CAP: usize, Item, Gap> LinkedChunk<CAP, Item, Gap> {
///
/// Because the `position` can be invalid, this method returns a
/// `Result`.
pub fn insert_items_at<I>(
&mut self,
items: I,
position: Position,
) -> Result<(), LinkedChunkError>
pub fn insert_items_at<I>(&mut self, items: I, position: Position) -> Result<(), Error>
where
Item: Clone,
Gap: Clone,
@@ -269,18 +227,18 @@ impl<const CAP: usize, Item, Gap> LinkedChunk<CAP, Item, Gap> {
let chunk = self
.links
.chunk_mut(chunk_identifier)
.ok_or(LinkedChunkError::InvalidChunkIdentifier { identifier: chunk_identifier })?;
.ok_or(Error::InvalidChunkIdentifier { identifier: chunk_identifier })?;
let (chunk, number_of_items) = match &mut chunk.content {
ChunkContent::Gap(..) => {
return Err(LinkedChunkError::ChunkIsAGap { identifier: chunk_identifier })
return Err(Error::ChunkIsAGap { identifier: chunk_identifier })
}
ChunkContent::Items(current_items) => {
let current_items_length = current_items.len();
if item_index > current_items_length {
return Err(LinkedChunkError::InvalidItemIndex { index: item_index });
return Err(Error::InvalidItemIndex { index: item_index });
}
// Prepare the items to be pushed.
@@ -292,16 +250,12 @@ impl<const CAP: usize, Item, Gap> LinkedChunk<CAP, Item, Gap> {
if item_index == current_items_length {
chunk
// Push the new items.
.push_items(
items,
&self.chunk_identifier_generator,
&mut self.update_history,
)
.push_items(items, &self.chunk_identifier_generator, &mut self.updates)
}
// Insert inside the current items.
else {
if let Some(updates) = self.update_history.as_mut() {
updates.push(LinkedChunkUpdate::TruncateItems {
if let Some(updates) = self.updates.as_mut() {
updates.push(Update::TruncateItems {
chunk: chunk_identifier,
length: item_index,
});
@@ -312,16 +266,12 @@ impl<const CAP: usize, Item, Gap> LinkedChunk<CAP, Item, Gap> {
chunk
// Push the new items.
.push_items(
items,
&self.chunk_identifier_generator,
&mut self.update_history,
)
.push_items(items, &self.chunk_identifier_generator, &mut self.updates)
// Finally, push the items that have been detached.
.push_items(
detached_items.into_iter(),
&self.chunk_identifier_generator,
&mut self.update_history,
&mut self.updates,
)
},
number_of_items,
@@ -346,11 +296,7 @@ impl<const CAP: usize, Item, Gap> LinkedChunk<CAP, Item, Gap> {
///
/// Because the `position` can be invalid, this method returns a
/// `Result`.
pub fn insert_gap_at(
&mut self,
content: Gap,
position: Position,
) -> Result<(), LinkedChunkError>
pub fn insert_gap_at(&mut self, content: Gap, position: Position) -> Result<(), Error>
where
Item: Clone,
Gap: Clone,
@@ -361,7 +307,7 @@ impl<const CAP: usize, Item, Gap> LinkedChunk<CAP, Item, Gap> {
let chunk = self
.links
.chunk_mut(chunk_identifier)
.ok_or(LinkedChunkError::InvalidChunkIdentifier { identifier: chunk_identifier })?;
.ok_or(Error::InvalidChunkIdentifier { identifier: chunk_identifier })?;
// If `item_index` is 0, we don't want to split the current items chunk to
// insert a new gap chunk, otherwise it would create an empty current items
@@ -378,7 +324,7 @@ impl<const CAP: usize, Item, Gap> LinkedChunk<CAP, Item, Gap> {
previous_chunk.insert_next(
Chunk::new_gap_leaked(self.chunk_identifier_generator.next(), content),
&mut self.update_history,
&mut self.updates,
);
// We don't need to update `self.last` because we have inserted a new chunk
@@ -389,18 +335,18 @@ impl<const CAP: usize, Item, Gap> LinkedChunk<CAP, Item, Gap> {
let chunk = match &mut chunk.content {
ChunkContent::Gap(..) => {
return Err(LinkedChunkError::ChunkIsAGap { identifier: chunk_identifier });
return Err(Error::ChunkIsAGap { identifier: chunk_identifier });
}
ChunkContent::Items(current_items) => {
let current_items_length = current_items.len();
if item_index >= current_items_length {
return Err(LinkedChunkError::InvalidItemIndex { index: item_index });
return Err(Error::InvalidItemIndex { index: item_index });
}
if let Some(updates) = self.update_history.as_mut() {
updates.push(LinkedChunkUpdate::TruncateItems {
if let Some(updates) = self.updates.as_mut() {
updates.push(Update::TruncateItems {
chunk: chunk_identifier,
length: item_index,
});
@@ -413,18 +359,18 @@ impl<const CAP: usize, Item, Gap> LinkedChunk<CAP, Item, Gap> {
// Insert a new gap chunk.
.insert_next(
Chunk::new_gap_leaked(self.chunk_identifier_generator.next(), content),
&mut self.update_history,
&mut self.updates,
)
// Insert a new items chunk.
.insert_next(
Chunk::new_items_leaked(self.chunk_identifier_generator.next()),
&mut self.update_history,
&mut self.updates,
)
// Finally, push the items that have been detached.
.push_items(
detached_items.into_iter(),
&self.chunk_identifier_generator,
&mut self.update_history,
&mut self.updates,
)
}
};
@@ -451,7 +397,7 @@ impl<const CAP: usize, Item, Gap> LinkedChunk<CAP, Item, Gap> {
&mut self,
items: I,
chunk_identifier: ChunkIdentifier,
) -> Result<&Chunk<CAP, Item, Gap>, LinkedChunkError>
) -> Result<&Chunk<CAP, Item, Gap>, Error>
where
Item: Clone,
Gap: Clone,
@@ -465,7 +411,7 @@ impl<const CAP: usize, Item, Gap> LinkedChunk<CAP, Item, Gap> {
let chunk = self
.links
.chunk_mut(chunk_identifier)
.ok_or(LinkedChunkError::InvalidChunkIdentifier { identifier: chunk_identifier })?;
.ok_or(Error::InvalidChunkIdentifier { identifier: chunk_identifier })?;
debug_assert!(chunk.is_first_chunk().not(), "A gap cannot be the first chunk");
@@ -478,14 +424,10 @@ impl<const CAP: usize, Item, Gap> LinkedChunk<CAP, Item, Gap> {
// Insert a new items chunk…
.insert_next(
Chunk::new_items_leaked(self.chunk_identifier_generator.next()),
&mut self.update_history,
&mut self.updates,
)
// … and insert the items.
.push_items(
items,
&self.chunk_identifier_generator,
&mut self.update_history,
);
.push_items(items, &self.chunk_identifier_generator, &mut self.updates);
(
last_inserted_chunk.is_last_chunk().then(|| last_inserted_chunk.as_ptr()),
@@ -493,7 +435,7 @@ impl<const CAP: usize, Item, Gap> LinkedChunk<CAP, Item, Gap> {
)
}
ChunkContent::Items(..) => {
return Err(LinkedChunkError::ChunkIsItems { identifier: chunk_identifier })
return Err(Error::ChunkIsItems { identifier: chunk_identifier })
}
};
@@ -503,7 +445,7 @@ impl<const CAP: usize, Item, Gap> LinkedChunk<CAP, Item, Gap> {
.unwrap();
// Now that new items have been pushed, we can unlink the gap chunk.
chunk.unlink(&mut self.update_history);
chunk.unlink(&mut self.updates);
// Get the pointer to `chunk`.
chunk_ptr = chunk.as_ptr();
@@ -551,15 +493,15 @@ impl<const CAP: usize, Item, Gap> LinkedChunk<CAP, Item, Gap> {
/// Iterate over the chunks, backwards.
///
/// It iterates from the last to the first chunk.
pub fn rchunks(&self) -> LinkedChunkIterBackward<'_, CAP, Item, Gap> {
LinkedChunkIterBackward::new(self.links.latest_chunk())
pub fn rchunks(&self) -> IterBackward<'_, CAP, Item, Gap> {
IterBackward::new(self.links.latest_chunk())
}
/// Iterate over the chunks, forward.
///
/// It iterates from the first to the last chunk.
pub fn chunks(&self) -> LinkedChunkIter<'_, CAP, Item, Gap> {
LinkedChunkIter::new(self.links.first_chunk())
pub fn chunks(&self) -> Iter<'_, CAP, Item, Gap> {
Iter::new(self.links.first_chunk())
}
/// Iterate over the chunks, starting from `identifier`, backward.
@@ -569,11 +511,9 @@ impl<const CAP: usize, Item, Gap> LinkedChunk<CAP, Item, Gap> {
pub fn rchunks_from(
&self,
identifier: ChunkIdentifier,
) -> Result<LinkedChunkIterBackward<'_, CAP, Item, Gap>, LinkedChunkError> {
Ok(LinkedChunkIterBackward::new(
self.links
.chunk(identifier)
.ok_or(LinkedChunkError::InvalidChunkIdentifier { identifier })?,
) -> Result<IterBackward<'_, CAP, Item, Gap>, Error> {
Ok(IterBackward::new(
self.links.chunk(identifier).ok_or(Error::InvalidChunkIdentifier { identifier })?,
))
}
@@ -584,11 +524,9 @@ impl<const CAP: usize, Item, Gap> LinkedChunk<CAP, Item, Gap> {
pub fn chunks_from(
&self,
identifier: ChunkIdentifier,
) -> Result<LinkedChunkIter<'_, CAP, Item, Gap>, LinkedChunkError> {
Ok(LinkedChunkIter::new(
self.links
.chunk(identifier)
.ok_or(LinkedChunkError::InvalidChunkIdentifier { identifier })?,
) -> Result<Iter<'_, CAP, Item, Gap>, Error> {
Ok(Iter::new(
self.links.chunk(identifier).ok_or(Error::InvalidChunkIdentifier { identifier })?,
))
}
@@ -616,7 +554,7 @@ impl<const CAP: usize, Item, Gap> LinkedChunk<CAP, Item, Gap> {
pub fn ritems_from(
&self,
position: Position,
) -> Result<impl Iterator<Item = (Position, &Item)>, LinkedChunkError> {
) -> Result<impl Iterator<Item = (Position, &Item)>, Error> {
Ok(self
.rchunks_from(position.chunk_identifier())?
.filter_map(|chunk| match &chunk.content {
@@ -647,7 +585,7 @@ impl<const CAP: usize, Item, Gap> LinkedChunk<CAP, Item, Gap> {
pub fn items_from(
&self,
position: Position,
) -> Result<impl Iterator<Item = (Position, &Item)>, LinkedChunkError> {
) -> Result<impl Iterator<Item = (Position, &Item)>, Error> {
Ok(self
.chunks_from(position.chunk_identifier())?
.filter_map(|chunk| match &chunk.content {
@@ -666,15 +604,15 @@ impl<const CAP: usize, Item, Gap> LinkedChunk<CAP, Item, Gap> {
.skip(position.index()))
}
/// Get a mutable reference to the `LinkedChunk` updates.
/// Get a mutable reference to the `LinkedChunk` updates, aka [`Updates`].
///
/// The caller is responsible to drain/empty these updates.
/// The caller is responsible for clearing these updates.
///
/// If the `Option` becomes `None`, it will disable update history. Thus, be
/// careful when you want to empty the update history: do not use
/// `Option::take()` directly but rather `Vec::drain` for example.
pub fn updates(&mut self) -> Option<&mut Vec<LinkedChunkUpdate<Item, Gap>>> {
self.update_history.as_mut()
/// `Option::take()` directly but rather [`Updates::take`] for example.
pub fn updates(&mut self) -> Option<&mut Updates<Item, Gap>> {
self.updates.as_mut()
}
}
@@ -789,18 +727,18 @@ impl Position {
/// An iterator over a [`LinkedChunk`] that traverses the chunk in backward
/// direction (i.e. it calls `previous` on each chunk to make progress).
pub struct LinkedChunkIterBackward<'a, const CAP: usize, Item, Gap> {
pub struct IterBackward<'a, const CAP: usize, Item, Gap> {
chunk: Option<&'a Chunk<CAP, Item, Gap>>,
}
impl<'a, const CAP: usize, Item, Gap> LinkedChunkIterBackward<'a, CAP, Item, Gap> {
impl<'a, const CAP: usize, Item, Gap> IterBackward<'a, CAP, Item, Gap> {
/// Create a new [`IterBackward`] from a particular [`Chunk`].
fn new(from_chunk: &'a Chunk<CAP, Item, Gap>) -> Self {
Self { chunk: Some(from_chunk) }
}
}
impl<'a, const CAP: usize, Item, Gap> Iterator for LinkedChunkIterBackward<'a, CAP, Item, Gap> {
impl<'a, const CAP: usize, Item, Gap> Iterator for IterBackward<'a, CAP, Item, Gap> {
type Item = &'a Chunk<CAP, Item, Gap>;
fn next(&mut self) -> Option<Self::Item> {
@@ -814,18 +752,18 @@ impl<'a, const CAP: usize, Item, Gap> Iterator for LinkedChunkIterBackward<'a, C
/// An iterator over a [`LinkedChunk`] that traverses the chunk in forward
/// direction (i.e. it calls `next` on each chunk to make progress).
pub struct LinkedChunkIter<'a, const CAP: usize, Item, Gap> {
pub struct Iter<'a, const CAP: usize, Item, Gap> {
chunk: Option<&'a Chunk<CAP, Item, Gap>>,
}
impl<'a, const CAP: usize, Item, Gap> LinkedChunkIter<'a, CAP, Item, Gap> {
impl<'a, const CAP: usize, Item, Gap> Iter<'a, CAP, Item, Gap> {
/// Create a new [`Iter`] from a particular [`Chunk`].
fn new(from_chunk: &'a Chunk<CAP, Item, Gap>) -> Self {
Self { chunk: Some(from_chunk) }
}
}
impl<'a, const CAP: usize, Item, Gap> Iterator for LinkedChunkIter<'a, CAP, Item, Gap> {
impl<'a, const CAP: usize, Item, Gap> Iterator for Iter<'a, CAP, Item, Gap> {
type Item = &'a Chunk<CAP, Item, Gap>;
fn next(&mut self) -> Option<Self::Item> {
@@ -973,7 +911,7 @@ impl<const CAPACITY: usize, Item, Gap> Chunk<CAPACITY, Item, Gap> {
&mut self,
mut new_items: I,
chunk_identifier_generator: &ChunkIdentifierGenerator,
updates: &mut Option<Vec<LinkedChunkUpdate<Item, Gap>>>,
updates: &mut Option<Updates<Item, Gap>>,
) -> &mut Self
where
I: Iterator<Item = Item> + ExactSizeIterator,
@@ -1012,7 +950,7 @@ impl<const CAPACITY: usize, Item, Gap> Chunk<CAPACITY, Item, Gap> {
items.extend(new_items);
if let Some(updates) = updates.as_mut() {
updates.push(LinkedChunkUpdate::InsertItems {
updates.push(Update::InsertItems {
at: Position(identifier, start),
items: items[start..].to_vec(),
});
@@ -1027,7 +965,7 @@ impl<const CAPACITY: usize, Item, Gap> Chunk<CAPACITY, Item, Gap> {
items.extend(new_items.by_ref().take(free_space));
if let Some(updates) = updates.as_mut() {
updates.push(LinkedChunkUpdate::InsertItems {
updates.push(Update::InsertItems {
at: Position(identifier, start),
items: items[start..].to_vec(),
});
@@ -1055,7 +993,7 @@ impl<const CAPACITY: usize, Item, Gap> Chunk<CAPACITY, Item, Gap> {
fn insert_next(
&mut self,
mut new_chunk_ptr: NonNull<Self>,
updates: &mut Option<Vec<LinkedChunkUpdate<Item, Gap>>>,
updates: &mut Option<Updates<Item, Gap>>,
) -> &mut Self
where
Gap: Clone,
@@ -1082,15 +1020,12 @@ impl<const CAPACITY: usize, Item, Gap> Chunk<CAPACITY, Item, Gap> {
let next = new_chunk.next().map(Chunk::identifier);
match new_chunk.content() {
ChunkContent::Gap(gap) => updates.push(LinkedChunkUpdate::NewGapChunk {
previous,
new,
next,
gap: gap.clone(),
}),
ChunkContent::Gap(gap) => {
updates.push(Update::NewGapChunk { previous, new, next, gap: gap.clone() })
}
ChunkContent::Items(..) => {
updates.push(LinkedChunkUpdate::NewItemsChunk { previous, new, next })
updates.push(Update::NewItemsChunk { previous, new, next })
}
}
}
@@ -1102,7 +1037,7 @@ impl<const CAPACITY: usize, Item, Gap> Chunk<CAPACITY, Item, Gap> {
///
/// Be careful: `self` won't belong to `LinkedChunk` anymore, and should be
/// dropped appropriately.
fn unlink(&mut self, updates: &mut Option<Vec<LinkedChunkUpdate<Item, Gap>>>) {
fn unlink(&mut self, updates: &mut Option<Updates<Item, Gap>>) {
let previous_ptr = self.previous;
let next_ptr = self.next;
@@ -1115,7 +1050,7 @@ impl<const CAPACITY: usize, Item, Gap> Chunk<CAPACITY, Item, Gap> {
}
if let Some(updates) = updates.as_mut() {
updates.push(LinkedChunkUpdate::RemoveChunk(self.identifier()));
updates.push(Update::RemoveChunk(self.identifier()));
}
}
@@ -1178,8 +1113,8 @@ mod tests {
use assert_matches::assert_matches;
use super::{
Chunk, ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, LinkedChunk,
LinkedChunkError, Position,
Chunk, ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, Error, LinkedChunk,
Position,
};
/// A macro to test the items and the gap of a `LinkedChunk`.
@@ -1283,35 +1218,35 @@ mod tests {
}
#[test]
fn test_update_history() {
fn test_updates() {
assert!(LinkedChunk::<3, char, ()>::new().updates().is_none());
assert!(LinkedChunk::<3, char, ()>::new_with_update_history().updates().is_some());
}
#[test]
fn test_push_items() {
use super::LinkedChunkUpdate::*;
use super::Update::*;
let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history();
linked_chunk.push_items_back(['a']);
assert_items_eq!(linked_chunk, ['a']);
assert_eq!(
linked_chunk.updates().unwrap().drain(..).collect::<Vec<_>>(),
linked_chunk.updates().unwrap().take(),
&[InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }]
);
linked_chunk.push_items_back(['b', 'c']);
assert_items_eq!(linked_chunk, ['a', 'b', 'c']);
assert_eq!(
linked_chunk.updates().unwrap().drain(..).collect::<Vec<_>>(),
linked_chunk.updates().unwrap().take(),
&[InsertItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b', 'c'] }]
);
linked_chunk.push_items_back(['d', 'e']);
assert_items_eq!(linked_chunk, ['a', 'b', 'c'] ['d', 'e']);
assert_eq!(
linked_chunk.updates().unwrap().drain(..).collect::<Vec<_>>(),
linked_chunk.updates().unwrap().take(),
&[
NewItemsChunk {
previous: Some(ChunkIdentifier(0)),
@@ -1325,7 +1260,7 @@ mod tests {
linked_chunk.push_items_back(['f', 'g', 'h', 'i', 'j']);
assert_items_eq!(linked_chunk, ['a', 'b', 'c'] ['d', 'e', 'f'] ['g', 'h', 'i'] ['j']);
assert_eq!(
linked_chunk.updates().unwrap().drain(..).collect::<Vec<_>>(),
linked_chunk.updates().unwrap().take(),
&[
InsertItems { at: Position(ChunkIdentifier(1), 2), items: vec!['f'] },
NewItemsChunk {
@@ -1348,20 +1283,20 @@ mod tests {
#[test]
fn test_push_gap() {
use super::LinkedChunkUpdate::*;
use super::Update::*;
let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history();
linked_chunk.push_items_back(['a']);
assert_items_eq!(linked_chunk, ['a']);
assert_eq!(
linked_chunk.updates().unwrap().drain(..).collect::<Vec<_>>(),
linked_chunk.updates().unwrap().take(),
&[InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }]
);
linked_chunk.push_gap_back(());
assert_items_eq!(linked_chunk, ['a'] [-]);
assert_eq!(
linked_chunk.updates().unwrap().drain(..).collect::<Vec<_>>(),
linked_chunk.updates().unwrap().take(),
&[NewGapChunk {
previous: Some(ChunkIdentifier(0)),
new: ChunkIdentifier(1),
@@ -1373,7 +1308,7 @@ mod tests {
linked_chunk.push_items_back(['b', 'c', 'd', 'e']);
assert_items_eq!(linked_chunk, ['a'] [-] ['b', 'c', 'd'] ['e']);
assert_eq!(
linked_chunk.updates().unwrap().drain(..).collect::<Vec<_>>(),
linked_chunk.updates().unwrap().take(),
&[
NewItemsChunk {
previous: Some(ChunkIdentifier(1)),
@@ -1394,7 +1329,7 @@ mod tests {
linked_chunk.push_gap_back(()); // why not
assert_items_eq!(linked_chunk, ['a'] [-] ['b', 'c', 'd'] ['e'] [-] [-]);
assert_eq!(
linked_chunk.updates().unwrap().drain(..).collect::<Vec<_>>(),
linked_chunk.updates().unwrap().take(),
&[
NewGapChunk {
previous: Some(ChunkIdentifier(3)),
@@ -1414,7 +1349,7 @@ mod tests {
linked_chunk.push_items_back(['f', 'g', 'h', 'i']);
assert_items_eq!(linked_chunk, ['a'] [-] ['b', 'c', 'd'] ['e'] [-] [-] ['f', 'g', 'h'] ['i']);
assert_eq!(
linked_chunk.updates().unwrap().drain(..).collect::<Vec<_>>(),
linked_chunk.updates().unwrap().take(),
&[
NewItemsChunk {
previous: Some(ChunkIdentifier(5)),
@@ -1518,7 +1453,7 @@ mod tests {
}
#[test]
fn test_rchunks_from() -> Result<(), LinkedChunkError> {
fn test_rchunks_from() -> Result<(), Error> {
let mut linked_chunk = LinkedChunk::<2, char, ()>::new();
linked_chunk.push_items_back(['a', 'b']);
linked_chunk.push_gap_back(());
@@ -1550,7 +1485,7 @@ mod tests {
}
#[test]
fn test_chunks_from() -> Result<(), LinkedChunkError> {
fn test_chunks_from() -> Result<(), Error> {
let mut linked_chunk = LinkedChunk::<2, char, ()>::new();
linked_chunk.push_items_back(['a', 'b']);
linked_chunk.push_gap_back(());
@@ -1612,7 +1547,7 @@ mod tests {
}
#[test]
fn test_ritems_from() -> Result<(), LinkedChunkError> {
fn test_ritems_from() -> Result<(), Error> {
let mut linked_chunk = LinkedChunk::<2, char, ()>::new();
linked_chunk.push_items_back(['a', 'b']);
linked_chunk.push_gap_back(());
@@ -1630,7 +1565,7 @@ mod tests {
}
#[test]
fn test_items_from() -> Result<(), LinkedChunkError> {
fn test_items_from() -> Result<(), Error> {
let mut linked_chunk = LinkedChunk::<2, char, ()>::new();
linked_chunk.push_items_back(['a', 'b']);
linked_chunk.push_gap_back(());
@@ -1648,14 +1583,14 @@ mod tests {
}
#[test]
fn test_insert_items_at() -> Result<(), LinkedChunkError> {
use super::LinkedChunkUpdate::*;
fn test_insert_items_at() -> Result<(), Error> {
use super::Update::*;
let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history();
linked_chunk.push_items_back(['a', 'b', 'c', 'd', 'e', 'f']);
assert_items_eq!(linked_chunk, ['a', 'b', 'c'] ['d', 'e', 'f']);
assert_eq!(
linked_chunk.updates().unwrap().drain(..).collect::<Vec<_>>(),
linked_chunk.updates().unwrap().take(),
&[
InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a', 'b', 'c'] },
NewItemsChunk {
@@ -1681,7 +1616,7 @@ mod tests {
);
assert_eq!(linked_chunk.len(), 10);
assert_eq!(
linked_chunk.updates().unwrap().drain(..).collect::<Vec<_>>(),
linked_chunk.updates().unwrap().take(),
&[
TruncateItems { chunk: ChunkIdentifier(1), length: 1 },
InsertItems { at: Position(ChunkIdentifier(1), 1), items: vec!['w', 'x'] },
@@ -1713,7 +1648,7 @@ mod tests {
);
assert_eq!(linked_chunk.len(), 14);
assert_eq!(
linked_chunk.updates().unwrap().drain(..).collect::<Vec<_>>(),
linked_chunk.updates().unwrap().take(),
&[
TruncateItems { chunk: ChunkIdentifier(0), length: 0 },
InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['l', 'm', 'n'] },
@@ -1745,7 +1680,7 @@ mod tests {
);
assert_eq!(linked_chunk.len(), 16);
assert_eq!(
linked_chunk.updates().unwrap().drain(..).collect::<Vec<_>>(),
linked_chunk.updates().unwrap().take(),
&[
TruncateItems { chunk: ChunkIdentifier(5), length: 0 },
InsertItems { at: Position(ChunkIdentifier(5), 0), items: vec!['r', 's'] },
@@ -1766,7 +1701,7 @@ mod tests {
['l', 'm', 'n'] ['o', 'a', 'b'] ['r', 's', 'c'] ['d', 'w', 'x'] ['y', 'z', 'e'] ['f', 'p', 'q']
);
assert_eq!(
linked_chunk.updates().unwrap().drain(..).collect::<Vec<_>>(),
linked_chunk.updates().unwrap().take(),
&[InsertItems { at: Position(ChunkIdentifier(3), 1), items: vec!['p', 'q'] },]
);
assert_eq!(linked_chunk.len(), 18);
@@ -1776,18 +1711,18 @@ mod tests {
{
assert_matches!(
linked_chunk.insert_items_at(['u', 'v'], Position(ChunkIdentifier(128), 0)),
Err(LinkedChunkError::InvalidChunkIdentifier { identifier: ChunkIdentifier(128) })
Err(Error::InvalidChunkIdentifier { identifier: ChunkIdentifier(128) })
);
assert!(linked_chunk.updates().unwrap().drain(..).collect::<Vec<_>>().is_empty());
assert!(linked_chunk.updates().unwrap().take().is_empty());
}
// Insert in a chunk that exists, but at an item that does not exist.
{
assert_matches!(
linked_chunk.insert_items_at(['u', 'v'], Position(ChunkIdentifier(0), 128)),
Err(LinkedChunkError::InvalidItemIndex { index: 128 })
Err(Error::InvalidItemIndex { index: 128 })
);
assert!(linked_chunk.updates().unwrap().drain(..).collect::<Vec<_>>().is_empty());
assert!(linked_chunk.updates().unwrap().take().is_empty());
}
// Insert in a gap.
@@ -1799,7 +1734,7 @@ mod tests {
['l', 'm', 'n'] ['o', 'a', 'b'] ['r', 's', 'c'] ['d', 'w', 'x'] ['y', 'z', 'e'] ['f', 'p', 'q'] [-]
);
assert_eq!(
linked_chunk.updates().unwrap().drain(..).collect::<Vec<_>>(),
linked_chunk.updates().unwrap().take(),
&[NewGapChunk {
previous: Some(ChunkIdentifier(3)),
new: ChunkIdentifier(6),
@@ -1810,7 +1745,7 @@ mod tests {
assert_matches!(
linked_chunk.insert_items_at(['u', 'v'], Position(ChunkIdentifier(6), 0)),
Err(LinkedChunkError::ChunkIsAGap { identifier: ChunkIdentifier(6) })
Err(Error::ChunkIsAGap { identifier: ChunkIdentifier(6) })
);
}
@@ -1820,14 +1755,14 @@ mod tests {
}
#[test]
fn test_insert_gap_at() -> Result<(), LinkedChunkError> {
use super::LinkedChunkUpdate::*;
fn test_insert_gap_at() -> Result<(), Error> {
use super::Update::*;
let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history();
linked_chunk.push_items_back(['a', 'b', 'c', 'd', 'e', 'f']);
assert_items_eq!(linked_chunk, ['a', 'b', 'c'] ['d', 'e', 'f']);
assert_eq!(
linked_chunk.updates().unwrap().drain(..).collect::<Vec<_>>(),
linked_chunk.updates().unwrap().take(),
&[
InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a', 'b', 'c'] },
NewItemsChunk {
@@ -1846,7 +1781,7 @@ mod tests {
assert_items_eq!(linked_chunk, ['a'] [-] ['b', 'c'] ['d', 'e', 'f']);
assert_eq!(
linked_chunk.updates().unwrap().drain(..).collect::<Vec<_>>(),
linked_chunk.updates().unwrap().take(),
&[
TruncateItems { chunk: ChunkIdentifier(0), length: 1 },
NewGapChunk {
@@ -1873,7 +1808,7 @@ mod tests {
// A new empty chunk is created as the first chunk.
assert_items_eq!(linked_chunk, [] [-] ['a'] [-] ['b', 'c'] ['d', 'e', 'f']);
assert_eq!(
linked_chunk.updates().unwrap().drain(..).collect::<Vec<_>>(),
linked_chunk.updates().unwrap().take(),
&[
TruncateItems { chunk: ChunkIdentifier(0), length: 0 },
NewGapChunk {
@@ -1902,7 +1837,7 @@ mod tests {
// space.
assert_items_eq!(linked_chunk, [] [-] ['a'] [-] ['b', 'c'] [-] ['d', 'e', 'f']);
assert_eq!(
linked_chunk.updates().unwrap().drain(..).collect::<Vec<_>>(),
linked_chunk.updates().unwrap().take(),
&[NewGapChunk {
previous: Some(ChunkIdentifier(3)),
new: ChunkIdentifier(6),
@@ -1917,9 +1852,9 @@ mod tests {
let position_of_first_empty_chunk = Position(ChunkIdentifier(0), 0);
assert_matches!(
linked_chunk.insert_gap_at((), position_of_first_empty_chunk),
Err(LinkedChunkError::InvalidItemIndex { index: 0 })
Err(Error::InvalidItemIndex { index: 0 })
);
assert!(linked_chunk.updates().unwrap().drain(..).collect::<Vec<_>>().is_empty());
assert!(linked_chunk.updates().unwrap().take().is_empty());
}
// Insert in an empty chunk.
@@ -1930,7 +1865,7 @@ mod tests {
assert_items_eq!(linked_chunk, [] [-] ['a'] [-] ['b', 'c'] [] ['d', 'e', 'f']);
assert_eq!(
linked_chunk.updates().unwrap().drain(..).collect::<Vec<_>>(),
linked_chunk.updates().unwrap().take(),
&[
NewItemsChunk {
previous: Some(ChunkIdentifier(6)),
@@ -1945,7 +1880,7 @@ mod tests {
assert_items_eq!(linked_chunk, [] [-] ['a'] [-] ['b', 'c'] [-] [] ['d', 'e', 'f']);
assert_eq!(
linked_chunk.updates().unwrap().drain(..).collect::<Vec<_>>(),
linked_chunk.updates().unwrap().take(),
&[NewGapChunk {
previous: Some(ChunkIdentifier(3)),
new: ChunkIdentifier(8),
@@ -1959,18 +1894,18 @@ mod tests {
{
assert_matches!(
linked_chunk.insert_items_at(['u', 'v'], Position(ChunkIdentifier(128), 0)),
Err(LinkedChunkError::InvalidChunkIdentifier { identifier: ChunkIdentifier(128) })
Err(Error::InvalidChunkIdentifier { identifier: ChunkIdentifier(128) })
);
assert!(linked_chunk.updates().unwrap().drain(..).collect::<Vec<_>>().is_empty());
assert!(linked_chunk.updates().unwrap().take().is_empty());
}
// Insert in a chunk that exists, but at an item that does not exist.
{
assert_matches!(
linked_chunk.insert_items_at(['u', 'v'], Position(ChunkIdentifier(0), 128)),
Err(LinkedChunkError::InvalidItemIndex { index: 128 })
Err(Error::InvalidItemIndex { index: 128 })
);
assert!(linked_chunk.updates().unwrap().drain(..).collect::<Vec<_>>().is_empty());
assert!(linked_chunk.updates().unwrap().take().is_empty());
}
// Insert in an existing gap.
@@ -1980,9 +1915,9 @@ mod tests {
let position_of_a_gap = Position(ChunkIdentifier(4), 0);
assert_matches!(
linked_chunk.insert_gap_at((), position_of_a_gap),
Err(LinkedChunkError::ChunkIsAGap { identifier: ChunkIdentifier(4) })
Err(Error::ChunkIsAGap { identifier: ChunkIdentifier(4) })
);
assert!(linked_chunk.updates().unwrap().drain(..).collect::<Vec<_>>().is_empty());
assert!(linked_chunk.updates().unwrap().take().is_empty());
}
assert_eq!(linked_chunk.len(), 6);
@@ -1991,8 +1926,8 @@ mod tests {
}
#[test]
fn test_replace_gap_at() -> Result<(), LinkedChunkError> {
use super::LinkedChunkUpdate::*;
fn test_replace_gap_at() -> Result<(), Error> {
use super::Update::*;
let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history();
linked_chunk.push_items_back(['a', 'b']);
@@ -2000,7 +1935,7 @@ mod tests {
linked_chunk.push_items_back(['l', 'm']);
assert_items_eq!(linked_chunk, ['a', 'b'] [-] ['l', 'm']);
assert_eq!(
linked_chunk.updates().unwrap().drain(..).collect::<Vec<_>>(),
linked_chunk.updates().unwrap().take(),
&[
InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a', 'b'] },
NewGapChunk {
@@ -2031,7 +1966,7 @@ mod tests {
['a', 'b'] ['d', 'e', 'f'] ['g', 'h'] ['l', 'm']
);
assert_eq!(
linked_chunk.updates().unwrap().drain(..).collect::<Vec<_>>(),
linked_chunk.updates().unwrap().take(),
&[
NewItemsChunk {
previous: Some(ChunkIdentifier(1)),
@@ -2058,7 +1993,7 @@ mod tests {
['a', 'b'] ['d', 'e', 'f'] ['g', 'h'] ['l', 'm'] [-]
);
assert_eq!(
linked_chunk.updates().unwrap().drain(..).collect::<Vec<_>>(),
linked_chunk.updates().unwrap().take(),
&[NewGapChunk {
previous: Some(ChunkIdentifier(2)),
new: ChunkIdentifier(5),
@@ -2077,7 +2012,7 @@ mod tests {
['a', 'b'] ['d', 'e', 'f'] ['g', 'h'] ['l', 'm'] ['w', 'x', 'y'] ['z']
);
assert_eq!(
linked_chunk.updates().unwrap().drain(..).collect::<Vec<_>>(),
linked_chunk.updates().unwrap().take(),
&[
NewItemsChunk {
previous: Some(ChunkIdentifier(5)),

View File

@@ -0,0 +1,644 @@
// Copyright 2024 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{
collections::HashMap,
pin::Pin,
sync::{Arc, RwLock, Weak},
task::{Context, Poll, Waker},
};
use futures_core::Stream;
use super::{ChunkIdentifier, Position};
/// Represent the updates that have happened inside a [`LinkedChunk`].
///
/// To retrieve the updates, use [`LinkedChunk::updates`].
///
/// These updates are useful to store a `LinkedChunk` in another form of
/// storage, like a database or something similar.
#[derive(Debug, PartialEq)]
pub enum Update<Item, Gap> {
    /// A new chunk of kind Items has been created.
    NewItemsChunk {
        /// The identifier of the previous chunk of this new chunk, or `None`
        /// if there is no previous chunk.
        previous: Option<ChunkIdentifier>,

        /// The identifier of the new chunk.
        new: ChunkIdentifier,

        /// The identifier of the next chunk of this new chunk, or `None` if
        /// there is no next chunk.
        next: Option<ChunkIdentifier>,
    },

    /// A new chunk of kind Gap has been created.
    NewGapChunk {
        /// The identifier of the previous chunk of this new chunk, or `None`
        /// if there is no previous chunk.
        previous: Option<ChunkIdentifier>,

        /// The identifier of the new chunk.
        new: ChunkIdentifier,

        /// The identifier of the next chunk of this new chunk, or `None` if
        /// there is no next chunk.
        next: Option<ChunkIdentifier>,

        /// The content of the chunk.
        gap: Gap,
    },

    /// A chunk has been removed.
    RemoveChunk(ChunkIdentifier),

    /// Items are inserted inside a chunk of kind Items.
    InsertItems {
        /// [`Position`] of the items.
        at: Position,

        /// The items.
        items: Vec<Item>,
    },

    /// A chunk of kind Items has been truncated.
    TruncateItems {
        /// The identifier of the chunk.
        chunk: ChunkIdentifier,

        /// The new length of the chunk.
        length: usize,
    },
}
impl<Item, Gap> Clone for Update<Item, Gap>
where
Item: Clone,
Gap: Clone,
{
fn clone(&self) -> Self {
match self {
Self::NewItemsChunk { previous, new, next } => {
Self::NewItemsChunk { previous: *previous, new: *new, next: *next }
}
Self::NewGapChunk { previous, new, next, gap } => {
Self::NewGapChunk { previous: *previous, new: *new, next: *next, gap: gap.clone() }
}
Self::RemoveChunk(identifier) => Self::RemoveChunk(*identifier),
Self::InsertItems { at, items } => Self::InsertItems { at: *at, items: items.clone() },
Self::TruncateItems { chunk, length } => {
Self::TruncateItems { chunk: *chunk, length: *length }
}
}
}
}
/// A collection of [`Update`].
///
/// Get a value for this type with [`LinkedChunk::updates`].
pub struct Updates<Item, Gap> {
    /// The updates plus the per-reader bookkeeping, shared behind a lock so
    /// that subscribers (holding a `Weak` to it) can read concurrently.
    inner: Arc<RwLock<UpdatesInner<Item, Gap>>>,
}
/// A token used to represent readers that read the updates in
/// [`UpdatesInner`]. Each reader gets a distinct token so it never reads the
/// same update twice.
type ReaderToken = usize;
/// Inner type for [`Updates`].
///
/// The particularity of this type is that multiple readers can read the
/// updates. A reader has a [`ReaderToken`]. The public API (i.e.
/// [`Updates`]) is considered to be the _main reader_ (it has the token
/// [`Self::MAIN_READER_TOKEN`]).
///
/// An update that has been read by all readers is garbage collected, to be
/// removed from the memory. An update will never be read twice by the same
/// reader.
///
/// Why do we need multiple readers? The public API reads the updates with
/// [`Updates::take`], but the private API must also read the updates, for
/// example with [`UpdatesSubscriber`]. Of course, there can be multiple
/// `UpdatesSubscriber`s at the same time. Hence the need of supporting
/// multiple readers.
struct UpdatesInner<Item, Gap> {
    /// All the updates that have not been read by all readers.
    updates: Vec<Update<Item, Gap>>,

    /// Updates are stored in [`Self::updates`]. Multiple readers can read them.
    /// A reader is identified by a [`ReaderToken`].
    ///
    /// To each reader token is associated an index that represents the index of
    /// the last reading. It is used to never return the same update twice.
    last_index_per_reader: HashMap<ReaderToken, usize>,

    /// The last generated token. This is useful to generate new tokens.
    last_token: ReaderToken,

    /// Pending wakers for [`UpdatesSubscriber`]s. A waker is removed
    /// every time it is called.
    wakers: Vec<Waker>,
}
impl<Item, Gap> UpdatesInner<Item, Gap> {
    /// The token used by the main reader. See [`Self::take`] to learn more.
    const MAIN_READER_TOKEN: ReaderToken = 0;

    /// Create a new [`Self`], with the main reader already registered at
    /// index 0.
    fn new() -> Self {
        Self {
            updates: Vec::with_capacity(8),
            last_index_per_reader: {
                let mut map = HashMap::with_capacity(2);
                map.insert(Self::MAIN_READER_TOKEN, 0);

                map
            },
            last_token: Self::MAIN_READER_TOKEN,
            wakers: Vec::with_capacity(2),
        }
    }

    /// Push a new update, and wake up all pending wakers so that subscribers
    /// get a chance to poll again.
    fn push(&mut self, update: Update<Item, Gap>) {
        self.updates.push(update);

        // Wake them up \o/. Wakers are one-shot: they are removed once called.
        for waker in self.wakers.drain(..) {
            waker.wake();
        }
    }

    /// Take new updates; it considers the caller is the main reader, i.e. it
    /// will use the [`Self::MAIN_READER_TOKEN`].
    ///
    /// Updates that have been read will never be read again by the current
    /// reader.
    ///
    /// Learn more by reading [`Self::take_with_token`].
    fn take(&mut self) -> &[Update<Item, Gap>] {
        self.take_with_token(Self::MAIN_READER_TOKEN)
    }

    /// Take new updates with a particular reader token.
    ///
    /// Updates are stored in [`Self::updates`]. Multiple readers can read them.
    /// A reader is identified by a [`ReaderToken`]. Every reader can
    /// take/read/consume each update only once. An internal index is stored
    /// per reader token to know where to start reading updates next time this
    /// method is called.
    ///
    /// # Panics
    ///
    /// Panics if `token` has not been registered in
    /// [`Self::last_index_per_reader`]; tokens are only created alongside
    /// their index, so this would be a bug.
    fn take_with_token(&mut self, token: ReaderToken) -> &[Update<Item, Gap>] {
        // Let's garbage collect unused updates.
        self.garbage_collect();

        let index = self
            .last_index_per_reader
            .get_mut(&token)
            // Message fixed: the token type is `ReaderToken`, not `UpdatesToken`.
            .expect("Given `ReaderToken` does not map to any index");

        // Read new updates, and update the index.
        let slice = &self.updates[*index..];
        *index = self.updates.len();

        slice
    }

    /// Return the number of updates in the buffer.
    fn len(&self) -> usize {
        self.updates.len()
    }

    /// Garbage collect unused updates. An update is considered unused when it's
    /// been read by all readers.
    ///
    /// Basically, it reduces to finding the smallest last index for all
    /// readers, and clear from 0 to that index.
    fn garbage_collect(&mut self) {
        let min_index = self.last_index_per_reader.values().min().copied().unwrap_or(0);

        if min_index > 0 {
            let _ = self.updates.drain(0..min_index);

            // Let's shift the indices to the left by `min_index` to preserve them.
            for index in self.last_index_per_reader.values_mut() {
                *index -= min_index;
            }
        }
    }
}
impl<Item, Gap> Updates<Item, Gap> {
    /// Create a new [`Self`].
    pub(super) fn new() -> Self {
        Self { inner: Arc::new(RwLock::new(UpdatesInner::new())) }
    }

    /// Push a new update.
    pub(super) fn push(&mut self, update: Update<Item, Gap>) {
        self.inner.write().unwrap().push(update);
    }

    /// Take new updates.
    ///
    /// Updates that have been taken will not be read again.
    pub(super) fn take(&mut self) -> Vec<Update<Item, Gap>>
    where
        Item: Clone,
        Gap: Clone,
    {
        // The lock guard cannot escape this method, so the borrowed slice is
        // turned into an owned `Vec` before returning.
        let mut inner = self.inner.write().unwrap();

        inner.take().to_vec()
    }

    /// Subscribe to updates by using a [`Stream`].
    fn subscribe(&mut self) -> UpdatesSubscriber<Item, Gap> {
        // A subscriber is a new update reader: allocate a fresh token for it,
        // and start its read index at the beginning of the buffer.
        let mut inner = self.inner.write().unwrap();
        inner.last_token += 1;

        let token = inner.last_token;
        inner.last_index_per_reader.insert(token, 0);

        drop(inner);

        UpdatesSubscriber { updates: Arc::downgrade(&self.inner), token }
    }
}
/// A subscriber to [`Updates`]. It is helpful to receive updates via a
/// [`Stream`].
struct UpdatesSubscriber<Item, Gap> {
    /// Weak reference to [`UpdatesInner`].
    ///
    /// Using a weak reference allows [`Updates`] to be dropped
    /// freely even if a subscriber exists. When the strong reference is gone,
    /// the stream closes (see the `Stream` implementation).
    updates: Weak<RwLock<UpdatesInner<Item, Gap>>>,

    /// The token to read the updates.
    token: ReaderToken,
}
impl<Item, Gap> Stream for UpdatesSubscriber<Item, Gap>
where
    Item: Clone,
    Gap: Clone,
{
    type Item = Vec<Update<Item, Gap>>;

    /// Poll the next batch of updates.
    ///
    /// Yields `Ready(None)` once the `Updates` (thus the `LinkedChunk`) has
    /// been dropped, `Pending` (after registering the waker) when there is
    /// nothing new for this reader, and `Ready(Some(…))` otherwise.
    fn poll_next(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // If the strong reference is gone, the `Updates` has been dropped:
        // time to close this stream.
        let strong_updates = match self.updates.upgrade() {
            Some(updates) => updates,
            None => return Poll::Ready(None),
        };

        let mut guard = strong_updates.write().unwrap();
        let new_updates = guard.take_with_token(self.token);

        if new_updates.is_empty() {
            // No updates: register the waker (still under the lock, so no
            // wake-up can be missed), and report the stream as pending.
            guard.wakers.push(context.waker().clone());

            Poll::Pending
        } else {
            // There are updates! Let's forward them in this stream.
            Poll::Ready(Some(new_updates.to_owned()))
        }
    }
}
#[cfg(test)]
mod tests {
    use std::{
        sync::{Arc, Mutex},
        task::{Context, Poll, Wake},
    };

    use assert_matches::assert_matches;
    use futures_util::pin_mut;

    use super::{super::LinkedChunk, ChunkIdentifier, Position, Stream, UpdatesInner};

    // Exercises `take`/`take_with_token` with two concurrent readers and
    // checks the garbage collector across five scenarios.
    #[test]
    fn test_updates_take_and_garbage_collector() {
        use super::Update::*;

        let mut linked_chunk = LinkedChunk::<10, char, ()>::new_with_update_history();

        // Simulate another updates “reader”; it can be a subscriber, for
        // example.
        let main_token = UpdatesInner::<char, ()>::MAIN_READER_TOKEN;
        let other_token = {
            let updates = linked_chunk.updates().unwrap();
            let mut inner = updates.inner.write().unwrap();
            inner.last_token += 1;

            let other_token = inner.last_token;
            inner.last_index_per_reader.insert(other_token, 0);

            other_token
        };

        // There is no new update yet.
        {
            let updates = linked_chunk.updates().unwrap();

            assert!(updates.take().is_empty());
            assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty());
        }

        linked_chunk.push_items_back(['a']);
        linked_chunk.push_items_back(['b']);
        linked_chunk.push_items_back(['c']);

        // Scenario 1: “main” takes the new updates, “other” doesn't take the new
        // updates.
        //
        // 0   1   2   3
        // +---+---+---+
        // | a | b | c |
        // +---+---+---+
        //
        // “main” will move its index from 0 to 3.
        // “other” won't move its index.
        {
            let updates = linked_chunk.updates().unwrap();

            {
                // Inspect number of updates in memory.
                assert_eq!(updates.inner.read().unwrap().len(), 3);
            }

            assert_eq!(
                updates.take(),
                &[
                    InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] },
                    InsertItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
                    InsertItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
                ]
            );

            {
                let inner = updates.inner.read().unwrap();

                // Inspect number of updates in memory.
                // It must be the same number as before, as the garbage collector
                // wasn't able to remove any unused updates (“other” hasn't read
                // them yet).
                assert_eq!(inner.len(), 3);

                // Inspect the indices.
                let indices = &inner.last_index_per_reader;

                assert_eq!(indices.get(&main_token), Some(&3));
                assert_eq!(indices.get(&other_token), Some(&0));
            }
        }

        linked_chunk.push_items_back(['d']);
        linked_chunk.push_items_back(['e']);
        linked_chunk.push_items_back(['f']);

        // Scenario 2: “other“ takes the new updates, “main” doesn't take the
        // new updates.
        //
        // 0   1   2   3   4   5   6
        // +---+---+---+---+---+---+
        // | a | b | c | d | e | f |
        // +---+---+---+---+---+---+
        //
        // “main” won't move its index.
        // “other” will move its index from 0 to 6.
        {
            let updates = linked_chunk.updates().unwrap();

            assert_eq!(
                updates.inner.write().unwrap().take_with_token(other_token),
                &[
                    InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] },
                    InsertItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
                    InsertItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
                    InsertItems { at: Position(ChunkIdentifier(0), 3), items: vec!['d'] },
                    InsertItems { at: Position(ChunkIdentifier(0), 4), items: vec!['e'] },
                    InsertItems { at: Position(ChunkIdentifier(0), 5), items: vec!['f'] },
                ]
            );

            {
                let inner = updates.inner.read().unwrap();

                // Inspect number of updates in memory.
                // It must be the same number as before: the garbage collector
                // will be able to remove unused updates, but only at the next
                // call (it runs at the start of `take_with_token`)…
                assert_eq!(inner.len(), 6);

                // Inspect the indices.
                let indices = &inner.last_index_per_reader;

                assert_eq!(indices.get(&main_token), Some(&3));
                assert_eq!(indices.get(&other_token), Some(&6));
            }
        }

        // Scenario 3: “other” takes new updates, but there is none; “main”
        // doesn't take new updates. The garbage collector will run and collect
        // unused updates.
        //
        // 0   1   2   3
        // +---+---+---+
        // | d | e | f |
        // +---+---+---+
        //
        // “main” will have its index updated from 3 to 0.
        // “other” will have its index updated from 6 to 3.
        {
            let updates = linked_chunk.updates().unwrap();

            assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty());

            {
                let inner = updates.inner.read().unwrap();

                // Inspect number of updates in memory.
                // The garbage collector has removed unused updates.
                assert_eq!(inner.len(), 3);

                // Inspect the indices. They must have been adjusted.
                let indices = &inner.last_index_per_reader;

                assert_eq!(indices.get(&main_token), Some(&0));
                assert_eq!(indices.get(&other_token), Some(&3));
            }
        }

        linked_chunk.push_items_back(['g']);
        linked_chunk.push_items_back(['h']);
        linked_chunk.push_items_back(['i']);

        // Scenario 4: both “main” and “other” take the new updates.
        //
        // 0   1   2   3   4   5   6
        // +---+---+---+---+---+---+
        // | d | e | f | g | h | i |
        // +---+---+---+---+---+---+
        //
        // “main” reads the 6 pending updates (index 0 to 6); the garbage
        // collector then shifts it down to 3 when “other” reads.
        // “other” reads the last 3 updates (its index, shifted down to 0 by
        // the garbage collector, moves to 3).
        {
            let updates = linked_chunk.updates().unwrap();

            assert_eq!(
                updates.take(),
                &[
                    InsertItems { at: Position(ChunkIdentifier(0), 3), items: vec!['d'] },
                    InsertItems { at: Position(ChunkIdentifier(0), 4), items: vec!['e'] },
                    InsertItems { at: Position(ChunkIdentifier(0), 5), items: vec!['f'] },
                    InsertItems { at: Position(ChunkIdentifier(0), 6), items: vec!['g'] },
                    InsertItems { at: Position(ChunkIdentifier(0), 7), items: vec!['h'] },
                    InsertItems { at: Position(ChunkIdentifier(0), 8), items: vec!['i'] },
                ]
            );
            assert_eq!(
                updates.inner.write().unwrap().take_with_token(other_token),
                &[
                    InsertItems { at: Position(ChunkIdentifier(0), 6), items: vec!['g'] },
                    InsertItems { at: Position(ChunkIdentifier(0), 7), items: vec!['h'] },
                    InsertItems { at: Position(ChunkIdentifier(0), 8), items: vec!['i'] },
                ]
            );

            {
                let inner = updates.inner.read().unwrap();

                // Inspect number of updates in memory.
                // The garbage collector had a chance to collect the first 3 updates.
                assert_eq!(inner.len(), 3);

                // Inspect the indices.
                let indices = &inner.last_index_per_reader;

                assert_eq!(indices.get(&main_token), Some(&3));
                assert_eq!(indices.get(&other_token), Some(&3));
            }
        }

        // Scenario 5: no more updates, but they both try to take new updates.
        // The garbage collector will collect all updates, as all of them have
        // been read already.
        //
        // “main” will have its index updated from 3 to 0.
        // “other” will have its index updated from 3 to 0.
        {
            let updates = linked_chunk.updates().unwrap();

            assert!(updates.take().is_empty());
            assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty());

            {
                let inner = updates.inner.read().unwrap();

                // Inspect number of updates in memory.
                // The garbage collector had a chance to collect all updates.
                assert_eq!(inner.len(), 0);

                // Inspect the indices.
                let indices = &inner.last_index_per_reader;

                assert_eq!(indices.get(&main_token), Some(&0));
                assert_eq!(indices.get(&other_token), Some(&0));
            }
        }
    }

    // Exercises the `Stream` implementation of `UpdatesSubscriber`: pending
    // state, wake-ups, interleaving with `take`, and closing on drop.
    #[test]
    fn test_updates_stream() {
        use super::Update::*;

        // A waker that counts how many times it has been woken up, so the test
        // can observe the wake-ups triggered by `UpdatesInner::push`.
        struct CounterWaker {
            number_of_wakeup: Mutex<usize>,
        }

        impl Wake for CounterWaker {
            fn wake(self: Arc<Self>) {
                *self.number_of_wakeup.lock().unwrap() += 1;
            }
        }

        let counter_waker = Arc::new(CounterWaker { number_of_wakeup: Mutex::new(0) });
        let waker = counter_waker.clone().into();
        let mut context = Context::from_waker(&waker);

        let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history();
        let updates_subscriber = linked_chunk.updates().unwrap().subscribe();
        pin_mut!(updates_subscriber);

        // No update, stream is pending.
        assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Pending);
        assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 0);

        // Let's generate an update.
        linked_chunk.push_items_back(['a']);

        // The waker must have been called.
        assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 1);

        // There is an update! Right after that, the stream is pending again.
        assert_matches!(
            updates_subscriber.as_mut().poll_next(&mut context),
            Poll::Ready(Some(items)) => {
                assert_eq!(
                    items,
                    &[InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }]
                );
            }
        );
        assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Pending);

        // Let's generate two other updates.
        linked_chunk.push_items_back(['b']);
        linked_chunk.push_items_back(['c']);

        // The waker must have been called only once for the two updates
        // (wakers are one-shot: the second push finds no registered waker).
        assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 2);

        // We can consume the updates without the stream, but the stream continues to
        // know it has updates.
        assert_eq!(
            linked_chunk.updates().unwrap().take(),
            &[
                InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] },
                InsertItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
                InsertItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
            ]
        );
        assert_matches!(
            updates_subscriber.as_mut().poll_next(&mut context),
            Poll::Ready(Some(items)) => {
                assert_eq!(
                    items,
                    &[
                        InsertItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
                        InsertItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
                    ]
                );
            }
        );
        assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Pending);

        // When dropping the `LinkedChunk`, it closes the stream.
        drop(linked_chunk);
        assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Ready(None));

        // Waker calls have not changed.
        assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 2);
    }
}

View File

@@ -17,8 +17,7 @@ use std::{fmt, iter::once};
use matrix_sdk_common::deserialized_responses::SyncTimelineEvent;
use super::linked_chunk::{
Chunk, ChunkIdentifier, LinkedChunk, LinkedChunkError, LinkedChunkIter,
LinkedChunkIterBackward, Position,
Chunk, ChunkIdentifier, Error, Iter, IterBackward, LinkedChunk, Position,
};
/// A newtype wrapper for a pagination token returned by a /messages response.
@@ -82,11 +81,7 @@ impl RoomEvents {
}
/// Insert events at a specified position.
pub fn insert_events_at<I>(
&mut self,
events: I,
position: Position,
) -> Result<(), LinkedChunkError>
pub fn insert_events_at<I>(&mut self, events: I, position: Position) -> Result<(), Error>
where
I: IntoIterator<Item = SyncTimelineEvent>,
I::IntoIter: ExactSizeIterator,
@@ -95,7 +90,7 @@ impl RoomEvents {
}
/// Insert a gap at a specified position.
pub fn insert_gap_at(&mut self, gap: Gap, position: Position) -> Result<(), LinkedChunkError> {
pub fn insert_gap_at(&mut self, gap: Gap, position: Position) -> Result<(), Error> {
self.chunks.insert_gap_at(gap, position)
}
@@ -110,7 +105,7 @@ impl RoomEvents {
&mut self,
events: I,
gap_identifier: ChunkIdentifier,
) -> Result<&Chunk<DEFAULT_CHUNK_CAPACITY, SyncTimelineEvent, Gap>, LinkedChunkError>
) -> Result<&Chunk<DEFAULT_CHUNK_CAPACITY, SyncTimelineEvent, Gap>, Error>
where
I: IntoIterator<Item = SyncTimelineEvent>,
I::IntoIter: ExactSizeIterator,
@@ -137,16 +132,14 @@ impl RoomEvents {
/// Iterate over the chunks, backward.
///
/// The most recent chunk comes first.
pub fn rchunks(
&self,
) -> LinkedChunkIterBackward<'_, DEFAULT_CHUNK_CAPACITY, SyncTimelineEvent, Gap> {
pub fn rchunks(&self) -> IterBackward<'_, DEFAULT_CHUNK_CAPACITY, SyncTimelineEvent, Gap> {
self.chunks.rchunks()
}
/// Iterate over the chunks, forward.
///
/// The oldest chunk comes first.
pub fn chunks(&self) -> LinkedChunkIter<'_, DEFAULT_CHUNK_CAPACITY, SyncTimelineEvent, Gap> {
pub fn chunks(&self) -> Iter<'_, DEFAULT_CHUNK_CAPACITY, SyncTimelineEvent, Gap> {
self.chunks.chunks()
}
@@ -154,10 +147,7 @@ impl RoomEvents {
pub fn rchunks_from(
&self,
identifier: ChunkIdentifier,
) -> Result<
LinkedChunkIterBackward<'_, DEFAULT_CHUNK_CAPACITY, SyncTimelineEvent, Gap>,
LinkedChunkError,
> {
) -> Result<IterBackward<'_, DEFAULT_CHUNK_CAPACITY, SyncTimelineEvent, Gap>, Error> {
self.chunks.rchunks_from(identifier)
}
@@ -166,8 +156,7 @@ impl RoomEvents {
pub fn chunks_from(
&self,
identifier: ChunkIdentifier,
) -> Result<LinkedChunkIter<'_, DEFAULT_CHUNK_CAPACITY, SyncTimelineEvent, Gap>, LinkedChunkError>
{
) -> Result<Iter<'_, DEFAULT_CHUNK_CAPACITY, SyncTimelineEvent, Gap>, Error> {
self.chunks.chunks_from(identifier)
}
@@ -189,7 +178,7 @@ impl RoomEvents {
pub fn revents_from(
&self,
position: Position,
) -> Result<impl Iterator<Item = (Position, &SyncTimelineEvent)>, LinkedChunkError> {
) -> Result<impl Iterator<Item = (Position, &SyncTimelineEvent)>, Error> {
self.chunks.ritems_from(position)
}
@@ -198,7 +187,7 @@ impl RoomEvents {
pub fn events_from(
&self,
position: Position,
) -> Result<impl Iterator<Item = (Position, &SyncTimelineEvent)>, LinkedChunkError> {
) -> Result<impl Iterator<Item = (Position, &SyncTimelineEvent)>, Error> {
self.chunks.items_from(position)
}
}