Revert "ui: Use new room_key stream for retrying decryption in Timeline"

This reverts commit 06600ac53f.
This commit is contained in:
Jonas Platte
2023-06-28 11:37:45 +02:00
parent 06600ac53f
commit b53eba5ec9
3 changed files with 112 additions and 52 deletions

View File

@@ -12,21 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{iter, pin::pin, sync::Arc};
use std::sync::Arc;
use async_std::sync::Mutex;
use futures_util::StreamExt;
use imbl::Vector;
use matrix_sdk::{
deserialized_responses::SyncTimelineEvent, executor::spawn, room, sync::RoomUpdate, Client,
};
use ruma::{
events::receipt::{ReceiptThread, ReceiptType},
OwnedRoomId,
deserialized_responses::SyncTimelineEvent, executor::spawn, room, sync::RoomUpdate,
};
use ruma::events::receipt::{ReceiptThread, ReceiptType};
use tokio::sync::{broadcast, mpsc};
use tracing::{error, info, warn};
#[cfg(feature = "e2e-encryption")]
use super::to_device::{handle_forwarded_room_key_event, handle_room_key_event};
use super::{inner::TimelineInner, queue::send_queued_messages, Timeline, TimelineDropHandle};
/// Builder that allows creating and configuring various parts of a
@@ -191,30 +189,24 @@ impl TimelineBuilder {
}
});
// Not using room.add_event_handler here because RoomKey events are
// to-device events that are not received in the context of a room.
#[cfg(feature = "e2e-encryption")]
let room_key_update_join_handle = {
let inner = inner.clone();
match client.encryption().room_keys_for_room_received_stream(room.room_id()).await {
Some(stream) => spawn(async move {
let mut stream = pin!(stream);
while let Some(vec) = stream.next().await {
for room_key_info in vec {
retry_decryption(
client.clone(),
inner.clone(),
room_key_info.room_id,
room_key_info.session_id,
)
.await;
}
}
}),
None => {
error!("Can't listen for room key updates, OlmMachine not set up");
spawn(async move {})
}
}
};
let room_key_handle = client
.add_event_handler(handle_room_key_event(inner.clone(), room.room_id().to_owned()));
#[cfg(feature = "e2e-encryption")]
let forwarded_room_key_handle = client.add_event_handler(handle_forwarded_room_key_event(
inner.clone(),
room.room_id().to_owned(),
));
let handles = vec![
#[cfg(feature = "e2e-encryption")]
room_key_handle,
#[cfg(feature = "e2e-encryption")]
forwarded_room_key_handle,
];
let (msg_sender, msg_receiver) = mpsc::channel(1);
if !read_only {
@@ -229,9 +221,9 @@ impl TimelineBuilder {
_end_token: Mutex::new(None),
msg_sender,
drop_handle: Arc::new(TimelineDropHandle {
client,
event_handler_handles: handles,
room_update_join_handle,
#[cfg(feature = "e2e-encryption")]
room_key_update_join_handle,
}),
};
@@ -250,18 +242,3 @@ impl TimelineBuilder {
timeline
}
}
#[tracing::instrument(skip(client, inner))]
async fn retry_decryption(
client: Client,
inner: Arc<TimelineInner>,
room_id: OwnedRoomId,
session_id: String,
) {
let Some(room) = client.get_room(&room_id) else {
error!("Failed to fetch room object");
return;
};
inner.retry_event_decryption(&room, Some(iter::once(session_id.as_str()).collect())).await;
}

View File

@@ -24,9 +24,10 @@ use futures_core::Stream;
use imbl::Vector;
use matrix_sdk::{
attachment::AttachmentConfig,
event_handler::EventHandlerHandle,
executor::JoinHandle,
room::{self, Joined, MessagesOptions, Receipts, Room},
Result,
Client, Result,
};
use mime::Mime;
use pin_project_lite::pin_project;
@@ -59,6 +60,8 @@ mod read_receipts;
mod sliding_sync_ext;
#[cfg(test)]
mod tests;
#[cfg(feature = "e2e-encryption")]
mod to_device;
mod traits;
mod virtual_item;
@@ -649,16 +652,17 @@ impl Timeline {
#[derive(Debug)]
struct TimelineDropHandle {
client: Client,
event_handler_handles: Vec<EventHandlerHandle>,
room_update_join_handle: JoinHandle<()>,
#[cfg(feature = "e2e-encryption")]
room_key_update_join_handle: JoinHandle<()>,
}
impl Drop for TimelineDropHandle {
fn drop(&mut self) {
for handle in self.event_handler_handles.drain(..) {
self.client.remove_event_handler(handle);
}
self.room_update_join_handle.abort();
#[cfg(feature = "e2e-encryption")]
self.room_key_update_join_handle.abort();
}
}

View File

@@ -0,0 +1,79 @@
// Copyright 2023 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{iter, sync::Arc};
use matrix_sdk::{event_handler::EventHandler, Client};
use ruma::{
events::{forwarded_room_key::ToDeviceForwardedRoomKeyEvent, room_key::ToDeviceRoomKeyEvent},
OwnedRoomId,
};
use tracing::{debug_span, error, trace, Instrument};
use super::inner::TimelineInner;
pub(super) fn handle_room_key_event(
inner: Arc<TimelineInner>,
room_id: OwnedRoomId,
) -> impl EventHandler<ToDeviceRoomKeyEvent, (Client,)> {
move |event: ToDeviceRoomKeyEvent, client: Client| {
let inner = inner.clone();
let room_id = room_id.clone();
async move {
let event_room_id = event.content.room_id;
let session_id = event.content.session_id;
retry_decryption(client, inner, room_id, event_room_id, session_id).await;
}
.instrument(debug_span!("handle_room_key_event"))
}
}
pub(super) fn handle_forwarded_room_key_event(
inner: Arc<TimelineInner>,
room_id: OwnedRoomId,
) -> impl EventHandler<ToDeviceForwardedRoomKeyEvent, (Client,)> {
move |event: ToDeviceForwardedRoomKeyEvent, client: Client| {
let inner = inner.clone();
let room_id = room_id.clone();
async move {
let event_room_id = event.content.room_id;
let session_id = event.content.session_id;
retry_decryption(client, inner, room_id, event_room_id, session_id).await;
}
.instrument(debug_span!("handle_forwarded_room_key_event"))
}
}
async fn retry_decryption(
client: Client,
inner: Arc<TimelineInner>,
room_id: OwnedRoomId,
event_room_id: OwnedRoomId,
session_id: String,
) {
if event_room_id != room_id {
trace!(
?event_room_id, timeline_room_id = ?room_id, ?session_id,
"Received to-device room key event for a different room, ignoring"
);
return;
}
let Some(room) = client.get_room(&room_id) else {
error!("Failed to fetch room object");
return;
};
inner.retry_event_decryption(&room, Some(iter::once(session_id.as_str()).collect())).await;
}