From f17f4e2bf6963f309b6a705f008e07256c7878e8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Thu, 28 Nov 2024 12:47:15 +0100 Subject: [PATCH] feat: Add a stream to listen to room keys being inserted to the store --- crates/matrix-sdk/src/encryption/mod.rs | 41 +++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/crates/matrix-sdk/src/encryption/mod.rs b/crates/matrix-sdk/src/encryption/mod.rs index 78376cf99..2344fbc77 100644 --- a/crates/matrix-sdk/src/encryption/mod.rs +++ b/crates/matrix-sdk/src/encryption/mod.rs @@ -31,6 +31,7 @@ use futures_util::{ stream::{self, StreamExt}, }; use matrix_sdk_base::crypto::{ + store::RoomKeyInfo, types::requests::{ OutgoingRequest, OutgoingVerificationRequest, RoomMessageRequest, ToDeviceRequest, }, @@ -58,6 +59,7 @@ use ruma::{ }; use serde::Deserialize; use tokio::sync::{Mutex, RwLockReadGuard}; +use tokio_stream::wrappers::errors::BroadcastStreamRecvError; use tracing::{debug, error, instrument, trace, warn}; use url::Url; use vodozemac::Curve25519PublicKey; @@ -1444,6 +1446,45 @@ impl Encryption { Ok(ret) } + /// Receive notifications of room keys being received as a [`Stream`]. + /// + /// Each time a room key is updated in any way, an update will be sent to + /// the stream. Updates that happen at the same time are batched into a + /// [`Vec`]. + /// + /// If the reader of the stream lags too far behind, an error is broadcast + /// containing the number of skipped items. + /// + /// # Examples + /// + /// ```no_run + /// # use matrix_sdk::Client; + /// # use url::Url; + /// # async { + /// # let homeserver = Url::parse("http://example.com")?; + /// # let client = Client::new(homeserver).await?; + /// use futures_util::StreamExt; + /// + /// let Some(mut room_keys_stream) = + /// client.encryption().room_keys_received_stream().await + /// else { + /// return Ok(()); + /// }; + /// + /// while let Some(update) = room_keys_stream.next().await { + /// println!("Received room keys {update:?}"); + /// } + /// # anyhow::Ok(()) }; + /// ``` + pub async fn room_keys_received_stream( + &self, + ) -> Option, BroadcastStreamRecvError>>> { + let olm = self.client.olm_machine().await; + let olm = olm.as_ref()?; + + Some(olm.store().room_keys_received_stream()) + } + /// Get the secret storage manager of the client. pub fn secret_storage(&self) -> SecretStorage { SecretStorage { client: self.client.to_owned() }