feat: Add a stream to listen to room keys being inserted to the store

This commit is contained in:
Damir Jelić
2024-11-28 12:47:15 +01:00
parent 177ec1216f
commit f17f4e2bf6

View File

@@ -31,6 +31,7 @@ use futures_util::{
stream::{self, StreamExt},
};
use matrix_sdk_base::crypto::{
store::RoomKeyInfo,
types::requests::{
OutgoingRequest, OutgoingVerificationRequest, RoomMessageRequest, ToDeviceRequest,
},
@@ -58,6 +59,7 @@ use ruma::{
};
use serde::Deserialize;
use tokio::sync::{Mutex, RwLockReadGuard};
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
use tracing::{debug, error, instrument, trace, warn};
use url::Url;
use vodozemac::Curve25519PublicKey;
@@ -1444,6 +1446,45 @@ impl Encryption {
Ok(ret)
}
/// Receive notifications of room keys being received as a [`Stream`].
///
/// Each time a room key is updated in any way, an update will be sent to
/// the stream. Updates that happen at the same time are batched into a
/// [`Vec`].
///
/// If the reader of the stream lags too far behind, an error is broadcast
/// containing the number of skipped items.
///
/// # Examples
///
/// ```no_run
/// # use matrix_sdk::Client;
/// # use url::Url;
/// # async {
/// # let homeserver = Url::parse("http://example.com")?;
/// # let client = Client::new(homeserver).await?;
/// use futures_util::StreamExt;
///
/// let Some(mut room_keys_stream) =
/// client.encryption().room_keys_received_stream().await
/// else {
/// return Ok(());
/// };
///
/// while let Some(update) = room_keys_stream.next().await {
/// println!("Received room keys {update:?}");
/// }
/// # anyhow::Ok(()) };
/// ```
pub async fn room_keys_received_stream(
&self,
) -> Option<impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>>> {
let olm = self.client.olm_machine().await;
let olm = olm.as_ref()?;
Some(olm.store().room_keys_received_stream())
}
/// Get the secret storage manager of the client.
pub fn secret_storage(&self) -> SecretStorage {
SecretStorage { client: self.client.to_owned() }