mirror of
https://github.com/matrix-org/matrix-rust-sdk.git
synced 2026-05-05 06:28:20 -04:00
feat(timeline): Listen to the room keys stream to retry decryptions
This commit is contained in:
@@ -23,6 +23,7 @@ use matrix_sdk::{
|
||||
};
|
||||
use ruma::{events::AnySyncTimelineEvent, RoomVersionId};
|
||||
use tokio::sync::broadcast::error::RecvError;
|
||||
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
|
||||
use tracing::{info, info_span, trace, warn, Instrument, Span};
|
||||
|
||||
use super::{
|
||||
@@ -433,6 +434,47 @@ impl TimelineBuilder {
|
||||
})
|
||||
};
|
||||
|
||||
// TODO: Technically, this should be the only stream we need to listen to get
|
||||
// notified when we should retry to decrypt an event. We sadly can't do that,
|
||||
// since the cross-process support kills the `OlmMachine` which then in
|
||||
// turn kills this stream. Once this is solved remove all the other ways we
|
||||
// listen for room keys.
|
||||
let room_keys_received_join_handle = {
|
||||
let inner = controller.clone();
|
||||
let stream = client.encryption().room_keys_received_stream().await.expect(
|
||||
"We should be logged in by now, so we should have access to an OlmMachine \
|
||||
to be able to listen to this stream",
|
||||
);
|
||||
|
||||
spawn(async move {
|
||||
pin_mut!(stream);
|
||||
|
||||
while let Some(room_keys) = stream.next().await {
|
||||
let session_ids = match room_keys {
|
||||
Ok(room_keys) => {
|
||||
let session_ids: BTreeSet<String> = room_keys
|
||||
.into_iter()
|
||||
.filter(|info| info.room_id == inner.room().room_id())
|
||||
.map(|info| info.session_id)
|
||||
.collect();
|
||||
|
||||
Some(session_ids)
|
||||
}
|
||||
Err(BroadcastStreamRecvError::Lagged(missed_updates)) => {
|
||||
// We lagged, let's retry to decrypt anything we have, maybe something
|
||||
// was received.
|
||||
warn!(missed_updates, "The room keys stream has lagged, retrying to decrypt the whole timeline");
|
||||
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
let room = inner.room();
|
||||
inner.retry_event_decryption(room, session_ids).await;
|
||||
}
|
||||
})
|
||||
};
|
||||
|
||||
let timeline = Timeline {
|
||||
controller,
|
||||
event_cache: room_event_cache,
|
||||
@@ -443,6 +485,7 @@ impl TimelineBuilder {
|
||||
pinned_events_join_handle,
|
||||
room_key_from_backups_join_handle,
|
||||
room_key_backup_enabled_join_handle,
|
||||
room_keys_received_join_handle,
|
||||
local_echo_listener_handle,
|
||||
_event_cache_drop_handle: event_cache_drop,
|
||||
encryption_changes_handle,
|
||||
|
||||
@@ -832,6 +832,7 @@ struct TimelineDropHandle {
|
||||
room_update_join_handle: JoinHandle<()>,
|
||||
pinned_events_join_handle: Option<JoinHandle<()>>,
|
||||
room_key_from_backups_join_handle: JoinHandle<()>,
|
||||
room_keys_received_join_handle: JoinHandle<()>,
|
||||
room_key_backup_enabled_join_handle: JoinHandle<()>,
|
||||
local_echo_listener_handle: JoinHandle<()>,
|
||||
_event_cache_drop_handle: Arc<EventCacheDropHandles>,
|
||||
@@ -852,6 +853,7 @@ impl Drop for TimelineDropHandle {
|
||||
self.room_update_join_handle.abort();
|
||||
self.room_key_from_backups_join_handle.abort();
|
||||
self.room_key_backup_enabled_join_handle.abort();
|
||||
self.room_keys_received_join_handle.abort();
|
||||
self.encryption_changes_handle.abort();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user