From 783adb424e23eeac8067844bcf5510f9bede70e1 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 7 Sep 2023 22:13:04 +0100 Subject: [PATCH] Impmenent `Store::identity_stream_raw` An alternative to `user_identity_stream`, which does not hold a reference to the `CryptoStore` and hence is less prone to leaking references --- .../src/identities/manager.rs | 28 ++++++++++++++++++- crates/matrix-sdk-crypto/src/store/mod.rs | 24 ++++++++++++++++ 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/crates/matrix-sdk-crypto/src/identities/manager.rs b/crates/matrix-sdk-crypto/src/identities/manager.rs index 52c873235..620d8e554 100644 --- a/crates/matrix-sdk-crypto/src/identities/manager.rs +++ b/crates/matrix-sdk-crypto/src/identities/manager.rs @@ -996,7 +996,7 @@ pub(crate) mod tests { device_id, user_id, TransactionId, }; use serde_json::json; - use stream_assert::assert_ready; + use stream_assert::{assert_closed, assert_pending, assert_ready}; use super::testing::{device_id, key_query, manager, other_key_query, other_user_id, user_id}; use crate::identities::manager::testing::own_key_query; @@ -1240,4 +1240,30 @@ pub(crate) mod tests { let update = assert_ready!(stream); assert!(!update.new.is_empty(), "The identities update should contain some identities"); } + + #[async_test] + async fn identities_stream_raw() { + let mut manager = Some(manager().await); + let (request_id, _) = manager.as_ref().unwrap().build_key_query_for_users(vec![user_id()]); + + let stream = manager.as_ref().unwrap().store.identities_stream_raw(); + pin_mut!(stream); + + manager + .as_ref() + .unwrap() + .receive_keys_query_response(&request_id, &own_key_query()) + .await + .unwrap(); + + let (identity_update, _) = assert_ready!(stream); + assert_eq!(identity_update.new.len(), 1); + assert_eq!(identity_update.new[0].user_id(), user_id()); + + assert_pending!(stream); + + // dropping the manager (and hence dropping the store) should close the stream + manager.take(); + assert_closed!(stream); + } } diff --git a/crates/matrix-sdk-crypto/src/store/mod.rs b/crates/matrix-sdk-crypto/src/store/mod.rs index 069f1f100..bd3959c05 100644 --- a/crates/matrix-sdk-crypto/src/store/mod.rs +++ b/crates/matrix-sdk-crypto/src/store/mod.rs @@ -1071,6 +1071,9 @@ impl Store { /// /// If the reader of the stream lags too far behind, a warning will be /// logged and items will be dropped. + /// + /// The stream will terminate once all references to the underlying + /// [`CryptoStoreWrapper`] are dropped. pub fn room_keys_received_stream(&self) -> impl Stream> { self.inner.store.room_keys_received_stream() } @@ -1083,6 +1086,10 @@ impl Store { /// changed. Users can subscribe to this stream and receive updates in /// real-time. /// + /// Caution: the returned stream will never terminate, and it holds a + /// reference to the [`CryptoStore`]. Listeners should be careful to avoid + /// resource leaks. + /// /// # Examples /// /// ```no_run @@ -1132,6 +1139,10 @@ impl Store { /// is discovered or when an existing device's information is changed. Users /// can subscribe to this stream and receive updates in real-time. /// + /// Caution: the returned stream will never terminate, and it holds a + /// reference to the [`CryptoStore`]. Listeners should be careful to avoid + /// resource leaks. + /// /// # Examples /// /// ```no_run @@ -1165,6 +1176,19 @@ impl Store { }) } + /// Returns a [`Stream`] of user identity and device updates + /// + /// The stream returned by this method returns the same data as + /// [`Store::user_identities_stream`] and [`Store::devices_stream`] but does + /// not include references to the `VerificationMachine`. It is therefore a + /// lower-level view on that data. + /// + /// The stream will terminate once all references to the underlying + /// [`CryptoStoreWrapper`] are dropped. + pub fn identities_stream_raw(&self) -> impl Stream { + self.inner.store.identities_stream().map(|(_, identities, devices)| (identities, devices)) + } + /// Creates a `CrossProcessStoreLock` for this store, that will contain the /// given key and value when hold. pub fn create_store_lock(