Impmenent Store::identity_stream_raw

An alternative to `user_identity_stream`, which does not hold a reference to
the `CryptoStore` and hence is less prone to leaking references
This commit is contained in:
Richard van der Hoff
2023-09-07 22:13:04 +01:00
parent eb865f490a
commit 783adb424e
2 changed files with 51 additions and 1 deletions

View File

@@ -996,7 +996,7 @@ pub(crate) mod tests {
device_id, user_id, TransactionId,
};
use serde_json::json;
use stream_assert::assert_ready;
use stream_assert::{assert_closed, assert_pending, assert_ready};
use super::testing::{device_id, key_query, manager, other_key_query, other_user_id, user_id};
use crate::identities::manager::testing::own_key_query;
@@ -1240,4 +1240,30 @@ pub(crate) mod tests {
let update = assert_ready!(stream);
assert!(!update.new.is_empty(), "The identities update should contain some identities");
}
#[async_test]
async fn identities_stream_raw() {
let mut manager = Some(manager().await);
let (request_id, _) = manager.as_ref().unwrap().build_key_query_for_users(vec![user_id()]);
let stream = manager.as_ref().unwrap().store.identities_stream_raw();
pin_mut!(stream);
manager
.as_ref()
.unwrap()
.receive_keys_query_response(&request_id, &own_key_query())
.await
.unwrap();
let (identity_update, _) = assert_ready!(stream);
assert_eq!(identity_update.new.len(), 1);
assert_eq!(identity_update.new[0].user_id(), user_id());
assert_pending!(stream);
// dropping the manager (and hence dropping the store) should close the stream
manager.take();
assert_closed!(stream);
}
}

View File

@@ -1071,6 +1071,9 @@ impl Store {
///
/// If the reader of the stream lags too far behind, a warning will be
/// logged and items will be dropped.
///
/// The stream will terminate once all references to the underlying
/// [`CryptoStoreWrapper`] are dropped.
pub fn room_keys_received_stream(&self) -> impl Stream<Item = Vec<RoomKeyInfo>> {
self.inner.store.room_keys_received_stream()
}
@@ -1083,6 +1086,10 @@ impl Store {
/// changed. Users can subscribe to this stream and receive updates in
/// real-time.
///
/// Caution: the returned stream will never terminate, and it holds a
/// reference to the [`CryptoStore`]. Listeners should be careful to avoid
/// resource leaks.
///
/// # Examples
///
/// ```no_run
@@ -1132,6 +1139,10 @@ impl Store {
/// is discovered or when an existing device's information is changed. Users
/// can subscribe to this stream and receive updates in real-time.
///
/// Caution: the returned stream will never terminate, and it holds a
/// reference to the [`CryptoStore`]. Listeners should be careful to avoid
/// resource leaks.
///
/// # Examples
///
/// ```no_run
@@ -1165,6 +1176,19 @@ impl Store {
})
}
/// Returns a [`Stream`] of user identity and device updates
///
/// The stream returned by this method returns the same data as
/// [`Store::user_identities_stream`] and [`Store::devices_stream`] but does
/// not include references to the `VerificationMachine`. It is therefore a
/// lower-level view on that data.
///
/// The stream will terminate once all references to the underlying
/// [`CryptoStoreWrapper`] are dropped.
pub fn identities_stream_raw(&self) -> impl Stream<Item = (IdentityChanges, DeviceChanges)> {
self.inner.store.identities_stream().map(|(_, identities, devices)| (identities, devices))
}
/// Creates a `CrossProcessStoreLock` for this store, that will contain the
/// given key and value when hold.
pub fn create_store_lock(