From 5c3bca86a438c8eecc253cab1eb80ec583d5be4f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Thu, 2 Oct 2025 14:10:48 +0200 Subject: [PATCH] doc(event cache): Document the redecryptor --- .../matrix-sdk/src/event_cache/redecryptor.rs | 98 ++++++++++++++++++- 1 file changed, 96 insertions(+), 2 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index f3e81c554..9c02c0bff 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -12,14 +12,63 @@ // See the License for the specific language governing permissions and // limitations under the License. -//! The REDECRYPTOR is a layer that handles redecryption of events in case we -//! couldn't decrypt them imediatelly +//! The Redecryptor (Rd) is a layer and long-running background task which +//! handles redecryption of events in case we couldn't decrypt them imediatelly. +//! +//! Rd listens to the OlmMachine for received room keys. If a new room key has +//! been received it attempts to find any UTDs in the [`EventCache`]. If Rd +//! decrypts any UTDs from the event cache it will replace the events in the +//! cache and send out new [`RoomEventCacheUpdates`] to any of its listeners. +//! +//! There's an additional gotcha, the [`OlmMachine`] might get recreated by +//! calls to [`BaseClient::regenerate_olm()`]. When this happens we will receive +//! a `None` on the room keys stream and we need to re-listen to it. +//! +//! Another gotcha is that room keys might be received on another process if the +//! Client is operating on a iOS device. A separate process is used in this case +//! to receive push notifications. In this case the room key will be received +//! and Rd won't get notified about it. To work around this decryption requests +//! can be explicitly sent to Rd. +//! +//! +//! ┌─────────────┐ +//! │ │ +//! ┌───────────┤ Timeline │◄────────────┐ +//! │ │ │ │ +//! │ └─────▲───────┘ │ +//! │ │ │ +//! │ │ │ +//! │ │ │ +//! Decryption │ Redecryptor +//! request │ report +//! │ RoomEventCacheUpdates │ +//! │ │ │ +//! │ │ │ +//! │ ┌──────────┴────────────┐ │ +//! │ │ │ │ +//! └──────► Redecryptor │────────┘ +//! │ │ +//! └───────────▲───────────┘ +//! │ +//! │ +//! │ +//! Received room keys stream +//! │ +//! │ +//! │ +//! ┌───────┴──────┐ +//! │ │ +//! │ OlmMachine │ +//! │ │ +//! └──────────────┘ use std::{collections::BTreeSet, pin::Pin, sync::Weak}; use as_variant::as_variant; use futures_core::Stream; use futures_util::{StreamExt, pin_mut}; +#[cfg(doc)] +use matrix_sdk_base::{BaseClient, crypto::OlmMachine}; use matrix_sdk_base::{ crypto::{store::types::RoomKeyInfo, types::events::room::encrypted::EncryptedEvent}, deserialized_responses::{DecryptedRoomEvent, TimelineEvent, TimelineEventKind}, @@ -44,6 +93,13 @@ pub struct DecryptionRetryRequest { type SessionId<'a> = &'a str; impl EventCache { + /// Retrieve a set of events that we weren't able to decrypt. + /// + /// # Arguments + /// + /// * `room_id` - The ID of the room where the events were sent to. + /// * `session_id` - The unique ID of the room key that was used to encrypt + /// the event. async fn get_utds( &self, room_id: &RoomId, @@ -70,6 +126,17 @@ impl EventCache { Ok(events.into_iter().filter_map(map_timeline_event).collect()) } + /// Handle a chunk of events that we were previously unable to decrypt but + /// have now successfully decrypted. + /// + /// This function will replace the existing UTD events in memory and the + /// store and send out a [`RoomEventCacheUpdate`] for the newly + /// decrypted events. + /// + /// # Arguments + /// + /// * `room_id` - The ID of the room where the events were sent to. + /// * `events` - A chunk of events that were successfully decrypted. #[instrument(skip_all, fields(room_id))] async fn on_resolved_utds( &self, @@ -110,12 +177,17 @@ impl EventCache { Ok(()) } + /// Attempt to decrypt a single event. async fn decrypt_event( &self, room_id: &RoomId, event: &Raw, ) -> Option { let client = self.inner.client().ok()?; + // TODO: Do we need to use the `Room` object to decrypt these events so we can + // calculate if the event should count as a notification, i.e. get the push + // actions. I thing we do, what happens if the room can't be found? We fallback + // to this? let machine = client.olm_machine().await; let machine = machine.as_ref()?; @@ -138,7 +210,10 @@ impl EventCache { ) -> Result<(), EventCacheError> { trace!("Retrying to decrypt"); + // Get all the relevant UTDs. let events = self.get_utds(room_id, session_id).await?; + + // Let's attempt to decrypt them them. let mut decrypted_events = Vec::with_capacity(events.len()); for (event_id, event) in events { @@ -149,6 +224,8 @@ impl EventCache { } } + // Replace the events and notify listeners that UTDs have been replaced with + // decrypted events. self.on_resolved_utds(room_id, decrypted_events).await?; Ok(()) @@ -167,6 +244,10 @@ impl Drop for Redecryptor { } impl Redecryptor { + /// Create a new [`Redecryptor`]. + /// + /// This creates a task that listens to various streams and attempts to + /// redecrypt UTDs that can be found inside the [`EventCache`]. pub(super) fn new(cache: Weak) -> Self { let (request_decryption_sender, receiver) = tokio::sync::mpsc::unbounded_channel(); @@ -186,6 +267,10 @@ impl Redecryptor { }); } + /// (Re)-subscribe to the room key stream from the [`OlmMachine`]. + /// + /// This needs to happen any time this stream returns a `None` meaning that + /// the sending part of the stream has been dropped. async fn subscribe_to_room_key_stream( cache: &Weak, ) -> Option, BroadcastStreamRecvError>>> { @@ -212,6 +297,8 @@ impl Redecryptor { loop { tokio::select! { + // An explicit request, presumably from the timeline, has been received to decrypt + // events that were encrypted with a certain room key. Some(request) = decryption_request_stream.next() => { let Some(cache) = Self::upgrade_event_cache(cache) else { break false; @@ -224,6 +311,8 @@ impl Redecryptor { .inspect_err(|e| warn!("Error redecrypting after an explicit request was received {e:?}")); } } + // The room key stream from the OlmMachine. Needs to be recreated every time we + // receive a `None` from the stream. room_keys = room_key_stream.next() => { match room_keys { Some(Ok(room_keys)) => { @@ -241,6 +330,8 @@ impl Redecryptor { Some(Err(_)) => { todo!("Handle lagging here, how?") }, + // The stream got closed, this could mean that our OlmMachine got + // regenerated, let's return true and try to recreate the stream. None => { break true } @@ -255,6 +346,9 @@ impl Redecryptor { cache: Weak, decryption_request_stream: UnboundedReceiverStream, ) { + // We pin the decryption request stream here since that one doesn't need to be + // recreated and we don't want to miss messages coming from the stream + // while recreating it unnecessarily. pin_mut!(decryption_request_stream); while Self::redecryption_loop(&cache, &mut decryption_request_stream).await {