From fa1ebbfdb84afc8c05ef04b441ce2fbc044f9fe9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Mon, 15 Dec 2025 12:11:29 +0100 Subject: [PATCH] feat(r2d2): Send out a report when backups get enabled --- .../controller/decryption_retry_task.rs | 4 ++ crates/matrix-sdk/src/event_cache/mod.rs | 2 +- .../matrix-sdk/src/event_cache/redecryptor.rs | 72 ++++++++++++++++--- 3 files changed, 68 insertions(+), 10 deletions(-) diff --git a/crates/matrix-sdk-ui/src/timeline/controller/decryption_retry_task.rs b/crates/matrix-sdk-ui/src/timeline/controller/decryption_retry_task.rs index 5d3d4be57..03f2653c5 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/decryption_retry_task.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/decryption_retry_task.rs @@ -101,6 +101,10 @@ async fn redecryption_report_task(timeline_controller: TimelineController) { // redecryptor to attempt redecryption of our timeline items. timeline_controller.retry_event_decryption(None).await; } + Ok(RedecryptorReport::BackupAvailable) => { + // Do nothing for now, this is handled by the + // `backup_states_task()`. + } } } } diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 37c2bd0fe..fd6d91b6c 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -280,7 +280,7 @@ impl EventCache { .take() .expect("We should have initialized the channel an subscribing should happen only once"); - redecryptor::Redecryptor::new(Arc::downgrade(&self.inner), receiver, &self.inner.linked_chunk_update_sender) + redecryptor::Redecryptor::new(&client, Arc::downgrade(&self.inner), receiver, &self.inner.linked_chunk_update_sender) }; diff --git a/crates/matrix-sdk/src/event_cache/redecryptor.rs b/crates/matrix-sdk/src/event_cache/redecryptor.rs index 9ac89af37..17747f6a1 100644 --- a/crates/matrix-sdk/src/event_cache/redecryptor.rs +++ b/crates/matrix-sdk/src/event_cache/redecryptor.rs @@ -151,7 +151,8 @@ use tracing::{info, instrument, trace, warn}; use super::RoomEventCache; use super::{EventCache, EventCacheError, EventCacheInner, EventsOrigin, RoomEventCacheUpdate}; use crate::{ - Room, + Client, Room, + encryption::backups::BackupState, event_cache::{RoomEventCacheGenericUpdate, RoomEventCacheLinkedChunkUpdate}, room::PushContext, }; @@ -189,6 +190,11 @@ pub enum RedecryptorReport { /// The redecryptor might have missed some room keys so it might not have /// re-decrypted events that are now decryptable. Lagging, + /// A room key backup has become available. + /// + /// This means that components might want to tell R2D2 about events they + /// care about to attempt a decryption. + BackupAvailable, } pub(super) struct RedecryptorChannels { @@ -625,6 +631,14 @@ impl EventCache { /// // it which events we care about, i.e. which events we're displaying to the /// // user, and let it redecrypt things with an explicit request. /// } + /// RedecryptorReport::BackupAvailable => { + /// // A backup has become available. We can, just like in the Lagging case, tell + /// // the event cache to attempt to redecrypt some events. + /// // + /// // This is only necessary with the BackupDownloadStrategy::OnDecryptionFailure + /// // as the decryption attempt in this case will trigger the download of the + /// // room key from the backup. + /// } /// RedecryptorReport::ResolvedUtds { .. } => { /// // This may be interesting for statistical reasons or in case we'd like to /// // fetch and inspect these events in some manner. @@ -645,13 +659,12 @@ fn upgrade_event_cache(cache: &Weak) -> Option { cache.upgrade().map(|inner| EventCache { inner }) } -fn report_lag(cache: &Weak) -> Result<(), ()> { +fn send_report(cache: &Weak, report: RedecryptorReport) -> Result<(), ()> { let Some(cache) = upgrade_event_cache(cache) else { return Err(()); }; - let message = RedecryptorReport::Lagging; - let _ = cache.inner.redecryption_channels.utd_reporter.send(message); + let _ = cache.inner.redecryption_channels.utd_reporter.send(report); Ok(()) } @@ -672,11 +685,13 @@ impl Redecryptor { /// This creates a task that listens to various streams and attempts to /// redecrypt UTDs that can be found inside the [`EventCache`]. pub(super) fn new( + client: &Client, cache: Weak, receiver: UnboundedReceiver, linked_chunk_update_sender: &Sender, ) -> Self { let linked_chunk_stream = BroadcastStream::new(linked_chunk_update_sender.subscribe()); + let backup_state_stream = client.encryption().backups().state_stream(); let task = spawn(async { let request_redecryption_stream = UnboundedReceiverStream::new(receiver); @@ -685,6 +700,7 @@ impl Redecryptor { cache, request_redecryption_stream, linked_chunk_stream, + backup_state_stream, ) .await; }) @@ -718,6 +734,9 @@ impl Redecryptor { events_stream: &mut Pin< &mut impl Stream>, >, + backup_state_stream: &mut Pin< + &mut impl Stream>, + >, ) -> bool { let Some((room_key_stream, withheld_stream)) = Self::subscribe_to_room_key_stream(cache).await @@ -794,7 +813,7 @@ impl Redecryptor { // user. warn!("The room key stream lagged, reporting the lag to our listeners"); - if report_lag(cache).is_err() { + if send_report(cache, RedecryptorReport::Lagging).is_err() { break false; } }, @@ -848,7 +867,35 @@ impl Redecryptor { ); } Err(_) => { - if report_lag(cache).is_err() { + if send_report(cache, RedecryptorReport::Lagging).is_err() { + break false; + } + } + } + } + Some(backup_state_update) = backup_state_stream.next() => { + match backup_state_update { + Ok(state) => { + match state { + BackupState::Unknown | + BackupState::Creating | + BackupState::Enabling | + BackupState::Resuming | + BackupState::Downloading | + BackupState::Disabling =>{ + // Those states aren't particularly interesting to components + // listening to R2D2 reports. + } + BackupState::Enabled => { + if send_report(cache, RedecryptorReport::BackupAvailable).is_err() { + break false; + } + + } + } + } + Err(_) => { + if send_report(cache, RedecryptorReport::Lagging).is_err() { break false; } } @@ -863,21 +910,28 @@ impl Redecryptor { cache: Weak, decryption_request_stream: UnboundedReceiverStream, events_stream: BroadcastStream, + backup_state_stream: impl Stream>, ) { // We pin the decryption request stream here since that one doesn't need to be // recreated and we don't want to miss messages coming from the stream // while recreating it unnecessarily. pin_mut!(decryption_request_stream); pin_mut!(events_stream); + pin_mut!(backup_state_stream); - while Self::redecryption_loop(&cache, &mut decryption_request_stream, &mut events_stream) - .await + while Self::redecryption_loop( + &cache, + &mut decryption_request_stream, + &mut events_stream, + &mut backup_state_stream, + ) + .await { info!("Regenerating the re-decryption streams"); // Report that the stream got recreated so listeners can attempt to redecrypt // any UTDs they might be seeing. - if report_lag(&cache).is_err() { + if send_report(&cache, RedecryptorReport::Lagging).is_err() { break; } }