refactor(timeline): Move finding retry indices into DecryptionRetryTask

This commit is contained in:
Andy Balaam
2025-02-24 16:08:46 +00:00
committed by Andy Balaam
parent a0282ec71b
commit c74ecff3f0
2 changed files with 46 additions and 43 deletions

View File

@@ -16,11 +16,7 @@ use std::{collections::BTreeSet, sync::Arc};
use matrix_sdk::deserialized_responses::TimelineEventKind as SdkTimelineEventKind;
use tokio::sync::RwLock;
use tracing::{
error,
field::{self, debug},
info, info_span, Instrument as _,
};
use tracing::{debug, error, field, info, info_span, Instrument as _};
use crate::timeline::{
controller::{TimelineSettings, TimelineState},
@@ -50,6 +46,48 @@ impl<P: RoomDataProvider> DecryptionRetryTask<P> {
&self,
decryptor: impl Decryptor,
session_ids: Option<BTreeSet<String>>,
) {
let state = self.state.clone().read_owned().await;
let should_retry = |session_id: &str| {
if let Some(session_ids) = &session_ids {
session_ids.contains(session_id)
} else {
true
}
};
let retry_indices: Vec<_> = state
.items
.iter()
.enumerate()
.filter_map(|(idx, item)| match item.as_event()?.content().as_unable_to_decrypt()? {
EncryptedMessage::MegolmV1AesSha2 { session_id, .. }
if should_retry(session_id) =>
{
Some(idx)
}
EncryptedMessage::MegolmV1AesSha2 { .. }
| EncryptedMessage::OlmV1Curve25519AesSha2 { .. }
| EncryptedMessage::Unknown => None,
})
.collect();
if retry_indices.is_empty() {
return;
}
drop(state);
debug!("Retrying decryption");
self.decrypt_by_index(decryptor, session_ids, retry_indices).await;
}
async fn decrypt_by_index(
&self,
decryptor: impl Decryptor,
session_ids: Option<BTreeSet<String>>,
retry_indices: Vec<usize>,
) {
let should_retry = move |session_id: &str| {
@@ -93,7 +131,8 @@ impl<P: RoomDataProvider> DecryptionRetryTask<P> {
return None;
};
tracing::Span::current().record("event_id", debug(&remote_event.event_id));
tracing::Span::current()
.record("event_id", field::debug(&remote_event.event_id));
let Some(original_json) = &remote_event.original_json else {
error!("UTD item must contain original JSON");

View File

@@ -1062,49 +1062,13 @@ impl<P: RoomDataProvider> TimelineController<P> {
decryptor: impl Decryptor,
session_ids: Option<BTreeSet<String>>,
) {
use super::EncryptedMessage;
let state = self.state.clone().read_owned().await;
let should_retry = |session_id: &str| {
if let Some(session_ids) = &session_ids {
session_ids.contains(session_id)
} else {
true
}
};
let retry_indices: Vec<_> = state
.items
.iter()
.enumerate()
.filter_map(|(idx, item)| match item.as_event()?.content().as_unable_to_decrypt()? {
EncryptedMessage::MegolmV1AesSha2 { session_id, .. }
if should_retry(session_id) =>
{
Some(idx)
}
EncryptedMessage::MegolmV1AesSha2 { .. }
| EncryptedMessage::OlmV1Curve25519AesSha2 { .. }
| EncryptedMessage::Unknown => None,
})
.collect();
if retry_indices.is_empty() {
return;
}
drop(state);
debug!("Retrying decryption");
let decryption_retry_task = DecryptionRetryTask::new(
self.state.clone(),
self.settings.clone(),
self.room_data_provider.clone(),
);
decryption_retry_task.decrypt(decryptor, session_ids, retry_indices).await;
decryption_retry_task.decrypt(decryptor, session_ids).await;
}
pub(super) async fn set_sender_profiles_pending(&self) {