mirror of
https://github.com/matrix-org/matrix-rust-sdk.git
synced 2026-05-06 23:15:08 -04:00
feat(r2d2): Send out a report when backups get enabled
This commit is contained in:
@@ -101,6 +101,10 @@ async fn redecryption_report_task(timeline_controller: TimelineController) {
|
||||
// redecryptor to attempt redecryption of our timeline items.
|
||||
timeline_controller.retry_event_decryption(None).await;
|
||||
}
|
||||
Ok(RedecryptorReport::BackupAvailable) => {
|
||||
// Do nothing for now, this is handled by the
|
||||
// `backup_states_task()`.
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -280,7 +280,7 @@ impl EventCache {
|
||||
.take()
|
||||
.expect("We should have initialized the channel an subscribing should happen only once");
|
||||
|
||||
redecryptor::Redecryptor::new(Arc::downgrade(&self.inner), receiver, &self.inner.linked_chunk_update_sender)
|
||||
redecryptor::Redecryptor::new(&client, Arc::downgrade(&self.inner), receiver, &self.inner.linked_chunk_update_sender)
|
||||
};
|
||||
|
||||
|
||||
|
||||
@@ -151,7 +151,8 @@ use tracing::{info, instrument, trace, warn};
|
||||
use super::RoomEventCache;
|
||||
use super::{EventCache, EventCacheError, EventCacheInner, EventsOrigin, RoomEventCacheUpdate};
|
||||
use crate::{
|
||||
Room,
|
||||
Client, Room,
|
||||
encryption::backups::BackupState,
|
||||
event_cache::{RoomEventCacheGenericUpdate, RoomEventCacheLinkedChunkUpdate},
|
||||
room::PushContext,
|
||||
};
|
||||
@@ -189,6 +190,11 @@ pub enum RedecryptorReport {
|
||||
/// The redecryptor might have missed some room keys so it might not have
|
||||
/// re-decrypted events that are now decryptable.
|
||||
Lagging,
|
||||
/// A room key backup has become available.
|
||||
///
|
||||
/// This means that components might want to tell R2D2 about events they
|
||||
/// care about to attempt a decryption.
|
||||
BackupAvailable,
|
||||
}
|
||||
|
||||
pub(super) struct RedecryptorChannels {
|
||||
@@ -625,6 +631,14 @@ impl EventCache {
|
||||
/// // it which events we care about, i.e. which events we're displaying to the
|
||||
/// // user, and let it redecrypt things with an explicit request.
|
||||
/// }
|
||||
/// RedecryptorReport::BackupAvailable => {
|
||||
/// // A backup has become available. We can, just like in the Lagging case, tell
|
||||
/// // the event cache to attempt to redecrypt some events.
|
||||
/// //
|
||||
/// // This is only necessary with the BackupDownloadStrategy::OnDecryptionFailure
|
||||
/// // as the decryption attempt in this case will trigger the download of the
|
||||
/// // room key from the backup.
|
||||
/// }
|
||||
/// RedecryptorReport::ResolvedUtds { .. } => {
|
||||
/// // This may be interesting for statistical reasons or in case we'd like to
|
||||
/// // fetch and inspect these events in some manner.
|
||||
@@ -645,13 +659,12 @@ fn upgrade_event_cache(cache: &Weak<EventCacheInner>) -> Option<EventCache> {
|
||||
cache.upgrade().map(|inner| EventCache { inner })
|
||||
}
|
||||
|
||||
fn report_lag(cache: &Weak<EventCacheInner>) -> Result<(), ()> {
|
||||
fn send_report(cache: &Weak<EventCacheInner>, report: RedecryptorReport) -> Result<(), ()> {
|
||||
let Some(cache) = upgrade_event_cache(cache) else {
|
||||
return Err(());
|
||||
};
|
||||
|
||||
let message = RedecryptorReport::Lagging;
|
||||
let _ = cache.inner.redecryption_channels.utd_reporter.send(message);
|
||||
let _ = cache.inner.redecryption_channels.utd_reporter.send(report);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -672,11 +685,13 @@ impl Redecryptor {
|
||||
/// This creates a task that listens to various streams and attempts to
|
||||
/// redecrypt UTDs that can be found inside the [`EventCache`].
|
||||
pub(super) fn new(
|
||||
client: &Client,
|
||||
cache: Weak<EventCacheInner>,
|
||||
receiver: UnboundedReceiver<DecryptionRetryRequest>,
|
||||
linked_chunk_update_sender: &Sender<RoomEventCacheLinkedChunkUpdate>,
|
||||
) -> Self {
|
||||
let linked_chunk_stream = BroadcastStream::new(linked_chunk_update_sender.subscribe());
|
||||
let backup_state_stream = client.encryption().backups().state_stream();
|
||||
|
||||
let task = spawn(async {
|
||||
let request_redecryption_stream = UnboundedReceiverStream::new(receiver);
|
||||
@@ -685,6 +700,7 @@ impl Redecryptor {
|
||||
cache,
|
||||
request_redecryption_stream,
|
||||
linked_chunk_stream,
|
||||
backup_state_stream,
|
||||
)
|
||||
.await;
|
||||
})
|
||||
@@ -718,6 +734,9 @@ impl Redecryptor {
|
||||
events_stream: &mut Pin<
|
||||
&mut impl Stream<Item = Result<RoomEventCacheLinkedChunkUpdate, BroadcastStreamRecvError>>,
|
||||
>,
|
||||
backup_state_stream: &mut Pin<
|
||||
&mut impl Stream<Item = Result<BackupState, BroadcastStreamRecvError>>,
|
||||
>,
|
||||
) -> bool {
|
||||
let Some((room_key_stream, withheld_stream)) =
|
||||
Self::subscribe_to_room_key_stream(cache).await
|
||||
@@ -794,7 +813,7 @@ impl Redecryptor {
|
||||
// user.
|
||||
warn!("The room key stream lagged, reporting the lag to our listeners");
|
||||
|
||||
if report_lag(cache).is_err() {
|
||||
if send_report(cache, RedecryptorReport::Lagging).is_err() {
|
||||
break false;
|
||||
}
|
||||
},
|
||||
@@ -848,7 +867,35 @@ impl Redecryptor {
|
||||
);
|
||||
}
|
||||
Err(_) => {
|
||||
if report_lag(cache).is_err() {
|
||||
if send_report(cache, RedecryptorReport::Lagging).is_err() {
|
||||
break false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(backup_state_update) = backup_state_stream.next() => {
|
||||
match backup_state_update {
|
||||
Ok(state) => {
|
||||
match state {
|
||||
BackupState::Unknown |
|
||||
BackupState::Creating |
|
||||
BackupState::Enabling |
|
||||
BackupState::Resuming |
|
||||
BackupState::Downloading |
|
||||
BackupState::Disabling =>{
|
||||
// Those states aren't particularly interesting to components
|
||||
// listening to R2D2 reports.
|
||||
}
|
||||
BackupState::Enabled => {
|
||||
if send_report(cache, RedecryptorReport::BackupAvailable).is_err() {
|
||||
break false;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
if send_report(cache, RedecryptorReport::Lagging).is_err() {
|
||||
break false;
|
||||
}
|
||||
}
|
||||
@@ -863,21 +910,28 @@ impl Redecryptor {
|
||||
cache: Weak<EventCacheInner>,
|
||||
decryption_request_stream: UnboundedReceiverStream<DecryptionRetryRequest>,
|
||||
events_stream: BroadcastStream<RoomEventCacheLinkedChunkUpdate>,
|
||||
backup_state_stream: impl Stream<Item = Result<BackupState, BroadcastStreamRecvError>>,
|
||||
) {
|
||||
// We pin the decryption request stream here since that one doesn't need to be
|
||||
// recreated and we don't want to miss messages coming from the stream
|
||||
// while recreating it unnecessarily.
|
||||
pin_mut!(decryption_request_stream);
|
||||
pin_mut!(events_stream);
|
||||
pin_mut!(backup_state_stream);
|
||||
|
||||
while Self::redecryption_loop(&cache, &mut decryption_request_stream, &mut events_stream)
|
||||
.await
|
||||
while Self::redecryption_loop(
|
||||
&cache,
|
||||
&mut decryption_request_stream,
|
||||
&mut events_stream,
|
||||
&mut backup_state_stream,
|
||||
)
|
||||
.await
|
||||
{
|
||||
info!("Regenerating the re-decryption streams");
|
||||
|
||||
// Report that the stream got recreated so listeners can attempt to redecrypt
|
||||
// any UTDs they might be seeing.
|
||||
if report_lag(&cache).is_err() {
|
||||
if send_report(&cache, RedecryptorReport::Lagging).is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user