doc(event cache): Document the redecryptor

This commit is contained in:
Damir Jelić
2025-10-02 14:10:48 +02:00
parent e934235045
commit 5c3bca86a4

View File

@@ -12,14 +12,63 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! The REDECRYPTOR is a layer that handles redecryption of events in case we
//! couldn't decrypt them imediatelly
//! The Redecryptor (Rd) is a layer and long-running background task which
//! handles redecryption of events in case we couldn't decrypt them imediatelly.
//!
//! Rd listens to the OlmMachine for received room keys. If a new room key has
//! been received it attempts to find any UTDs in the [`EventCache`]. If Rd
//! decrypts any UTDs from the event cache it will replace the events in the
//! cache and send out new [`RoomEventCacheUpdates`] to any of its listeners.
//!
//! There's an additional gotcha, the [`OlmMachine`] might get recreated by
//! calls to [`BaseClient::regenerate_olm()`]. When this happens we will receive
//! a `None` on the room keys stream and we need to re-listen to it.
//!
//! Another gotcha is that room keys might be received on another process if the
//! Client is operating on a iOS device. A separate process is used in this case
//! to receive push notifications. In this case the room key will be received
//! and Rd won't get notified about it. To work around this decryption requests
//! can be explicitly sent to Rd.
//!
//!
//! ┌─────────────┐
//! │ │
//! ┌───────────┤ Timeline │◄────────────┐
//! │ │ │ │
//! │ └─────▲───────┘ │
//! │ │ │
//! │ │ │
//! │ │ │
//! Decryption │ Redecryptor
//! request │ report
//! │ RoomEventCacheUpdates │
//! │ │ │
//! │ │ │
//! │ ┌──────────┴────────────┐ │
//! │ │ │ │
//! └──────► Redecryptor │────────┘
//! │ │
//! └───────────▲───────────┘
//! │
//! │
//! │
//! Received room keys stream
//! │
//! │
//! │
//! ┌───────┴──────┐
//! │ │
//! │ OlmMachine │
//! │ │
//! └──────────────┘
use std::{collections::BTreeSet, pin::Pin, sync::Weak};
use as_variant::as_variant;
use futures_core::Stream;
use futures_util::{StreamExt, pin_mut};
#[cfg(doc)]
use matrix_sdk_base::{BaseClient, crypto::OlmMachine};
use matrix_sdk_base::{
crypto::{store::types::RoomKeyInfo, types::events::room::encrypted::EncryptedEvent},
deserialized_responses::{DecryptedRoomEvent, TimelineEvent, TimelineEventKind},
@@ -44,6 +93,13 @@ pub struct DecryptionRetryRequest {
type SessionId<'a> = &'a str;
impl EventCache {
/// Retrieve a set of events that we weren't able to decrypt.
///
/// # Arguments
///
/// * `room_id` - The ID of the room where the events were sent to.
/// * `session_id` - The unique ID of the room key that was used to encrypt
/// the event.
async fn get_utds(
&self,
room_id: &RoomId,
@@ -70,6 +126,17 @@ impl EventCache {
Ok(events.into_iter().filter_map(map_timeline_event).collect())
}
/// Handle a chunk of events that we were previously unable to decrypt but
/// have now successfully decrypted.
///
/// This function will replace the existing UTD events in memory and the
/// store and send out a [`RoomEventCacheUpdate`] for the newly
/// decrypted events.
///
/// # Arguments
///
/// * `room_id` - The ID of the room where the events were sent to.
/// * `events` - A chunk of events that were successfully decrypted.
#[instrument(skip_all, fields(room_id))]
async fn on_resolved_utds(
&self,
@@ -110,12 +177,17 @@ impl EventCache {
Ok(())
}
/// Attempt to decrypt a single event.
async fn decrypt_event(
&self,
room_id: &RoomId,
event: &Raw<EncryptedEvent>,
) -> Option<DecryptedRoomEvent> {
let client = self.inner.client().ok()?;
// TODO: Do we need to use the `Room` object to decrypt these events so we can
// calculate if the event should count as a notification, i.e. get the push
// actions. I thing we do, what happens if the room can't be found? We fallback
// to this?
let machine = client.olm_machine().await;
let machine = machine.as_ref()?;
@@ -138,7 +210,10 @@ impl EventCache {
) -> Result<(), EventCacheError> {
trace!("Retrying to decrypt");
// Get all the relevant UTDs.
let events = self.get_utds(room_id, session_id).await?;
// Let's attempt to decrypt them them.
let mut decrypted_events = Vec::with_capacity(events.len());
for (event_id, event) in events {
@@ -149,6 +224,8 @@ impl EventCache {
}
}
// Replace the events and notify listeners that UTDs have been replaced with
// decrypted events.
self.on_resolved_utds(room_id, decrypted_events).await?;
Ok(())
@@ -167,6 +244,10 @@ impl Drop for Redecryptor {
}
impl Redecryptor {
/// Create a new [`Redecryptor`].
///
/// This creates a task that listens to various streams and attempts to
/// redecrypt UTDs that can be found inside the [`EventCache`].
pub(super) fn new(cache: Weak<EventCacheInner>) -> Self {
let (request_decryption_sender, receiver) = tokio::sync::mpsc::unbounded_channel();
@@ -186,6 +267,10 @@ impl Redecryptor {
});
}
/// (Re)-subscribe to the room key stream from the [`OlmMachine`].
///
/// This needs to happen any time this stream returns a `None` meaning that
/// the sending part of the stream has been dropped.
async fn subscribe_to_room_key_stream(
cache: &Weak<EventCacheInner>,
) -> Option<impl Stream<Item = Result<Vec<RoomKeyInfo>, BroadcastStreamRecvError>>> {
@@ -212,6 +297,8 @@ impl Redecryptor {
loop {
tokio::select! {
// An explicit request, presumably from the timeline, has been received to decrypt
// events that were encrypted with a certain room key.
Some(request) = decryption_request_stream.next() => {
let Some(cache) = Self::upgrade_event_cache(cache) else {
break false;
@@ -224,6 +311,8 @@ impl Redecryptor {
.inspect_err(|e| warn!("Error redecrypting after an explicit request was received {e:?}"));
}
}
// The room key stream from the OlmMachine. Needs to be recreated every time we
// receive a `None` from the stream.
room_keys = room_key_stream.next() => {
match room_keys {
Some(Ok(room_keys)) => {
@@ -241,6 +330,8 @@ impl Redecryptor {
Some(Err(_)) => {
todo!("Handle lagging here, how?")
},
// The stream got closed, this could mean that our OlmMachine got
// regenerated, let's return true and try to recreate the stream.
None => {
break true
}
@@ -255,6 +346,9 @@ impl Redecryptor {
cache: Weak<EventCacheInner>,
decryption_request_stream: UnboundedReceiverStream<DecryptionRetryRequest>,
) {
// We pin the decryption request stream here since that one doesn't need to be
// recreated and we don't want to miss messages coming from the stream
// while recreating it unnecessarily.
pin_mut!(decryption_request_stream);
while Self::redecryption_loop(&cache, &mut decryption_request_stream).await {