From 04d56130c32149aef92f2d7baffa9c01b3b32d9f Mon Sep 17 00:00:00 2001 From: Jonas Platte Date: Tue, 27 Jun 2023 16:54:45 +0200 Subject: [PATCH] sdk: Re-export room_keys[_for_room]_received_stream from Encryption --- crates/matrix-sdk/src/encryption/mod.rs | 37 +++++++++++++++++++++++-- 1 file changed, 35 insertions(+), 2 deletions(-) diff --git a/crates/matrix-sdk/src/encryption/mod.rs b/crates/matrix-sdk/src/encryption/mod.rs index 13a3c0108..38f810017 100644 --- a/crates/matrix-sdk/src/encryption/mod.rs +++ b/crates/matrix-sdk/src/encryption/mod.rs @@ -25,11 +25,14 @@ use std::{ }; use eyeball::shared::Observable as SharedObservable; +use futures_core::Stream; use futures_util::{ future::try_join, stream::{self, StreamExt}, }; -use matrix_sdk_base::crypto::{OlmMachine, OutgoingRequest, RoomMessageRequest, ToDeviceRequest}; +use matrix_sdk_base::crypto::{ + store::RoomKeyInfo, OlmMachine, OutgoingRequest, RoomMessageRequest, ToDeviceRequest, +}; use ruma::{ api::client::{ backup::add_backup_keys::v3::Response as KeysBackupResponse, @@ -50,7 +53,7 @@ use ruma::{ }, ImageInfo, MediaSource, ThumbnailInfo, }, - DeviceId, OwnedDeviceId, OwnedUserId, TransactionId, UserId, + DeviceId, OwnedDeviceId, OwnedUserId, RoomId, TransactionId, UserId, }; use tokio::sync::RwLockReadGuard; use tracing::{debug, instrument, trace, warn}; @@ -846,6 +849,36 @@ impl Encryption { Ok(olm.import_room_keys(import, false, |_, _| {}).await?) } + /// Receive notifications of room keys being received as a [`Stream`]. + /// + /// Each time a room key is updated in any way, an update will be sent to + /// the stream. Updates that happen at the same time are batched into a + /// [`Vec`]. + /// + /// If the reader of the stream lags too far behind, a warning will be + /// logged and items will be dropped. + pub async fn room_keys_received_stream(&self) -> Option>> { + Some(self.client.olm_machine().await.as_ref()?.store().room_keys_received_stream()) + } + + /// Receive notifications of a room keys for a specific room being received. + /// + /// Same as [`room_keys_received_stream`][Self::room_keys_received_stream], + /// but filtered by the given room ID. + pub async fn room_keys_for_room_received_stream( + &self, + room_id: &RoomId, + ) -> Option>> { + Some( + self.client + .olm_machine() + .await + .as_ref()? + .store() + .room_keys_for_room_received_stream(room_id), + ) + } + /// Enables the crypto-store cross-process lock. /// /// This may be required if there are multiple processes that may do writes