From a2f89e85b967175f8283cefcdb7ea7a3f574c321 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Tue, 7 Oct 2025 16:08:16 +0200 Subject: [PATCH] feat: Redecryptor start to send out redecryptor reports --- crates/matrix-sdk/src/event_cache/mod.rs | 21 ++- .../matrix-sdk/src/event_cache/redecryptor.rs | 138 +++++++++++++++--- 2 files changed, 137 insertions(+), 22 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 217033b88..50c67c1b7 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -208,6 +208,9 @@ impl EventCache { linked_chunk_update_sender.clone(), ))); + #[cfg(feature = "e2e-encryption")] + let redecryption_channels = redecryptor::RedecryptorChannels::new(); + Self { inner: Arc::new(EventCacheInner { client, @@ -221,6 +224,8 @@ impl EventCache { _thread_subscriber_task: thread_subscriber_task, #[cfg(feature = "experimental-search")] _search_indexing_task: search_indexing_task, + #[cfg(feature = "e2e-encryption")] + redecryption_channels, thread_subscriber_receiver, }), } @@ -266,7 +271,18 @@ impl EventCache { )); #[cfg(feature = "e2e-encryption")] - let redecryptor = redecryptor::Redecryptor::new(Arc::downgrade(&self.inner)); + let redecryptor = { + let receiver = self + .inner + .redecryption_channels + .decryption_request_receiver + .lock() + .take() + .expect("We should have initialized the channel an subscribing should happen only once"); + + redecryptor::Redecryptor::new(Arc::downgrade(&self.inner), receiver) + }; + Arc::new(EventCacheDropHandles { listen_updates_task, @@ -853,6 +869,9 @@ struct EventCacheInner { /// This is helpful for tests to coordinate that a new thread subscription /// has been sent or not. thread_subscriber_receiver: Receiver<()>, + + #[cfg(feature = "e2e-encryption")] + redecryption_channels: redecryptor::RedecryptorChannels, } type AutoShrinkChannelPayload = OwnedRoomId; diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index 9c02c0bff..26bff9be2 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -72,25 +72,75 @@ use matrix_sdk_base::{BaseClient, crypto::OlmMachine}; use matrix_sdk_base::{ crypto::{store::types::RoomKeyInfo, types::events::room::encrypted::EncryptedEvent}, deserialized_responses::{DecryptedRoomEvent, TimelineEvent, TimelineEventKind}, + locks::Mutex, }; +#[cfg(doc)] +use matrix_sdk_common::deserialized_responses::EncryptionInfo; use matrix_sdk_common::executor::{JoinHandle, spawn}; use ruma::{OwnedEventId, OwnedRoomId, RoomId, events::AnySyncTimelineEvent, serde::Raw}; -use tokio::sync::mpsc::UnboundedSender; -use tokio_stream::wrappers::{UnboundedReceiverStream, errors::BroadcastStreamRecvError}; +use tokio::sync::{ + broadcast, + mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}, +}; +use tokio_stream::wrappers::{ + BroadcastStream, UnboundedReceiverStream, errors::BroadcastStreamRecvError, +}; use tracing::{info, instrument, trace, warn}; use crate::event_cache::{ EventCache, EventCacheError, EventCacheInner, EventsOrigin, RoomEventCacheUpdate, }; +type SessionId<'a> = &'a str; +type OwnedSessionId = String; + /// The information sent across the channel to the long-running task requesting /// that the supplied set of sessions be retried. +#[derive(Debug, Clone)] pub struct DecryptionRetryRequest { - room_id: OwnedRoomId, - session_ids: BTreeSet, + /// The room ID of the room the events belong to. + pub room_id: OwnedRoomId, + /// Events that are not decrypted. + pub utd_session_ids: BTreeSet, + /// Events that are decrypted but might need to have their + /// [`EncryptionInfo`] refreshed. + pub refresh_info_session_ids: BTreeSet, } -type SessionId<'a> = &'a str; +/// A report coming from the redecryptor. +#[derive(Debug, Clone)] +pub enum RedecryptorReport { + /// Events which we were able to decrypt. + ResolvedUtds { + /// The room ID of the room the events belong to. + room_id: OwnedRoomId, + /// The list of event IDs of the decrypted events. + events: BTreeSet, + }, + /// The redecryptor might have missed some room keys so it might not have + /// re-decrypted events that are now decryptable. + Lagging, +} + +pub(super) struct RedecryptorChannels { + utd_reporter: broadcast::Sender, + pub(super) decryption_request_sender: UnboundedSender, + pub(super) decryption_request_receiver: + Mutex>>, +} + +impl RedecryptorChannels { + pub(super) fn new() -> Self { + let (utd_reporter, _) = broadcast::channel(100); + let (decryption_request_sender, decryption_request_receiver) = unbounded_channel(); + + Self { + utd_reporter, + decryption_request_sender, + decryption_request_receiver: Mutex::new(Some(decryption_request_receiver)), + } + } +} impl EventCache { /// Retrieve a set of events that we weren't able to decrypt. @@ -148,7 +198,7 @@ impl EventCache { let (room_cache, _drop_handles) = self.for_room(room_id).await?; let mut state = room_cache.inner.state.write().await; - let event_ids: Vec<_> = events.iter().map(|(event_id, _)| event_id).collect(); + let event_ids: BTreeSet<_> = events.iter().cloned().map(|(event_id, _)| event_id).collect(); trace!(?event_ids, "Replacing successfully re-decrypted events"); @@ -174,6 +224,14 @@ impl EventCache { origin: EventsOrigin::Cache, }); + // We report that we resolved some UTDs, this is mainly for listeners that don't + // care about the actual events, just about the fact that UTDs got + // resolved. Not sure if we'll have more such listeners but the UTD hook + // is one such thing. + let report = + RedecryptorReport::ResolvedUtds { room_id: room_id.to_owned(), events: event_ids }; + let _ = self.inner.redecryption_channels.utd_reporter.send(report); + Ok(()) } @@ -230,10 +288,28 @@ impl EventCache { Ok(()) } + + /// Explicitly request the redecryption of a set of events. + /// + /// TODO: Explain when and why this might be useful. + pub fn request_decryption(&self, request: DecryptionRetryRequest) { + let _ = + self.inner.redecryption_channels.decryption_request_sender.send(request).inspect_err( + |_| warn!("Requesting a decryption while the redecryption task has been shut down"), + ); + } + + /// Subscribe to reports that the redecryptor generates. + /// + /// TODO: Explain when the redecryptor might send such reports. + pub fn subscrube_to_decryption_reports( + &self, + ) -> impl Stream> { + BroadcastStream::new(self.inner.redecryption_channels.utd_reporter.subscribe()) + } } pub(crate) struct Redecryptor { - request_decryption_sender: UnboundedSender, task: JoinHandle<()>, } @@ -248,23 +324,17 @@ impl Redecryptor { /// /// This creates a task that listens to various streams and attempts to /// redecrypt UTDs that can be found inside the [`EventCache`]. - pub(super) fn new(cache: Weak) -> Self { - let (request_decryption_sender, receiver) = tokio::sync::mpsc::unbounded_channel(); - + pub(super) fn new( + cache: Weak, + receiver: UnboundedReceiver, + ) -> Self { let task = spawn(async { let request_redecryption_stream = UnboundedReceiverStream::new(receiver); Self::listen_for_room_keys_task(cache, request_redecryption_stream).await; }); - Self { task, request_decryption_sender } - } - - #[allow(dead_code)] - pub(super) fn request_decryption(&self, request: DecryptionRetryRequest) { - let _ = self.request_decryption_sender.send(request).inspect_err(|_| { - warn!("Requesting a decryption while the redecryption task has been shut down") - }); + Self { task } } /// (Re)-subscribe to the room key stream from the [`OlmMachine`]. @@ -304,18 +374,23 @@ impl Redecryptor { break false; }; - for session_id in request.session_ids { + for session_id in request.utd_session_ids { let _ = cache .retry_decryption(&request.room_id, &session_id) .await .inspect_err(|e| warn!("Error redecrypting after an explicit request was received {e:?}")); } + + // TODO: Deal with encryption info updating as well. } // The room key stream from the OlmMachine. Needs to be recreated every time we // receive a `None` from the stream. room_keys = room_key_stream.next() => { match room_keys { Some(Ok(room_keys)) => { + // Alright, some room keys were received and persisted in our store, + // let's attempt to redecrypt events that were encrypted using these + // room keys. let Some(cache) = Self::upgrade_event_cache(cache) else { break false; }; @@ -326,9 +401,21 @@ impl Redecryptor { .await .inspect_err(|e| warn!("Error redecrypting {e:?}")); } + + // TODO: Deal with encryption info updating as well. }, Some(Err(_)) => { - todo!("Handle lagging here, how?") + // We missed some room keys, we need to report this in case a listener + // has and idea which UTDs we should attempt to redecrypt. + // + // This would most likely be the timeline. The timeline might attempt + // to redecrypt all UTDs it is showing to the user. + let Some(cache) = Self::upgrade_event_cache(cache) else { + break false; + }; + + let message = RedecryptorReport::Lagging; + let _ = cache.inner.redecryption_channels.utd_reporter.send(message); }, // The stream got closed, this could mean that our OlmMachine got // regenerated, let's return true and try to recreate the stream. @@ -352,7 +439,16 @@ impl Redecryptor { pin_mut!(decryption_request_stream); while Self::redecryption_loop(&cache, &mut decryption_request_stream).await { - info!("Regenerating the re-decryption streams") + info!("Regenerating the re-decryption streams"); + + let Some(cache) = Self::upgrade_event_cache(&cache) else { + break; + }; + + // Report that the stream got recreated so listeners can attempt to redecrypt + // any UTDs they might be seeing. + let message = RedecryptorReport::Lagging; + let _ = cache.inner.redecryption_channels.utd_reporter.send(message); } info!("Shutting down the event cache redecryptor");