feat(sdk): Allow LinkedChunkUpdatesInner to have multiple readers.

This patch removes the notion of `take` vs. `peek` from
`LinkedChunkUpdatesInner` and widens the problem to a more general
approach: `LinkedChunkUpdatesInner` must support multiple readers, not
only two (`take` was the first reader, `peek` was the second reader,
kind of).

Why do we need multiple readers? `LinkedChunkUpdates::take` is
clearly the first reader, it's part of the public API. But the private
API needs to read the updates too, without consuming them, like
`LinkedChunkUpdatesSubscriber`. `peek` was nice for that, but it's
possible to have multiple `LinkedChunkUpdatesSubscriber` at the same
time! Hence the need to widen the approach from 2 readers to many
readers.

This patch introduces a `ReaderToken` to identify readers. The last
indexes are now all stored in a `HashMap<ReaderToken, usize>`. The rest
of the modifications are the consequence of that.
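
To make the mechanism concrete, here is a minimal, self-contained sketch of the idea. It is not the code from this patch: `Updates` and `new_reader` are hypothetical stand-ins for `LinkedChunkUpdatesInner` and for the token allocation done in `subscribe`, and the wakers and locking are left out.

```rust
use std::collections::HashMap;

/// Identifies one reader of the buffer (same alias as in the patch).
type ReaderToken = usize;

/// Hypothetical, simplified stand-in for `LinkedChunkUpdatesInner`.
struct Updates<T> {
    /// Updates that have not been read by all readers yet.
    updates: Vec<T>,
    /// For each reader, the index where its next read starts.
    last_index_per_reader: HashMap<ReaderToken, usize>,
    /// The last generated token.
    last_token: ReaderToken,
}

impl<T> Updates<T> {
    const MAIN_READER_TOKEN: ReaderToken = 0;

    fn new() -> Self {
        let mut last_index_per_reader = HashMap::new();
        last_index_per_reader.insert(Self::MAIN_READER_TOKEN, 0);

        Self { updates: Vec::new(), last_index_per_reader, last_token: Self::MAIN_READER_TOKEN }
    }

    /// Register a new reader (hypothetical helper); it starts at index 0.
    fn new_reader(&mut self) -> ReaderToken {
        self.last_token += 1;
        self.last_index_per_reader.insert(self.last_token, 0);
        self.last_token
    }

    fn push(&mut self, update: T) {
        self.updates.push(update);
    }

    /// Return the updates this reader has not seen yet.
    fn take_with_token(&mut self, token: ReaderToken) -> &[T] {
        // Collect first: only updates read by *all* readers are dropped.
        self.garbage_collect();

        let index = self.last_index_per_reader.get_mut(&token).expect("unknown reader token");
        let slice = &self.updates[*index..];
        *index = self.updates.len();
        slice
    }

    /// Drop everything before the smallest reader index, then shift all indices.
    fn garbage_collect(&mut self) {
        let min_index = self.last_index_per_reader.values().min().copied().unwrap_or(0);

        if min_index > 0 {
            let _ = self.updates.drain(0..min_index);

            for index in self.last_index_per_reader.values_mut() {
                *index -= min_index;
            }
        }
    }
}

fn main() {
    let mut updates = Updates::new();
    let other = updates.new_reader();

    updates.push('a');
    updates.push('b');

    // The main reader reads everything; `other` has not, so nothing is collected.
    assert_eq!(updates.take_with_token(Updates::<char>::MAIN_READER_TOKEN), &['a', 'b']);
    assert_eq!(updates.updates.len(), 2);

    // `other` catches up; collection only happens at the start of the next read.
    assert_eq!(updates.take_with_token(other), &['a', 'b']);
    assert!(updates.take_with_token(other).is_empty());
    assert_eq!(updates.updates.len(), 0);
}
```

Note that collection runs at the beginning of a read, so updates read by everyone stay in memory until the next call to `take_with_token`.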

The test `test_updates_take_and_peek` has been entirely rewritten as
`test_updates_take_and_garbage_collector`, which tests 2 readers and
checks how the garbage collector reacts to them.
Ivan Enderlin
2024-05-08 12:29:14 +02:00
parent 73ae1cc6da
commit 9531a4041e


@@ -15,7 +15,7 @@
#![allow(dead_code)]
use std::{
cmp::min,
collections::HashMap,
fmt,
marker::PhantomData,
ops::Not,
@@ -139,21 +139,64 @@ pub struct LinkedChunkUpdates<Item, Gap> {
inner: Arc<RwLock<LinkedChunkUpdatesInner<Item, Gap>>>,
}
/// A token used to represent readers that read the updates in
/// [`LinkedChunkUpdatesInner`].
type ReaderToken = usize;
/// Inner type for [`LinkedChunkUpdates`].
///
/// The particularity of this type is that multiple readers can read the
/// updates. A reader has a [`ReaderToken`]. The public API (i.e.
/// [`LinkedChunkUpdates`]) is considered to be the _main reader_ (it has the
/// token [`Self::MAIN_READER_TOKEN`]).
///
/// An update that has been read by all readers is garbage collected and
/// removed from memory. An update will never be read twice by the same
/// reader.
///
/// Why do we need multiple readers? The public API reads the updates with
/// [`LinkedChunkUpdates::take`], but the private API must also read the updates
/// for example with [`LinkedChunkUpdatesSubscriber`]. Of course, there can be
/// multiple `LinkedChunkUpdatesSubscriber`s at the same time. Hence the need to
/// support multiple readers.
struct LinkedChunkUpdatesInner<Item, Gap> {
/// All the updates that have not been peeked nor taken.
/// All the updates that have not been read by all readers.
updates: Vec<LinkedChunkUpdate<Item, Gap>>,
/// The last index used by the last call of [`Self::take`].
last_taken_index: usize,
/// Updates are stored in [`Self::updates`]. Multiple readers can read them.
/// A reader is identified by a [`ReaderToken`].
///
/// To each reader token is associated an index that represents the index of
/// the last reading. It is used to never return the same update twice.
last_index_per_reader: HashMap<ReaderToken, usize>,
/// The last index used by the last call of [`Self::peek`].
last_peeked_index: usize,
/// The last generated token. This is useful to generate new tokens.
last_token: ReaderToken,
/// Pending wakers for [`LinkedChunkUpdateSubscriber`]s.
/// Pending wakers for [`LinkedChunkUpdatesSubscriber`]s. A waker is removed
/// every time it is called.
wakers: Vec<Waker>,
}
impl<Item, Gap> LinkedChunkUpdatesInner<Item, Gap> {
/// The token used by the main reader. See [`Self::take`] to learn more.
const MAIN_READER_TOKEN: ReaderToken = 0;
/// Create a new [`Self`].
fn new() -> Self {
Self {
updates: Vec::with_capacity(8),
last_index_per_reader: {
let mut map = HashMap::with_capacity(2);
map.insert(Self::MAIN_READER_TOKEN, 0);
map
},
last_token: Self::MAIN_READER_TOKEN,
wakers: Vec::with_capacity(2),
}
}
/// Push a new update.
fn push(&mut self, update: LinkedChunkUpdate<Item, Gap>) {
self.updates.push(update);
@@ -164,65 +207,60 @@ impl<Item, Gap> LinkedChunkUpdatesInner<Item, Gap> {
}
}
/// Take new updates.
/// Take new updates; it considers the caller is the main reader, i.e. it
/// will use the [`Self::MAIN_READER_TOKEN`].
///
/// Updates that have been taken will not be read again.
/// Updates that have been read will never be read again by the current
/// reader.
///
/// Learn more by reading [`Self::take_with_token`].
fn take(&mut self) -> &[LinkedChunkUpdate<Item, Gap>] {
self.take_with_token(Self::MAIN_READER_TOKEN)
}
/// Take new updates with a particular reader token.
///
/// Updates are stored in [`Self::updates`]. Multiple readers can read them.
/// A reader is identified by a [`ReaderToken`]. Every reader can
/// take/read/consume each update only once. An internal index is stored
/// per reader token to know where to start reading updates next time this
/// method is called.
fn take_with_token(&mut self, token: ReaderToken) -> &[LinkedChunkUpdate<Item, Gap>] {
// Let's garbage collect unused updates.
self.garbage_collect();
// Read new updates and update the `Self::last_taken_index`.
let slice = &self.updates[self.last_taken_index..];
self.last_taken_index = self.updates.len();
let index = self
.last_index_per_reader
.get_mut(&token)
.expect("Given `ReaderToken` does not map to any index");
// Read new updates, and update the index.
let slice = &self.updates[*index..];
*index = self.updates.len();
slice
}
/// Peek updates.
///
/// That's another channel to read the updates. The difference with
/// [`Self::take`] is that peeking updates doesn't consume them. However,
/// updates that have been peeked cannot be peeked again.
///
/// Basically, [`Self::take`] is intended to be used by the public API,
/// whilst [`Self::peek`] is for internal usage that must not conflict with
/// [`Self::take`].
fn peek(&mut self) -> &[LinkedChunkUpdate<Item, Gap>] {
// Let's garbage collect unused updates.
self.garbage_collect();
// Read new updates and update the `Self::last_peeked_index`.
let slice = &self.updates[self.last_peeked_index..];
self.last_peeked_index = self.updates.len();
slice
}
/// Return `true` if there is new update that can be read with
/// [`Self::peek`].
fn has_new_peekable_updates(&self) -> bool {
self.last_peeked_index < self.updates.len()
}
/// Return the number of updates in the buffer.
fn len(&self) -> usize {
self.updates.len()
}
/// Garbage collect unused updates. An update is considered unused when it's
/// been read by `Self::take` **and** by `Self::peek`.
/// been read by all readers.
///
/// Find the smallest index between `Self::last_taken_index` and
/// `Self::last_peeked_index`, and clear from 0 to that index.
/// Basically, it reduces to finding the smallest last index for all
/// readers, and clearing from 0 to that index.
fn garbage_collect(&mut self) {
let min_index = min(self.last_taken_index, self.last_peeked_index);
let min_index = self.last_index_per_reader.values().min().copied().unwrap_or(0);
if min_index > 0 {
let _ = self.updates.drain(0..min_index);
// Let's shift the index to the left by `min_index` to preserve them.
self.last_taken_index -= min_index;
self.last_peeked_index -= min_index;
// Let's shift the indices to the left by `min_index` to preserve them.
for index in self.last_index_per_reader.values_mut() {
*index -= min_index;
}
}
}
}
@@ -230,14 +268,7 @@ impl<Item, Gap> LinkedChunkUpdatesInner<Item, Gap> {
impl<Item, Gap> LinkedChunkUpdates<Item, Gap> {
/// Create a new [`Self`].
fn new() -> Self {
Self {
inner: Arc::new(RwLock::new(LinkedChunkUpdatesInner {
updates: Vec::new(),
last_taken_index: 0,
last_peeked_index: 0,
wakers: Vec::new(),
})),
}
Self { inner: Arc::new(RwLock::new(LinkedChunkUpdatesInner::new())) }
}
/// Push a new update.
@@ -256,21 +287,20 @@ impl<Item, Gap> LinkedChunkUpdates<Item, Gap> {
self.inner.write().unwrap().take().to_owned()
}
/// Return `true` if there is new updates that can be read with
/// [`Self::take`].
pub fn has_new_takable_updates(&self) -> bool {
let inner = self.inner.read().unwrap();
inner.last_taken_index < inner.updates.len()
}
/// Subscribe to updates by using a [`Stream`].
///
/// TODO: only one subscriber must exist so far because multiple concurrent
/// subscriber would conflict on the garbage collector. It's not complex to
/// fix, I will do it.
fn subscribe(&self) -> LinkedChunkUpdatesSubscriber<Item, Gap> {
LinkedChunkUpdatesSubscriber { updates: Arc::downgrade(&self.inner) }
fn subscribe(&mut self) -> LinkedChunkUpdatesSubscriber<Item, Gap> {
// A subscriber is a new update reader, it needs its own token.
let token = {
let mut inner = self.inner.write().unwrap();
inner.last_token += 1;
let last_token = inner.last_token;
inner.last_index_per_reader.insert(last_token, 0);
last_token
};
LinkedChunkUpdatesSubscriber { updates: Arc::downgrade(&self.inner), token }
}
}
@@ -282,6 +312,9 @@ struct LinkedChunkUpdatesSubscriber<Item, Gap> {
/// Using a weak reference allows [`LinkedChunkUpdates`] to be dropped
/// freely even if a subscriber exists.
updates: Weak<RwLock<LinkedChunkUpdatesInner<Item, Gap>>>,
/// The token to read the updates.
token: ReaderToken,
}
impl<Item, Gap> Stream for LinkedChunkUpdatesSubscriber<Item, Gap>
@@ -298,9 +331,10 @@ where
};
let mut updates = updates.write().unwrap();
let the_updates = updates.take_with_token(self.token);
// No updates to peek.
if updates.has_new_peekable_updates().not() {
// No updates.
if the_updates.is_empty() {
// Let's register the waker.
updates.wakers.push(context.waker().clone());
@@ -308,8 +342,8 @@ where
return Poll::Pending;
}
// There is updates to peek! Let's forward them in this stream.
return Poll::Ready(Some(updates.peek().to_owned()));
// There are updates! Let's forward them in this stream.
Poll::Ready(Some(the_updates.to_owned()))
}
}
@@ -1405,8 +1439,9 @@ mod tests {
use super::{
Chunk, ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, LinkedChunk,
LinkedChunkError, Not, Position, Stream,
LinkedChunkError, Position, Stream,
};
use crate::event_cache::linked_chunk::LinkedChunkUpdatesInner;
/// A macro to test the items and the gap of a `LinkedChunk`.
/// A chunk is delimited by `[` and `]`. An item chunk has the form `[a, b,
@@ -1515,138 +1550,230 @@ mod tests {
}
#[test]
fn test_updates_take_and_peek() {
fn test_updates_take_and_garbage_collector() {
use super::LinkedChunkUpdate::*;
let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history();
let mut linked_chunk = LinkedChunk::<10, char, ()>::new_with_update_history();
// There is no new update.
// Simulate another updates “reader”, it can be a subscriber.
let main_token = LinkedChunkUpdatesInner::<char, ()>::MAIN_READER_TOKEN;
let other_token = {
let updates = linked_chunk.updates().unwrap();
let mut inner = updates.inner.write().unwrap();
inner.last_token += 1;
let other_token = inner.last_token;
inner.last_index_per_reader.insert(other_token, 0);
other_token
};
// There is no new update yet.
{
let updates = linked_chunk.updates().unwrap();
assert!(updates.inner.read().unwrap().has_new_peekable_updates().not());
assert!(updates.has_new_takable_updates().not());
assert!(updates.inner.write().unwrap().peek().is_empty());
assert!(updates.take().is_empty());
assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty());
}
linked_chunk.push_items_back(['a']);
// Let's `peek` only the new update.
{
let updates = linked_chunk.updates().unwrap();
{
// Inspect number of updates in memory.
assert_eq!(updates.inner.read().unwrap().len(), 1);
}
// Peek the update.
assert!(updates.has_new_takable_updates());
assert!(updates.inner.read().unwrap().has_new_peekable_updates());
assert_eq!(
updates.inner.write().unwrap().peek(),
&[InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }]
);
// No more update to peek.
assert!(updates.has_new_takable_updates());
assert!(updates.inner.read().unwrap().has_new_peekable_updates().not());
assert!(updates.inner.write().unwrap().peek().is_empty());
{
// Inspect number of updates in memory.
assert_eq!(updates.inner.read().unwrap().len(), 1);
}
}
linked_chunk.push_items_back(['b']);
linked_chunk.push_items_back(['c']);
// Let's `peek` then `take` the new update.
// Scenario 1: “main” takes the new updates, “other” doesn't take the new
// updates.
//
// 0 1 2 3
// +---+---+---+
// | a | b | c |
// +---+---+---+
//
// “main” will move its index from 0 to 3.
// “other” won't move its index.
{
let updates = linked_chunk.updates().unwrap();
// Inspect number of updates in memory.
assert_eq!(updates.inner.read().unwrap().len(), 2);
// Peek the update…
assert!(updates.has_new_takable_updates());
assert!(updates.inner.read().unwrap().has_new_peekable_updates());
assert_eq!(
updates.inner.write().unwrap().peek(),
&[InsertItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },]
);
{
// Inspect number of updates in memory.
assert_eq!(updates.inner.read().unwrap().len(), 2);
assert_eq!(updates.inner.read().unwrap().len(), 3);
}
// … and take the update.
assert_eq!(
updates.take(),
&[
InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] },
InsertItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
InsertItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
]
);
{
// Inspect number of updates in memory.
assert_eq!(updates.inner.read().unwrap().len(), 2);
}
let inner = updates.inner.read().unwrap();
// No more update to peek or to take.
assert!(updates.has_new_takable_updates().not());
assert!(updates.inner.read().unwrap().has_new_peekable_updates().not());
assert!(updates.inner.write().unwrap().peek().is_empty());
assert!(updates.take().is_empty());
{
// Inspect number of updates in memory.
// The updates have been garbage collected.
assert_eq!(updates.inner.read().unwrap().len(), 0);
// It must be the same number as before as the garbage collector wasn't
// able to remove any unused updates.
assert_eq!(inner.len(), 3);
// Inspect the indices.
let indices = &inner.last_index_per_reader;
assert_eq!(indices.get(&main_token), Some(&3));
assert_eq!(indices.get(&other_token), Some(&0));
}
}
linked_chunk.push_items_back(['c']);
linked_chunk.push_items_back(['d']);
linked_chunk.push_items_back(['e']);
linked_chunk.push_items_back(['f']);
// Let's `take` then `peek` the new update.
// Scenario 2: “other” takes the new updates, “main” doesn't take the
// new updates.
//
// 0 1 2 3 4 5 6
// +---+---+---+---+---+---+
// | a | b | c | d | e | f |
// +---+---+---+---+---+---+
//
// “main” won't move its index.
// “other” will move its index from 0 to 6.
{
let updates = linked_chunk.updates().unwrap();
{
// Inspect number of updates in memory.
assert_eq!(updates.inner.read().unwrap().len(), 1);
}
assert_eq!(
updates.inner.write().unwrap().take_with_token(other_token),
&[
InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] },
InsertItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
InsertItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
InsertItems { at: Position(ChunkIdentifier(0), 3), items: vec!['d'] },
InsertItems { at: Position(ChunkIdentifier(0), 4), items: vec!['e'] },
InsertItems { at: Position(ChunkIdentifier(0), 5), items: vec!['f'] },
]
);
{
let inner = updates.inner.read().unwrap();
// Inspect number of updates in memory.
// No update has been removed yet: the garbage collector will only be able
// to remove unused updates at the next call…
assert_eq!(inner.len(), 6);
// Inspect the indices.
let indices = &inner.last_index_per_reader;
assert_eq!(indices.get(&main_token), Some(&3));
assert_eq!(indices.get(&other_token), Some(&6));
}
}
// Scenario 3: “other” takes new updates, but there are none, “main”
// doesn't take new updates. The garbage collector will run and collect
// unused updates.
//
// 0 1 2 3
// +---+---+---+
// | d | e | f |
// +---+---+---+
//
// “main” will have its index updated from 3 to 0.
// “other” will have its index updated from 6 to 3.
{
let updates = linked_chunk.updates().unwrap();
assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty());
{
let inner = updates.inner.read().unwrap();
// Inspect number of updates in memory.
// The garbage collector has removed unused updates.
assert_eq!(inner.len(), 3);
// Inspect the indices. They must have been adjusted.
let indices = &inner.last_index_per_reader;
assert_eq!(indices.get(&main_token), Some(&0));
assert_eq!(indices.get(&other_token), Some(&3));
}
}
linked_chunk.push_items_back(['g']);
linked_chunk.push_items_back(['h']);
linked_chunk.push_items_back(['i']);
// Scenario 4: both “main” and “other” take the new updates.
//
// 0 1 2 3 4 5 6
// +---+---+---+---+---+---+
// | d | e | f | g | h | i |
// +---+---+---+---+---+---+
//
// “main” will move its index from 0 to 6, then the garbage collector
// (run when “other” reads) will shift it to 3.
// “other” will have its index shifted from 3 to 0, then will move it to 3.
{
let updates = linked_chunk.updates().unwrap();
// Take and peek the update.
assert!(updates.has_new_takable_updates());
assert!(updates.inner.read().unwrap().has_new_peekable_updates());
assert_eq!(
updates.take(),
&[InsertItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },]
&[
InsertItems { at: Position(ChunkIdentifier(0), 3), items: vec!['d'] },
InsertItems { at: Position(ChunkIdentifier(0), 4), items: vec!['e'] },
InsertItems { at: Position(ChunkIdentifier(0), 5), items: vec!['f'] },
InsertItems { at: Position(ChunkIdentifier(0), 6), items: vec!['g'] },
InsertItems { at: Position(ChunkIdentifier(0), 7), items: vec!['h'] },
InsertItems { at: Position(ChunkIdentifier(0), 8), items: vec!['i'] },
]
);
assert_eq!(
updates.inner.write().unwrap().peek(),
&[InsertItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },]
updates.inner.write().unwrap().take_with_token(other_token),
&[
InsertItems { at: Position(ChunkIdentifier(0), 6), items: vec!['g'] },
InsertItems { at: Position(ChunkIdentifier(0), 7), items: vec!['h'] },
InsertItems { at: Position(ChunkIdentifier(0), 8), items: vec!['i'] },
]
);
{
// Inspect number of updates in memory.
assert_eq!(updates.inner.read().unwrap().len(), 1);
}
let inner = updates.inner.read().unwrap();
// Inspect number of updates in memory.
// The garbage collector had a chance to collect the first 3 updates.
assert_eq!(inner.len(), 3);
// Inspect the indices.
let indices = &inner.last_index_per_reader;
assert_eq!(indices.get(&main_token), Some(&3));
assert_eq!(indices.get(&other_token), Some(&3));
}
}
// Scenario 5: no more updates but they both try to take new updates.
// The garbage collector will collect all updates as all of them have
// been read already.
//
// “main” will have its index updated from 3 to 0.
// “other” will have its index updated from 3 to 0.
{
let updates = linked_chunk.updates().unwrap();
// No more update to peek or to take.
assert!(updates.has_new_takable_updates().not());
assert!(updates.inner.read().unwrap().has_new_peekable_updates().not());
assert!(updates.inner.write().unwrap().peek().is_empty());
assert!(updates.take().is_empty());
assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty());
{
let inner = updates.inner.read().unwrap();
// Inspect number of updates in memory.
// The update has been garbage collected.
assert_eq!(updates.inner.read().unwrap().len(), 0);
// The garbage collector had a chance to collect all updates.
assert_eq!(inner.len(), 0);
// Inspect the indices.
let indices = &inner.last_index_per_reader;
assert_eq!(indices.get(&main_token), Some(&0));
assert_eq!(indices.get(&other_token), Some(&0));
}
}
}