fix(sdk): Rewrite decryption retrying to fix invalid index bug

Previously, it was possible for us to use invalid indices when:

- We retried decrypting multiple events at once
- One of them (not the last) was an edit or reaction

This lead to a situation where we would remove the UTD item once
decryption for it was successfully retried, but not account for the
resulting index shift for all later timeline items.
This commit is contained in:
Jonas Platte
2023-02-21 12:49:04 +01:00
committed by Jonas Platte
parent 02a213c1b5
commit b1062a67e0
4 changed files with 167 additions and 47 deletions

View File

@@ -5,6 +5,7 @@ Fo = "Fo"
BA = "BA"
UE = "UE"
Ure = "Ure"
OFO = "OFO"
Ot = "Ot"
# This is the thead html tag, remove this once typos is updated in the github
# action. 1.3.1 seems to work correctly, while 1.11.0 on the CI seems to get

View File

@@ -181,6 +181,7 @@ pub(super) enum TimelineItemPosition {
#[derive(Default)]
pub(super) struct HandleEventResult {
pub(super) item_added: bool,
pub(super) item_removed: bool,
pub(super) items_updated: u16,
}
@@ -319,6 +320,7 @@ impl<'a> TimelineEventHandler<'a> {
// wouldn't normally be visible. Remove it.
trace!("Removing UTD that was successfully retried");
self.items.remove(idx);
self.result.item_removed = true;
}
// TODO: Add event as raw

View File

@@ -35,7 +35,11 @@ use ruma::{
EventId, MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedTransactionId, OwnedUserId, RoomId,
TransactionId, UserId,
};
use tracing::{debug, error, field::debug, info, warn};
use tracing::{
debug, error,
field::{self, debug},
info, info_span, warn, Instrument as _,
};
#[cfg(feature = "e2e-encryption")]
use tracing::{instrument, trace};
@@ -312,6 +316,7 @@ impl<P: ProfileProvider> TimelineInner<P> {
) {
use super::EncryptedMessage;
trace!("Retrying decryption");
let should_retry = |session_id: &str| {
if let Some(session_ids) = &session_ids {
session_ids.contains(session_id)
@@ -320,63 +325,64 @@ impl<P: ProfileProvider> TimelineInner<P> {
}
};
let mut state = self.state.lock().await;
let retry_one = |item: Arc<TimelineItem>| {
async move {
let event_item = item.as_event()?;
trace!("Collecting UTD timeline items");
let utds_for_session: Vec<_> = state
.items
.iter()
.enumerate()
.filter_map(|(idx, item)| {
let event_item = &item.as_event()?;
let utd = event_item.content().as_unable_to_decrypt()?;
match utd {
let session_id = match event_item.content().as_unable_to_decrypt()? {
EncryptedMessage::MegolmV1AesSha2 { session_id, .. }
if should_retry(session_id) =>
{
let EventTimelineItem::Remote(
RemoteEventTimelineItem { event_id, raw, .. },
) = event_item else {
error!("Key for unable-to-decrypt timeline item is not an event ID");
return None;
};
Some((idx, event_id.to_owned(), session_id.to_owned(), raw.clone()))
session_id
}
EncryptedMessage::MegolmV1AesSha2 { .. }
| EncryptedMessage::OlmV1Curve25519AesSha2 { .. }
| EncryptedMessage::Unknown => None,
}
})
.collect();
| EncryptedMessage::Unknown => return None,
};
if utds_for_session.is_empty() {
trace!("Found no events to retry decryption for");
return;
}
tracing::Span::current().record("session_id", session_id);
debug!("Retrying decryption for {} items", utds_for_session.len());
for (idx, event_id, session_id, utd) in utds_for_session {
let event = match olm_machine.decrypt_room_event(utd.cast_ref(), room_id).await {
Ok(ev) => ev,
Err(e) => {
info!(
?event_id,
?session_id,
"Failed to decrypt event after receiving room key: {e}"
);
continue;
let EventTimelineItem::Remote(
RemoteEventTimelineItem { event_id, raw, .. },
) = event_item else {
error!("Key for unable-to-decrypt timeline item is not an event ID");
return None;
};
tracing::Span::current().record("event_id", debug(event_id));
let raw = raw.cast_ref();
match olm_machine.decrypt_room_event(raw, room_id).await {
Ok(event) => {
trace!("Successfully decrypted event that previously failed to decrypt");
Some(event)
}
Err(e) => {
info!("Failed to decrypt event after receiving room key: {e}");
None
}
}
}
.instrument(info_span!(
"retry_one",
session_id = field::Empty,
event_id = field::Empty
))
};
let mut state = self.state.lock().await;
// We loop through all the items in the timeline, if we successfully
// decrypt a UTD item we either replace it or remove it and update
// another one.
let mut idx = 0;
while let Some(item) = state.items.get(idx) {
let Some(event) = retry_one(item.clone()).await else {
idx += 1;
continue;
};
trace!(
?event_id,
?session_id,
"Successfully decrypted event that previously failed to decrypt"
);
handle_remote_event(
let result = handle_remote_event(
event.event.cast(),
event.encryption_info,
TimelineItemPosition::Update(idx),
@@ -384,6 +390,12 @@ impl<P: ProfileProvider> TimelineInner<P> {
&self.profile_provider,
)
.await;
// If the UTD was removed rather than updated, run the loop again
// with the same index.
if !result.item_removed {
idx += 1;
}
}
}

View File

@@ -1,6 +1,6 @@
#![cfg(not(target_arch = "wasm32"))]
use std::{io::Cursor, iter};
use std::{collections::BTreeSet, io::Cursor, iter};
use assert_matches::assert_matches;
use eyeball_im::VectorDiff;
@@ -206,3 +206,108 @@ async fn retry_edit_decryption() {
assert!(msg.is_edited());
assert_eq!(msg.body(), "This is Error");
}
#[async_test]
async fn retry_edit_and_more() {
const DEVICE_ID: &str = "MTEGRRVPEN";
const SENDER_KEY: &str = "NFPM2+ucU3n3sEdbDdwwv48Bsj4AiQ185lGuRFjy+gs";
const SESSION_ID: &str = "SMNh04luorH5E8J3b4XYuOBFp8dldO5njacq0OFO70o";
const SESSION_KEY: &[u8] = b"\
-----BEGIN MEGOLM SESSION DATA-----\n\
AXT1CtOfPgmZRXEk4st3ZwIGShWtZ6iDW0+fwku7AIonAAAACr31UJxAbryf6bH3eF5y+WrOipWmZ6G/59A3\
kuCwntIOrdIC5ShTRWo0qmcWHav2TaFBCx7kWFUs1ryFZjzksCB7sRnVhfXsDUgGGKgj0MOESlPH9Px+IOcV\
B6Dr9rjj2STtapCknlit9FMrOcfQhsV5q+ymZwm1C32Zc3UTEtyxfpXiIVyru4Xsrzti61fDIiWFj7Mie4Wn\
7YQ8SQ1Q9CZUnOCzflP4Yw+5cXHwMRDcz7/kIPzczCYILLp89G//Uh8QN25tN+oCPhBmTxMxoHhabEwkZ/rK\
D1T+jXDK/dClfXqDXxjjAhQpcUI0soWeAGEq8nMEE5J2D/42AOpKVYqfq2GPiGoPQk3suy4GtDJQlXZaFuz/\
l4fmHwB1CJCxMUlgpRJ4PhRHAfJn9zfiskM19/dj/G9foGt8KQBRnnbxDVM4eYuoMJZn7SaQfXFmybBTY+Z/\
bYGg9FUKn/LyjYc8jqbyXCnddzCHB+YENwEOP3WQQrZccyvjuTv5oB/TqK4yS90phIvkLlqEyJXKxxPnzAvV\
CArjU7naYXMeVieMqcntbeaXutLftLUIF7KUUCPu357sTKjaAp8z98YfPZBctrHRrx7Oo2t6Wtph0A5N/NwA\
dSN2ceRzRzkoupc4FCxvH6o6PmmtD9DfxtZsk+HA+8NQhgFpvm/VYalikckW+wGFxB4nn1nVViS4GN5n8fc/\
Ug\n\
-----END MEGOLM SESSION DATA-----";
fn encrypted_message(ciphertext: &str) -> RoomEncryptedEventContent {
RoomEncryptedEventContent::new(
EncryptedEventScheme::MegolmV1AesSha2(
MegolmV1AesSha2ContentInit {
ciphertext: ciphertext.into(),
sender_key: SENDER_KEY.into(),
device_id: DEVICE_ID.into(),
session_id: SESSION_ID.into(),
}
.into(),
),
None,
)
}
let timeline = TestTimeline::new();
timeline
.handle_live_message_event(
&BOB,
encrypted_message(
"AwgDEoABQsTrPTYDh22PTmfODR9EucX3qLl3buDcahHPjKJA8QIM+wW0s+e08Zi7/JbLdnZL1VL\
jO47HcRhxDTyHZPXPg8wd1l0Qb3irjnCnS7LFAc98+ko18CFJUGNeRZZwzGiorKK5VLMv0WQZI8\
mBZdKIaqDTUBFvcvbn2gQaWtUipQdJQRKyv2h0AWveVkv75lp5hRb7jolCi08oMX8cM+V3Zzyi7\
mlPAzZjDz0PaRbQwfbMTTHkcL7TZybBi4vLX4f5ZR2Iiysc7gw",
),
)
.await;
let event_id =
timeline.inner.items().await[1].as_event().unwrap().event_id().unwrap().to_owned();
let msg2 = encrypted_message(
"AwgEErABt7svMEHDYJTjCQEHypR21l34f9IZLNyFaAbI+EiCIN7C8X5iKmkzuYSmGUodyGKbFRYrW9l5dLj\
35xIRli3SZ6duZpmBI7D4pBGPj2T2Jkc/I9kd/I4EhpvV2emDTioB7jwUfFoATfdA0z/6ciTmU73PStKHZM\
+WYNxCWZERsCQBtiINzC80FymwLjh4nBhnyW0nlMihGGasakn+3wKQUY0HkVoFM8TXQlCXl1RM2oxL9nn0C\
dRu2LPArXc5K/1GBSyfluSrdQuA9DciLwVHJB9NwvbZ/7flIkaOC7ahahmk2ws+QeSz8MmHt+9QityK3ZUB\
4uEzsQ0",
);
timeline
.handle_live_message_event(
&BOB,
assign!(msg2, { relates_to: Some(Relation::Replacement(Replacement::new(event_id))) }),
)
.await;
timeline
.handle_live_message_event(
&BOB,
encrypted_message(
"AwgFEoABUAwzBLYStHEa1RaZtojePQ6sue9terXNMFufeLKci/UcpOpZC9o3lDxp9rxlNjk4Ii+\
fkOeSClib/qxt+wLszeQZVa04bRr6byK1dOhlptvAPjUCcEsaHyMMR1AnjT2vmFlJRGviwN6cvQ\
2r/fEvAW/9QB+N6fX4g9729bt5ftXRqa5QI7NA351RNUveRHxVvx+2x0WJArQjYGRk7tMS2rUto\
IYt2ZY17nE1UJjN7M87STnCF9c9qy4aGNqIpeVIht6XbtgD7gQ",
),
)
.await;
assert_eq!(timeline.inner.items().await.len(), 4);
let olm_machine = OlmMachine::new(user_id!("@jptest:matrix.org"), DEVICE_ID.into()).await;
let keys = decrypt_room_key_export(Cursor::new(SESSION_KEY), "testing").unwrap();
olm_machine.import_room_keys(keys, false, |_, _| {}).await.unwrap();
timeline
.inner
.retry_event_decryption(
room_id!("!wFnAUSQbxMcfIMgvNX:flipdot.org"),
&olm_machine,
Some(BTreeSet::from_iter([SESSION_ID])),
)
.await;
let timeline_items = timeline.inner.items().await;
assert_eq!(timeline_items.len(), 3);
assert!(timeline_items[0].is_day_divider());
assert_eq!(
timeline_items[1].as_event().unwrap().content().as_message().unwrap().body(),
"edited"
);
assert_eq!(
timeline_items[2].as_event().unwrap().content().as_message().unwrap().body(),
"Another message"
);
}