mirror of
https://github.com/matrix-org/matrix-rust-sdk.git
synced 2026-05-07 23:44:53 -04:00
feat(base): Create an ObservableMap for wasm32-unknown-unknown.
This patch creates an `ObservableMap` for `wasm32-unknown-unknown` which simply wraps a `BTreeMap`, and without the `stream` method. Indeed, the first implementation uses `eyeball_im::ObservableVector` which requires `Send` and `Sync` on its values, which cannot compile to Wasm.
This commit is contained in:
@@ -21,7 +21,9 @@ use std::{
|
||||
use std::{ops::Deref, sync::Arc};
|
||||
|
||||
use eyeball::{SharedObservable, Subscriber};
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use eyeball_im::{Vector, VectorDiff};
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use futures_util::Stream;
|
||||
use matrix_sdk_common::instant::Instant;
|
||||
#[cfg(feature = "e2e-encryption")]
|
||||
@@ -173,6 +175,7 @@ impl BaseClient {
|
||||
|
||||
/// Get a stream of all the rooms changes, in addition to the existing
|
||||
/// rooms.
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>>) {
|
||||
self.store.rooms_stream()
|
||||
}
|
||||
|
||||
@@ -21,17 +21,17 @@
|
||||
//! store.
|
||||
|
||||
use std::{
|
||||
borrow::Borrow,
|
||||
collections::{BTreeMap, BTreeSet, HashMap},
|
||||
collections::{BTreeMap, BTreeSet},
|
||||
fmt,
|
||||
hash::Hash,
|
||||
ops::Deref,
|
||||
result::Result as StdResult,
|
||||
str::Utf8Error,
|
||||
sync::{Arc, RwLock as StdRwLock},
|
||||
};
|
||||
|
||||
use eyeball_im::{ObservableVector, Vector, VectorDiff};
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use eyeball_im::{Vector, VectorDiff};
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use futures_util::Stream;
|
||||
use once_cell::sync::OnceCell;
|
||||
|
||||
@@ -226,6 +226,7 @@ impl Store {
|
||||
}
|
||||
|
||||
/// Get a stream of all the rooms, in addition to the existing rooms.
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>>) {
|
||||
self.rooms.read().unwrap().stream()
|
||||
}
|
||||
@@ -276,115 +277,184 @@ impl Deref for Store {
|
||||
}
|
||||
}
|
||||
|
||||
/// An observable map.
|
||||
///
|
||||
/// This is an “observable map” naive implementation. Just like regular hashmap,
|
||||
/// we have a redirection from a key to a position, and from a position to
|
||||
/// a value. The (key, position) tuples are stored in an [`HashMap`]. The
|
||||
/// (position, value) tuples are stored in an [`ObservableVector`]. The (key,
|
||||
/// position) tuple is only provided for fast _reading_ implementations, like
|
||||
/// `Self::get` and `Self::get_or_create`. The (position, value) tuples are
|
||||
/// observable, this is what interests us the most here.
|
||||
///
|
||||
/// Why not implementing a new `ObservableMap` type in `eyeball-im` instead of
|
||||
/// this custom implementation? Because we want to continue providing
|
||||
/// `VectorDiff` when observing the changes, so that the rest of the API in the
|
||||
/// Matrix Rust SDK aren't broken. Indeed, an `ObservableMap` must produce
|
||||
/// `MapDiff`, which would be quite different.
|
||||
/// Plus, we would like to re-use all our existing code, test, stream adapters
|
||||
/// and so on.
|
||||
///
|
||||
/// This is a trade-off. And this implementation is simple enough for the
|
||||
/// moment, and basically does the job.
|
||||
#[derive(Debug)]
|
||||
struct ObservableMap<K, V>
|
||||
where
|
||||
V: Clone + Send + Sync + 'static,
|
||||
{
|
||||
/// The (key, position) tuples.
|
||||
mapping: HashMap<K, usize>,
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
mod observable_map {
|
||||
use std::{borrow::Borrow, collections::HashMap, hash::Hash};
|
||||
|
||||
/// The values where the indices are the `position` part of `Self::mapping`.
|
||||
values: ObservableVector<V>,
|
||||
}
|
||||
use eyeball_im::{ObservableVector, Vector, VectorDiff};
|
||||
use futures_util::Stream;
|
||||
|
||||
impl<K, V> ObservableMap<K, V>
|
||||
where
|
||||
K: Hash + Eq,
|
||||
V: Clone + Send + Sync + 'static,
|
||||
{
|
||||
/// Create a new `Self`.
|
||||
fn new() -> Self {
|
||||
Self { mapping: HashMap::new(), values: ObservableVector::new() }
|
||||
/// An observable map.
|
||||
///
|
||||
/// This is an “observable map” naive implementation. Just like regular
|
||||
/// hashmap, we have a redirection from a key to a position, and from a
|
||||
/// position to a value. The (key, position) tuples are stored in an
|
||||
/// [`HashMap`]. The (position, value) tuples are stored in an
|
||||
/// [`ObservableVector`]. The (key, position) tuple is only provided for
|
||||
/// fast _reading_ implementations, like `Self::get` and
|
||||
/// `Self::get_or_create`. The (position, value) tuples are observable,
|
||||
/// this is what interests us the most here.
|
||||
///
|
||||
/// Why not implementing a new `ObservableMap` type in `eyeball-im` instead
|
||||
/// of this custom implementation? Because we want to continue providing
|
||||
/// `VectorDiff` when observing the changes, so that the rest of the API in
|
||||
/// the Matrix Rust SDK aren't broken. Indeed, an `ObservableMap` must
|
||||
/// produce `MapDiff`, which would be quite different.
|
||||
/// Plus, we would like to re-use all our existing code, test, stream
|
||||
/// adapters and so on.
|
||||
///
|
||||
/// This is a trade-off. This implementation is simple enough for the
|
||||
/// moment, and basically does the job.
|
||||
#[derive(Debug)]
|
||||
pub(super) struct ObservableMap<K, V>
|
||||
where
|
||||
V: Clone + Send + Sync + 'static,
|
||||
{
|
||||
/// The (key, position) tuples.
|
||||
mapping: HashMap<K, usize>,
|
||||
|
||||
/// The values where the indices are the `position` part of
|
||||
/// `Self::mapping`.
|
||||
values: ObservableVector<V>,
|
||||
}
|
||||
|
||||
/// Insert a new `V` in the collection.
|
||||
///
|
||||
/// The position of the inserted value is returned.
|
||||
///
|
||||
/// If the `V` value already exists, it will be updated to the new one.
|
||||
fn insert(&mut self, key: K, value: V) -> usize {
|
||||
match self.mapping.get(&key) {
|
||||
Some(position) => {
|
||||
self.values.set(*position, value);
|
||||
impl<K, V> ObservableMap<K, V>
|
||||
where
|
||||
K: Hash + Eq,
|
||||
V: Clone + Send + Sync + 'static,
|
||||
{
|
||||
/// Create a new `Self`.
|
||||
pub(super) fn new() -> Self {
|
||||
Self { mapping: HashMap::new(), values: ObservableVector::new() }
|
||||
}
|
||||
|
||||
*position
|
||||
}
|
||||
None => {
|
||||
let position = self.values.len();
|
||||
/// Insert a new `V` in the collection.
|
||||
///
|
||||
/// If the `V` value already exists, it will be updated to the new one.
|
||||
pub(super) fn insert(&mut self, key: K, value: V) -> usize {
|
||||
match self.mapping.get(&key) {
|
||||
Some(position) => {
|
||||
self.values.set(*position, value);
|
||||
|
||||
self.values.push_back(value);
|
||||
self.mapping.insert(key, position);
|
||||
*position
|
||||
}
|
||||
None => {
|
||||
let position = self.values.len();
|
||||
|
||||
position
|
||||
self.values.push_back(value);
|
||||
self.mapping.insert(key, position);
|
||||
|
||||
position
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Reading one `V` value based on their ID, if it exists.
|
||||
fn get<L>(&self, key: &L) -> Option<&V>
|
||||
where
|
||||
K: Borrow<L>,
|
||||
L: Hash + Eq + ?Sized,
|
||||
{
|
||||
self.mapping.get(key).and_then(|position| self.values.get(*position))
|
||||
}
|
||||
/// Reading one `V` value based on their ID, if it exists.
|
||||
pub(super) fn get<L>(&self, key: &L) -> Option<&V>
|
||||
where
|
||||
K: Borrow<L>,
|
||||
L: Hash + Eq + ?Sized,
|
||||
{
|
||||
self.mapping.get(key).and_then(|position| self.values.get(*position))
|
||||
}
|
||||
|
||||
/// Reading one `V` value based on their ID, or create a new one (by using
|
||||
/// `default`).
|
||||
fn get_or_create<L, F>(&mut self, key: &L, default: F) -> &V
|
||||
where
|
||||
K: Borrow<L>,
|
||||
L: Hash + Eq + ?Sized + ToOwned<Owned = K>,
|
||||
F: FnOnce() -> V,
|
||||
{
|
||||
let position = match self.mapping.get(key) {
|
||||
Some(position) => *position,
|
||||
None => {
|
||||
let value = default();
|
||||
self.insert(key.to_owned(), value)
|
||||
}
|
||||
};
|
||||
/// Reading one `V` value based on their ID, or create a new one (by
|
||||
/// using `default`).
|
||||
pub(super) fn get_or_create<L, F>(&mut self, key: &L, default: F) -> &V
|
||||
where
|
||||
K: Borrow<L>,
|
||||
L: Hash + Eq + ?Sized + ToOwned<Owned = K>,
|
||||
F: FnOnce() -> V,
|
||||
{
|
||||
let position = match self.mapping.get(key) {
|
||||
Some(position) => *position,
|
||||
None => {
|
||||
let value = default();
|
||||
self.insert(key.to_owned(), value)
|
||||
}
|
||||
};
|
||||
|
||||
self.values
|
||||
.get(position)
|
||||
.expect("Value should be present or has just been inserted, but it's missing")
|
||||
}
|
||||
self.values
|
||||
.get(position)
|
||||
.expect("Value should be present or has just been inserted, but it's missing")
|
||||
}
|
||||
|
||||
/// Return an iterator over the existing values.
|
||||
fn iter(&self) -> impl Iterator<Item = &V> {
|
||||
self.values.iter()
|
||||
}
|
||||
/// Return an iterator over the existing values.
|
||||
pub(super) fn iter(&self) -> impl Iterator<Item = &V> {
|
||||
self.values.iter()
|
||||
}
|
||||
|
||||
/// Get a [`Stream`] of it.
|
||||
fn stream(&self) -> (Vector<V>, impl Stream<Item = Vec<VectorDiff<V>>>) {
|
||||
self.values.subscribe().into_values_and_batched_stream()
|
||||
/// Get a [`Stream`] of it.
|
||||
pub(super) fn stream(&self) -> (Vector<V>, impl Stream<Item = Vec<VectorDiff<V>>>) {
|
||||
self.values.subscribe().into_values_and_batched_stream()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
mod observable_map_wasm32 {
|
||||
use std::{borrow::Borrow, collections::BTreeMap, hash::Hash};
|
||||
|
||||
/// An observable map for Wasm. It's a simple wrapper around `BTreeMap`.
|
||||
#[derive(Debug)]
|
||||
pub(super) struct ObservableMap<K, V>(BTreeMap<K, V>)
|
||||
where
|
||||
V: Clone + 'static;
|
||||
|
||||
impl<K, V> ObservableMap<K, V>
|
||||
where
|
||||
K: Hash + Eq + Ord,
|
||||
V: Clone + 'static,
|
||||
{
|
||||
/// Create a new `Self`.
|
||||
pub(super) fn new() -> Self {
|
||||
Self(BTreeMap::new())
|
||||
}
|
||||
|
||||
/// Insert a new `V` in the collection.
|
||||
///
|
||||
/// If the `V` value already exists, it will be updated to the new one.
|
||||
pub(super) fn insert(&mut self, key: K, value: V) {
|
||||
self.0.insert(key, value);
|
||||
}
|
||||
|
||||
/// Reading one `V` value based on their ID, if it exists.
|
||||
pub(super) fn get<L>(&self, key: &L) -> Option<&V>
|
||||
where
|
||||
K: Borrow<L>,
|
||||
L: Hash + Eq + Ord + ?Sized,
|
||||
{
|
||||
self.0.get(key)
|
||||
}
|
||||
|
||||
/// Reading one `V` value based on their ID, or create a new one (by
|
||||
/// using `default`).
|
||||
pub(super) fn get_or_create<L, F>(&mut self, key: &L, default: F) -> &V
|
||||
where
|
||||
K: Borrow<L>,
|
||||
L: Hash + Eq + ?Sized + ToOwned<Owned = K>,
|
||||
F: FnOnce() -> V,
|
||||
{
|
||||
self.0.entry(key.to_owned()).or_insert_with(default)
|
||||
}
|
||||
|
||||
/// Return an iterator over the existing values.
|
||||
pub(super) fn iter(&self) -> impl Iterator<Item = &V> {
|
||||
self.0.values()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use observable_map::ObservableMap;
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
use observable_map_wasm32::ObservableMap;
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests_observable_map {
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use eyeball_im::VectorDiff;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use stream_assert::{assert_closed, assert_next_eq, assert_pending};
|
||||
|
||||
use super::ObservableMap;
|
||||
@@ -398,22 +468,22 @@ mod tests_observable_map {
|
||||
assert!(map.get(&'c').is_none());
|
||||
|
||||
// new items
|
||||
assert_eq!(map.insert('a', 'e'), 0);
|
||||
assert_eq!(map.insert('b', 'f'), 1);
|
||||
map.insert('a', 'e');
|
||||
map.insert('b', 'f');
|
||||
|
||||
assert_eq!(map.get(&'a'), Some(&'e'));
|
||||
assert_eq!(map.get(&'b'), Some(&'f'));
|
||||
assert!(map.get(&'c').is_none());
|
||||
|
||||
// one new item
|
||||
assert_eq!(map.insert('c', 'g'), 2);
|
||||
map.insert('c', 'g');
|
||||
|
||||
assert_eq!(map.get(&'a'), Some(&'e'));
|
||||
assert_eq!(map.get(&'b'), Some(&'f'));
|
||||
assert_eq!(map.get(&'c'), Some(&'g'));
|
||||
|
||||
// update one item
|
||||
assert_eq!(map.insert('b', 'F'), 1);
|
||||
map.insert('b', 'F');
|
||||
|
||||
assert_eq!(map.get(&'a'), Some(&'e'));
|
||||
assert_eq!(map.get(&'b'), Some(&'F'));
|
||||
@@ -425,7 +495,7 @@ mod tests_observable_map {
|
||||
let mut map = ObservableMap::<char, char>::new();
|
||||
|
||||
// insert one item
|
||||
assert_eq!(map.insert('b', 'f'), 0);
|
||||
map.insert('b', 'f');
|
||||
|
||||
// get or create many items
|
||||
assert_eq!(map.get_or_create(&'a', || 'E'), &'E');
|
||||
@@ -442,9 +512,9 @@ mod tests_observable_map {
|
||||
let mut map = ObservableMap::<char, char>::new();
|
||||
|
||||
// new items
|
||||
assert_eq!(map.insert('a', 'e'), 0);
|
||||
assert_eq!(map.insert('b', 'f'), 1);
|
||||
assert_eq!(map.insert('c', 'g'), 2);
|
||||
map.insert('a', 'e');
|
||||
map.insert('b', 'f');
|
||||
map.insert('c', 'g');
|
||||
|
||||
assert_eq!(
|
||||
map.iter().map(|c| c.to_ascii_uppercase()).collect::<Vec<_>>(),
|
||||
@@ -452,12 +522,13 @@ mod tests_observable_map {
|
||||
);
|
||||
}
|
||||
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
#[test]
|
||||
fn test_stream() {
|
||||
let mut map = ObservableMap::<char, char>::new();
|
||||
|
||||
// insert one item
|
||||
assert_eq!(map.insert('b', 'f'), 0);
|
||||
map.insert('b', 'f');
|
||||
|
||||
let (initial_values, mut stream) = map.stream();
|
||||
assert_eq!(initial_values.iter().cloned().collect::<Vec<_>>(), &['f']);
|
||||
@@ -465,8 +536,8 @@ mod tests_observable_map {
|
||||
assert_pending!(stream);
|
||||
|
||||
// insert two items
|
||||
assert_eq!(map.insert('c', 'g'), 1);
|
||||
assert_eq!(map.insert('a', 'e'), 2);
|
||||
map.insert('c', 'g');
|
||||
map.insert('a', 'e');
|
||||
assert_next_eq!(
|
||||
stream,
|
||||
vec![VectorDiff::PushBack { value: 'g' }, VectorDiff::PushBack { value: 'e' }]
|
||||
@@ -475,7 +546,7 @@ mod tests_observable_map {
|
||||
assert_pending!(stream);
|
||||
|
||||
// update one item
|
||||
assert_eq!(map.insert('b', 'F'), 0);
|
||||
map.insert('b', 'F');
|
||||
assert_next_eq!(stream, vec![VectorDiff::Set { index: 0, value: 'F' }]);
|
||||
|
||||
assert_pending!(stream);
|
||||
@@ -484,7 +555,6 @@ mod tests_observable_map {
|
||||
assert_closed!(stream);
|
||||
}
|
||||
}
|
||||
|
||||
/// Store state changes and pass them to the StateStore.
|
||||
#[derive(Clone, Debug, Default)]
|
||||
pub struct StateChanges {
|
||||
|
||||
@@ -23,9 +23,12 @@ use std::{
|
||||
};
|
||||
|
||||
use eyeball::{SharedObservable, Subscriber};
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use eyeball_im::VectorDiff;
|
||||
use futures_core::Stream;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use futures_util::StreamExt;
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
use imbl::Vector;
|
||||
#[cfg(feature = "e2e-encryption")]
|
||||
use matrix_sdk_base::crypto::store::LockableCryptoStore;
|
||||
@@ -917,6 +920,7 @@ impl Client {
|
||||
}
|
||||
|
||||
/// Get a stream of all the rooms, in addition to the existing rooms.
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>> + '_) {
|
||||
let (rooms, stream) = self.base_client().rooms_stream();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user