ui: Spawn a background task for decryption retrying

This commit is contained in:
Jonas Platte
2023-07-06 12:30:03 +02:00
committed by Jonas Platte
parent c600c64b95
commit 07a82c841f
9 changed files with 153 additions and 109 deletions

View File

@@ -23,3 +23,13 @@ pub mod timeline;
#[cfg(feature = "experimental-room-list")]
pub use self::room_list_service::RoomListService;
pub use self::timeline::Timeline;
#[cfg(all(test, not(target_arch = "wasm32")))]
#[ctor::ctor]
fn init_logging() {
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
tracing_subscriber::registry()
.with(tracing_subscriber::EnvFilter::from_default_env())
.with(tracing_subscriber::fmt::layer().with_test_writer())
.init();
}

View File

@@ -111,7 +111,7 @@ impl TimelineBuilder {
.await
{
Ok(Some(read_receipt)) => {
inner.set_initial_user_receipt(ReceiptType::Read, read_receipt);
inner.set_initial_user_receipt(ReceiptType::Read, read_receipt).await;
}
Err(e) => {
error!("Failed to get public read receipt of own user from the store: {e}");
@@ -128,7 +128,9 @@ impl TimelineBuilder {
.await
{
Ok(Some(private_read_receipt)) => {
inner.set_initial_user_receipt(ReceiptType::ReadPrivate, private_read_receipt);
inner
.set_initial_user_receipt(ReceiptType::ReadPrivate, private_read_receipt)
.await;
}
Err(e) => {
error!("Failed to get private read receipt of own user from the store: {e}");
@@ -144,7 +146,6 @@ impl TimelineBuilder {
inner.load_fully_read_event().await;
}
let inner = Arc::new(inner);
let room = inner.room();
let client = room.client();

View File

@@ -69,9 +69,9 @@ use super::{
};
use crate::events::SyncTimelineEventWithoutContent;
#[derive(Debug)]
#[derive(Clone, Debug)]
pub(super) struct TimelineInner<P: RoomDataProvider = room::Common> {
state: Mutex<TimelineInnerState>,
state: Arc<Mutex<TimelineInnerState>>,
room_data_provider: P,
track_read_receipts: bool,
}
@@ -128,7 +128,7 @@ impl<P: RoomDataProvider> TimelineInner<P> {
items: ObservableVector::with_capacity(32),
..Default::default()
};
Self { state: Mutex::new(state), room_data_provider, track_read_receipts: false }
Self { state: Arc::new(Mutex::new(state)), room_data_provider, track_read_receipts: false }
}
pub(super) fn with_read_receipt_tracking(mut self, track_read_receipts: bool) -> Self {
@@ -280,14 +280,15 @@ impl<P: RoomDataProvider> TimelineInner<P> {
Ok(result)
}
pub(super) fn set_initial_user_receipt(
pub(super) async fn set_initial_user_receipt(
&mut self,
receipt_type: ReceiptType,
receipt: (OwnedEventId, Receipt),
) {
let own_user_id = self.room_data_provider.own_user_id().to_owned();
self.state
.get_mut()
.lock()
.await
.users_read_receipts
.entry(own_user_id)
.or_default()
@@ -302,7 +303,7 @@ impl<P: RoomDataProvider> TimelineInner<P> {
debug!("Adding {} initial events", events.len());
let state = self.state.get_mut();
let mut state = self.state.lock().await;
for event in events {
state
.handle_remote_event(
@@ -699,34 +700,32 @@ impl<P: RoomDataProvider> TimelineInner<P> {
pub(super) async fn retry_event_decryption(
&self,
room: &room::Common,
session_ids: Option<BTreeSet<&str>>,
session_ids: Option<BTreeSet<String>>,
) {
self.retry_event_decryption_inner(room, session_ids).await
self.retry_event_decryption_inner(room.to_owned(), session_ids).await
}
#[cfg(all(test, feature = "e2e-encryption"))]
pub(super) async fn retry_event_decryption_test(
&self,
room_id: &RoomId,
olm_machine: &OlmMachine,
session_ids: Option<BTreeSet<&str>>,
olm_machine: OlmMachine,
session_ids: Option<BTreeSet<String>>,
) {
self.retry_event_decryption_inner((olm_machine, room_id), session_ids).await
self.retry_event_decryption_inner((olm_machine, room_id.to_owned()), session_ids).await
}
#[cfg(feature = "e2e-encryption")]
async fn retry_event_decryption_inner(
&self,
decryptor: impl Decryptor,
session_ids: Option<BTreeSet<&str>>,
session_ids: Option<BTreeSet<String>>,
) {
use super::EncryptedMessage;
trace!("Retrying decryption");
let mut state = self.state.clone().lock_owned().await;
let push_rules_context = self.room_data_provider.push_rules_and_context().await;
let should_retry = |session_id: &str| {
let should_retry = move |session_id: &str| {
if let Some(session_ids) = &session_ids {
session_ids.contains(session_id)
} else {
@@ -734,82 +733,112 @@ impl<P: RoomDataProvider> TimelineInner<P> {
}
};
let retry_one = |item: Arc<TimelineItem>| {
async move {
let event_item = item.as_event()?;
let retry_indices: Vec<_> = state
.items
.iter()
.enumerate()
.filter_map(|(idx, item)| match item.as_event()?.content().as_unable_to_decrypt()? {
EncryptedMessage::MegolmV1AesSha2 { session_id, .. }
if should_retry(session_id) =>
{
Some(idx)
}
EncryptedMessage::MegolmV1AesSha2 { .. }
| EncryptedMessage::OlmV1Curve25519AesSha2 { .. }
| EncryptedMessage::Unknown => None,
})
.collect();
let session_id = match event_item.content().as_unable_to_decrypt()? {
EncryptedMessage::MegolmV1AesSha2 { session_id, .. }
if should_retry(session_id) =>
{
session_id
}
EncryptedMessage::MegolmV1AesSha2 { .. }
| EncryptedMessage::OlmV1Curve25519AesSha2 { .. }
| EncryptedMessage::Unknown => return None,
};
if retry_indices.is_empty() {
return;
}
tracing::Span::current().record("session_id", session_id);
debug!("Retrying decryption");
let Some(remote_event) = event_item.as_remote() else {
error!("Key for unable-to-decrypt timeline item is not an event ID");
return None;
};
let track_read_receipts = self.track_read_receipts;
let room_data_provider = self.room_data_provider.clone();
let push_rules_context = room_data_provider.push_rules_and_context().await;
tracing::Span::current().record("event_id", debug(&remote_event.event_id));
matrix_sdk::executor::spawn(async move {
let retry_one = |item: Arc<TimelineItem>| {
let decryptor = decryptor.clone();
let should_retry = &should_retry;
async move {
let event_item = item.as_event()?;
match decryptor.decrypt_event_impl(&remote_event.original_json).await {
Ok(event) => {
trace!("Successfully decrypted event that previously failed to decrypt");
Some(event)
}
Err(e) => {
info!("Failed to decrypt event after receiving room key: {e}");
None
let session_id = match event_item.content().as_unable_to_decrypt()? {
EncryptedMessage::MegolmV1AesSha2 { session_id, .. }
if should_retry(session_id) =>
{
session_id
}
EncryptedMessage::MegolmV1AesSha2 { .. }
| EncryptedMessage::OlmV1Curve25519AesSha2 { .. }
| EncryptedMessage::Unknown => return None,
};
tracing::Span::current().record("session_id", session_id);
let Some(remote_event) = event_item.as_remote() else {
error!("Key for unable-to-decrypt timeline item is not an event ID");
return None;
};
tracing::Span::current().record("event_id", debug(&remote_event.event_id));
match decryptor.decrypt_event_impl(&remote_event.original_json).await {
Ok(event) => {
trace!(
"Successfully decrypted event that previously failed to decrypt"
);
Some(event)
}
Err(e) => {
info!("Failed to decrypt event after receiving room key: {e}");
None
}
}
}
}
.instrument(info_span!(
"retry_one",
session_id = field::Empty,
event_id = field::Empty
))
};
let mut state = self.state.lock().await;
// We loop through all the items in the timeline, if we successfully
// decrypt a UTD item we either replace it or remove it and update
// another one.
let mut idx = 0;
while let Some(item) = state.items.get(idx) {
let Some(mut event) = retry_one(item.clone()).await else {
idx += 1;
continue;
.instrument(info_span!(
"retry_one",
session_id = field::Empty,
event_id = field::Empty
))
};
event.push_actions = push_rules_context
.as_ref()
.map(|(push_rules, push_context)| {
push_rules.get_actions(&event.event, push_context).to_owned()
})
.unwrap_or_default();
// Loop through all the indices, in order so we don't decrypt edits
// before the event being edited, if both were UTD. Keep track of
// index change as UTDs are removed instead of updated.
let mut offset = 0;
for idx in retry_indices {
let idx = idx - offset;
let Some(mut event) = retry_one(state.items[idx].clone()).await else {
continue;
};
let result = state
.handle_remote_event(
event.into(),
TimelineItemPosition::Update(idx),
&self.room_data_provider,
self.track_read_receipts,
)
.await;
event.push_actions = push_rules_context
.as_ref()
.map(|(push_rules, push_context)| {
push_rules.get_actions(&event.event, push_context).to_owned()
})
.unwrap_or_default();
// If the UTD was removed rather than updated, run the loop again
// with the same index.
if !result.item_removed {
idx += 1;
let result = state
.handle_remote_event(
event.into(),
TimelineItemPosition::Update(idx),
&room_data_provider,
track_read_receipts,
)
.await;
// If the UTD was removed rather than updated, offset all
// subsequent loop iterations.
if result.item_removed {
offset += 1;
}
}
}
});
}
pub(super) async fn set_sender_profiles_pending(&self) {

View File

@@ -97,7 +97,7 @@ const DEFAULT_SANITIZER_MODE: HtmlSanitizerMode = HtmlSanitizerMode::Compat;
/// messages.
#[derive(Debug)]
pub struct Timeline {
inner: Arc<TimelineInner<room::Common>>,
inner: TimelineInner<room::Common>,
start_token: Arc<Mutex<Option<String>>>,
start_token_condvar: Arc<Condvar>,
@@ -266,14 +266,14 @@ impl Timeline {
/// # anyhow::Ok(()) };
/// ```
#[cfg(feature = "e2e-encryption")]
pub async fn retry_decryption<'a, S: AsRef<str> + 'a>(
&'a self,
session_ids: impl IntoIterator<Item = &'a S>,
pub async fn retry_decryption<S: Into<String>>(
&self,
session_ids: impl IntoIterator<Item = S>,
) {
self.inner
.retry_event_decryption(
self.room(),
Some(session_ids.into_iter().map(AsRef::as_ref).collect()),
Some(session_ids.into_iter().map(Into::into).collect()),
)
.await;
}

View File

@@ -44,7 +44,7 @@ pub(super) struct LocalMessage {
#[instrument(skip_all, fields(room_id = ?room.room_id()))]
pub(super) async fn send_queued_messages(
timeline_inner: Arc<TimelineInner>,
timeline_inner: TimelineInner,
room: room::Common,
mut msg_receiver: Receiver<LocalMessage>,
) {
@@ -100,7 +100,7 @@ async fn handle_message(
room: room::Common,
send_task: &mut SendMessageTask,
queue: &mut VecDeque<LocalMessage>,
timeline_inner: &Arc<TimelineInner>,
timeline_inner: &TimelineInner,
) {
if queue.is_empty() && send_task.is_idle() {
match Room::from(room) {
@@ -129,7 +129,7 @@ async fn handle_task_ready(
result: SendMessageResult,
send_task: &mut SendMessageTask,
queue: &mut VecDeque<LocalMessage>,
timeline_inner: &Arc<TimelineInner>,
timeline_inner: &TimelineInner,
) {
match result {
SendMessageResult::Success { room } => {
@@ -198,7 +198,7 @@ impl SendMessageTask {
matches!(self, Self::Idle)
}
fn start(&mut self, room: room::Joined, timeline_inner: Arc<TimelineInner>, msg: LocalMessage) {
fn start(&mut self, room: room::Joined, timeline_inner: TimelineInner, msg: LocalMessage) {
debug!("Spawning message-sending task");
let txn_id = msg.txn_id.clone();
let join_handle = spawn(async move {

View File

@@ -14,7 +14,7 @@
#![cfg(not(target_arch = "wasm32"))]
use std::{collections::BTreeSet, io::Cursor, iter};
use std::{io::Cursor, iter};
use assert_matches::assert_matches;
use eyeball_im::VectorDiff;
@@ -101,8 +101,8 @@ async fn retry_message_decryption() {
.inner
.retry_event_decryption_test(
room_id!("!DovneieKSTkdHKpIXy:morpheus.localhost"),
&olm_machine,
Some(iter::once(SESSION_ID).collect()),
olm_machine,
Some(iter::once(SESSION_ID.to_owned()).collect()),
)
.await;
@@ -192,6 +192,9 @@ async fn retry_edit_decryption() {
)
.await;
let items = timeline.inner.items().await;
assert_eq!(items.len(), 3);
let mut keys = decrypt_room_key_export(Cursor::new(SESSION1_KEY), "1234").unwrap();
keys.extend(decrypt_room_key_export(Cursor::new(SESSION2_KEY), "1234").unwrap());
@@ -203,7 +206,7 @@ async fn retry_edit_decryption() {
.inner
.retry_event_decryption_test(
room_id!("!bdsREiCPHyZAPkpXer:morpheus.localhost"),
&olm_machine,
olm_machine,
None,
)
.await;
@@ -306,8 +309,8 @@ async fn retry_edit_and_more() {
.inner
.retry_event_decryption_test(
room_id!("!wFnAUSQbxMcfIMgvNX:flipdot.org"),
&olm_machine,
Some(BTreeSet::from_iter([SESSION_ID])),
olm_machine,
Some(iter::once(SESSION_ID.to_owned()).collect()),
)
.await;
@@ -391,8 +394,8 @@ async fn retry_message_decryption_highlighted() {
.inner
.retry_event_decryption_test(
room_id!("!rYtFvMGENJleNQVJzb:matrix.org"),
&olm_machine,
Some(iter::once(SESSION_ID).collect()),
olm_machine,
Some(iter::once(SESSION_ID.to_owned()).collect()),
)
.await;

View File

@@ -367,6 +367,7 @@ impl TestTimeline {
}
}
#[derive(Clone)]
struct TestRoomDataProvider;
#[async_trait]

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{iter, sync::Arc};
use std::iter;
use matrix_sdk::{event_handler::EventHandler, Client};
use ruma::{
@@ -24,7 +24,7 @@ use tracing::{debug_span, error, trace, Instrument};
use super::inner::TimelineInner;
pub(super) fn handle_room_key_event(
inner: Arc<TimelineInner>,
inner: TimelineInner,
room_id: OwnedRoomId,
) -> impl EventHandler<ToDeviceRoomKeyEvent, (Client,)> {
move |event: ToDeviceRoomKeyEvent, client: Client| {
@@ -40,7 +40,7 @@ pub(super) fn handle_room_key_event(
}
pub(super) fn handle_forwarded_room_key_event(
inner: Arc<TimelineInner>,
inner: TimelineInner,
room_id: OwnedRoomId,
) -> impl EventHandler<ToDeviceForwardedRoomKeyEvent, (Client,)> {
move |event: ToDeviceForwardedRoomKeyEvent, client: Client| {
@@ -57,7 +57,7 @@ pub(super) fn handle_forwarded_room_key_event(
async fn retry_decryption(
client: Client,
inner: Arc<TimelineInner>,
inner: TimelineInner,
room_id: OwnedRoomId,
event_room_id: OwnedRoomId,
session_id: String,
@@ -75,5 +75,5 @@ async fn retry_decryption(
return;
};
inner.retry_event_decryption(&room, Some(iter::once(session_id.as_str()).collect())).await;
inner.retry_event_decryption(&room, Some(iter::once(session_id).collect())).await;
}

View File

@@ -47,7 +47,7 @@ impl RoomExt for room::Common {
}
#[async_trait]
pub(super) trait RoomDataProvider {
pub(super) trait RoomDataProvider: Clone + Send + Sync + 'static {
fn own_user_id(&self) -> &UserId;
async fn profile(&self, user_id: &UserId) -> Option<Profile>;
async fn read_receipts_for_event(&self, event_id: &EventId) -> IndexMap<OwnedUserId, Receipt>;
@@ -115,13 +115,13 @@ impl RoomDataProvider for room::Common {
// object, which is annoying to create for testing and not really needed
#[cfg(feature = "e2e-encryption")]
#[async_trait]
pub(super) trait Decryptor: Copy {
pub(super) trait Decryptor: Clone + Send + Sync + 'static {
async fn decrypt_event_impl(&self, raw: &Raw<AnySyncTimelineEvent>) -> Result<TimelineEvent>;
}
#[cfg(feature = "e2e-encryption")]
#[async_trait]
impl Decryptor for &room::Common {
impl Decryptor for room::Common {
async fn decrypt_event_impl(&self, raw: &Raw<AnySyncTimelineEvent>) -> Result<TimelineEvent> {
self.decrypt_event(raw.cast_ref()).await
}
@@ -129,7 +129,7 @@ impl Decryptor for &room::Common {
#[cfg(all(test, feature = "e2e-encryption"))]
#[async_trait]
impl Decryptor for (&matrix_sdk_base::crypto::OlmMachine, &ruma::RoomId) {
impl Decryptor for (matrix_sdk_base::crypto::OlmMachine, ruma::OwnedRoomId) {
async fn decrypt_event_impl(&self, raw: &Raw<AnySyncTimelineEvent>) -> Result<TimelineEvent> {
let (olm_machine, room_id) = self;
let event = olm_machine.decrypt_room_event(raw.cast_ref(), room_id).await?;