feat: Redecryptor start to send out redecryptor reports

This commit is contained in:
Damir Jelić
2025-10-07 16:08:16 +02:00
parent 4ed239351a
commit a2f89e85b9
2 changed files with 137 additions and 22 deletions

View File

@@ -208,6 +208,9 @@ impl EventCache {
linked_chunk_update_sender.clone(),
)));
#[cfg(feature = "e2e-encryption")]
let redecryption_channels = redecryptor::RedecryptorChannels::new();
Self {
inner: Arc::new(EventCacheInner {
client,
@@ -221,6 +224,8 @@ impl EventCache {
_thread_subscriber_task: thread_subscriber_task,
#[cfg(feature = "experimental-search")]
_search_indexing_task: search_indexing_task,
#[cfg(feature = "e2e-encryption")]
redecryption_channels,
thread_subscriber_receiver,
}),
}
@@ -266,7 +271,18 @@ impl EventCache {
));
#[cfg(feature = "e2e-encryption")]
let redecryptor = redecryptor::Redecryptor::new(Arc::downgrade(&self.inner));
let redecryptor = {
let receiver = self
.inner
.redecryption_channels
.decryption_request_receiver
.lock()
.take()
.expect("We should have initialized the channel an subscribing should happen only once");
redecryptor::Redecryptor::new(Arc::downgrade(&self.inner), receiver)
};
Arc::new(EventCacheDropHandles {
listen_updates_task,
@@ -853,6 +869,9 @@ struct EventCacheInner {
/// This is helpful for tests to coordinate that a new thread subscription
/// has been sent or not.
thread_subscriber_receiver: Receiver<()>,
#[cfg(feature = "e2e-encryption")]
redecryption_channels: redecryptor::RedecryptorChannels,
}
type AutoShrinkChannelPayload = OwnedRoomId;

View File

@@ -72,25 +72,75 @@ use matrix_sdk_base::{BaseClient, crypto::OlmMachine};
use matrix_sdk_base::{
crypto::{store::types::RoomKeyInfo, types::events::room::encrypted::EncryptedEvent},
deserialized_responses::{DecryptedRoomEvent, TimelineEvent, TimelineEventKind},
locks::Mutex,
};
#[cfg(doc)]
use matrix_sdk_common::deserialized_responses::EncryptionInfo;
use matrix_sdk_common::executor::{JoinHandle, spawn};
use ruma::{OwnedEventId, OwnedRoomId, RoomId, events::AnySyncTimelineEvent, serde::Raw};
use tokio::sync::mpsc::UnboundedSender;
use tokio_stream::wrappers::{UnboundedReceiverStream, errors::BroadcastStreamRecvError};
use tokio::sync::{
broadcast,
mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
};
use tokio_stream::wrappers::{
BroadcastStream, UnboundedReceiverStream, errors::BroadcastStreamRecvError,
};
use tracing::{info, instrument, trace, warn};
use crate::event_cache::{
EventCache, EventCacheError, EventCacheInner, EventsOrigin, RoomEventCacheUpdate,
};
type SessionId<'a> = &'a str;
type OwnedSessionId = String;
/// The information sent across the channel to the long-running task requesting
/// that the supplied set of sessions be retried.
#[derive(Debug, Clone)]
pub struct DecryptionRetryRequest {
room_id: OwnedRoomId,
session_ids: BTreeSet<String>,
/// The room ID of the room the events belong to.
pub room_id: OwnedRoomId,
/// Events that are not decrypted.
pub utd_session_ids: BTreeSet<OwnedSessionId>,
/// Events that are decrypted but might need to have their
/// [`EncryptionInfo`] refreshed.
pub refresh_info_session_ids: BTreeSet<OwnedSessionId>,
}
type SessionId<'a> = &'a str;
/// A report coming from the redecryptor.
#[derive(Debug, Clone)]
pub enum RedecryptorReport {
/// Events which we were able to decrypt.
ResolvedUtds {
/// The room ID of the room the events belong to.
room_id: OwnedRoomId,
/// The list of event IDs of the decrypted events.
events: BTreeSet<OwnedEventId>,
},
/// The redecryptor might have missed some room keys so it might not have
/// re-decrypted events that are now decryptable.
Lagging,
}
pub(super) struct RedecryptorChannels {
utd_reporter: broadcast::Sender<RedecryptorReport>,
pub(super) decryption_request_sender: UnboundedSender<DecryptionRetryRequest>,
pub(super) decryption_request_receiver:
Mutex<Option<UnboundedReceiver<DecryptionRetryRequest>>>,
}
impl RedecryptorChannels {
pub(super) fn new() -> Self {
let (utd_reporter, _) = broadcast::channel(100);
let (decryption_request_sender, decryption_request_receiver) = unbounded_channel();
Self {
utd_reporter,
decryption_request_sender,
decryption_request_receiver: Mutex::new(Some(decryption_request_receiver)),
}
}
}
impl EventCache {
/// Retrieve a set of events that we weren't able to decrypt.
@@ -148,7 +198,7 @@ impl EventCache {
let (room_cache, _drop_handles) = self.for_room(room_id).await?;
let mut state = room_cache.inner.state.write().await;
let event_ids: Vec<_> = events.iter().map(|(event_id, _)| event_id).collect();
let event_ids: BTreeSet<_> = events.iter().cloned().map(|(event_id, _)| event_id).collect();
trace!(?event_ids, "Replacing successfully re-decrypted events");
@@ -174,6 +224,14 @@ impl EventCache {
origin: EventsOrigin::Cache,
});
// We report that we resolved some UTDs, this is mainly for listeners that don't
// care about the actual events, just about the fact that UTDs got
// resolved. Not sure if we'll have more such listeners but the UTD hook
// is one such thing.
let report =
RedecryptorReport::ResolvedUtds { room_id: room_id.to_owned(), events: event_ids };
let _ = self.inner.redecryption_channels.utd_reporter.send(report);
Ok(())
}
@@ -230,10 +288,28 @@ impl EventCache {
Ok(())
}
/// Explicitly request the redecryption of a set of events.
///
/// TODO: Explain when and why this might be useful.
pub fn request_decryption(&self, request: DecryptionRetryRequest) {
let _ =
self.inner.redecryption_channels.decryption_request_sender.send(request).inspect_err(
|_| warn!("Requesting a decryption while the redecryption task has been shut down"),
);
}
/// Subscribe to reports that the redecryptor generates.
///
/// TODO: Explain when the redecryptor might send such reports.
pub fn subscrube_to_decryption_reports(
&self,
) -> impl Stream<Item = Result<RedecryptorReport, BroadcastStreamRecvError>> {
BroadcastStream::new(self.inner.redecryption_channels.utd_reporter.subscribe())
}
}
pub(crate) struct Redecryptor {
request_decryption_sender: UnboundedSender<DecryptionRetryRequest>,
task: JoinHandle<()>,
}
@@ -248,23 +324,17 @@ impl Redecryptor {
///
/// This creates a task that listens to various streams and attempts to
/// redecrypt UTDs that can be found inside the [`EventCache`].
pub(super) fn new(cache: Weak<EventCacheInner>) -> Self {
let (request_decryption_sender, receiver) = tokio::sync::mpsc::unbounded_channel();
pub(super) fn new(
cache: Weak<EventCacheInner>,
receiver: UnboundedReceiver<DecryptionRetryRequest>,
) -> Self {
let task = spawn(async {
let request_redecryption_stream = UnboundedReceiverStream::new(receiver);
Self::listen_for_room_keys_task(cache, request_redecryption_stream).await;
});
Self { task, request_decryption_sender }
}
#[allow(dead_code)]
pub(super) fn request_decryption(&self, request: DecryptionRetryRequest) {
let _ = self.request_decryption_sender.send(request).inspect_err(|_| {
warn!("Requesting a decryption while the redecryption task has been shut down")
});
Self { task }
}
/// (Re)-subscribe to the room key stream from the [`OlmMachine`].
@@ -304,18 +374,23 @@ impl Redecryptor {
break false;
};
for session_id in request.session_ids {
for session_id in request.utd_session_ids {
let _ = cache
.retry_decryption(&request.room_id, &session_id)
.await
.inspect_err(|e| warn!("Error redecrypting after an explicit request was received {e:?}"));
}
// TODO: Deal with encryption info updating as well.
}
// The room key stream from the OlmMachine. Needs to be recreated every time we
// receive a `None` from the stream.
room_keys = room_key_stream.next() => {
match room_keys {
Some(Ok(room_keys)) => {
// Alright, some room keys were received and persisted in our store,
// let's attempt to redecrypt events that were encrypted using these
// room keys.
let Some(cache) = Self::upgrade_event_cache(cache) else {
break false;
};
@@ -326,9 +401,21 @@ impl Redecryptor {
.await
.inspect_err(|e| warn!("Error redecrypting {e:?}"));
}
// TODO: Deal with encryption info updating as well.
},
Some(Err(_)) => {
todo!("Handle lagging here, how?")
// We missed some room keys, we need to report this in case a listener
// has and idea which UTDs we should attempt to redecrypt.
//
// This would most likely be the timeline. The timeline might attempt
// to redecrypt all UTDs it is showing to the user.
let Some(cache) = Self::upgrade_event_cache(cache) else {
break false;
};
let message = RedecryptorReport::Lagging;
let _ = cache.inner.redecryption_channels.utd_reporter.send(message);
},
// The stream got closed, this could mean that our OlmMachine got
// regenerated, let's return true and try to recreate the stream.
@@ -352,7 +439,16 @@ impl Redecryptor {
pin_mut!(decryption_request_stream);
while Self::redecryption_loop(&cache, &mut decryption_request_stream).await {
info!("Regenerating the re-decryption streams")
info!("Regenerating the re-decryption streams");
let Some(cache) = Self::upgrade_event_cache(&cache) else {
break;
};
// Report that the stream got recreated so listeners can attempt to redecrypt
// any UTDs they might be seeing.
let message = RedecryptorReport::Lagging;
let _ = cache.inner.redecryption_channels.utd_reporter.send(message);
}
info!("Shutting down the event cache redecryptor");