fix(sliding_sync): Add ignore_verification_requests method to SlidingSyncBuilder

This can be used to ignore verification requests in this sliding sync instance, preventing issues found where several sliding sync instances with the same client process events simultaneously and re-process the same verification request events during their initial syncs.
This commit is contained in:
Jorge Martín
2025-02-21 10:19:04 +01:00
parent 2eb2ae7959
commit ae8b54cf45
6 changed files with 157 additions and 74 deletions

View File

@@ -409,6 +409,7 @@ impl BaseClient {
limited: bool,
events: Vec<Raw<AnySyncTimelineEvent>>,
ignore_state_events: bool,
ignore_verification_requests: bool,
prev_batch: Option<String>,
push_rules: &Ruleset,
user_ids: &mut BTreeSet<OwnedUserId>,
@@ -496,13 +497,18 @@ impl BaseClient {
SyncMessageLikeEvent::Original(original_event),
) => match &original_event.content.msgtype {
MessageType::VerificationRequest(_) => {
Box::pin(self.handle_verification_event(e, room.room_id()))
.await?;
if !ignore_verification_requests {
Box::pin(self.handle_verification_event(e, room.room_id()))
.await?;
}
}
_ => (),
},
_ if e.event_type().to_string().starts_with("m.key.verification") => {
Box::pin(self.handle_verification_event(e, room.room_id())).await?;
if !ignore_verification_requests {
Box::pin(self.handle_verification_event(e, room.room_id()))
.await?;
}
}
_ => (),
},
@@ -1039,6 +1045,7 @@ impl BaseClient {
new_info.timeline.limited,
new_info.timeline.events,
false,
false,
new_info.timeline.prev_batch,
&push_rules,
&mut user_ids,
@@ -1134,6 +1141,7 @@ impl BaseClient {
new_info.timeline.limited,
new_info.timeline.events,
false,
false,
new_info.timeline.prev_batch,
&push_rules,
&mut user_ids,

View File

@@ -124,11 +124,14 @@ impl BaseClient {
/// sync.
/// * `previous_events_provider` - Timeline events prior to the current
/// sync.
/// * `ignore_verification_requests` - Whether verification requests found
/// in the response should be ignored.
#[instrument(skip_all, level = "trace")]
pub async fn process_sliding_sync<PEP: PreviousEventsProvider>(
&self,
response: &http::Response,
previous_events_provider: &PEP,
ignore_verification_requests: bool,
) -> Result<SyncResponse> {
let http::Response {
// FIXME not yet supported by sliding sync. see
@@ -187,6 +190,7 @@ impl BaseClient {
&mut room_info_notable_updates,
&mut notifications,
&mut ambiguity_cache,
ignore_verification_requests,
)
.await?;
@@ -358,6 +362,7 @@ impl BaseClient {
room_info_notable_updates: &mut BTreeMap<OwnedRoomId, RoomInfoNotableUpdateReasons>,
notifications: &mut BTreeMap<OwnedRoomId, Vec<Notification>>,
ambiguity_cache: &mut AmbiguityCache,
ignore_verification_requests: bool,
) -> Result<(
RoomInfo,
Option<JoinedRoomUpdate>,
@@ -440,6 +445,7 @@ impl BaseClient {
room_data.limited,
room_data.timeline.clone(),
true,
ignore_verification_requests,
room_data.prev_batch.clone(),
&push_rules,
&mut user_ids,
@@ -915,8 +921,10 @@ mod tests {
}),
);
let sync_response =
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
let sync_response = client
.process_sliding_sync(&response, &(), false)
.await
.expect("Failed to process sync");
// Check it's present in the response.
let room = sync_response.rooms.join.get(room_id).unwrap();
@@ -931,7 +939,10 @@ mod tests {
async fn test_can_process_empty_sliding_sync_response() {
let client = logged_in_base_client(None).await;
let empty_response = http::Response::new("5".to_owned());
client.process_sliding_sync(&empty_response, &()).await.expect("Failed to process sync");
client
.process_sliding_sync(&empty_response, &(), false)
.await
.expect("Failed to process sync");
}
#[async_test]
@@ -945,8 +956,10 @@ mod tests {
let mut room = http::response::Room::new();
room.joined_count = Some(uint!(41));
let response = response_with_room(room_id, room);
let sync_resp =
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
let sync_resp = client
.process_sliding_sync(&response, &(), false)
.await
.expect("Failed to process sync");
// Then the room appears in the client (with the same joined count)
let client_room = client.get_room(room_id).expect("No room found");
@@ -971,8 +984,10 @@ mod tests {
let mut room = http::response::Room::new();
room.name = Some("little room".to_owned());
let response = response_with_room(room_id, room);
let sync_resp =
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
let sync_resp = client
.process_sliding_sync(&response, &(), false)
.await
.expect("Failed to process sync");
// No m.room.name event, no heroes, no members => considered an empty room!
let client_room = client.get_room(room_id).expect("No room found");
@@ -1001,7 +1016,7 @@ mod tests {
set_room_name(&mut room, user_id!("@a:b.c"), "The Name".to_owned());
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
// The name is known.
let client_room = client.get_room(room_id).expect("No room found");
@@ -1023,8 +1038,10 @@ mod tests {
set_room_invited(&mut room, inviter, user_id);
room.name = Some("name from sliding sync response".to_owned());
let response = response_with_room(room_id, room);
let sync_resp =
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
let sync_resp = client
.process_sliding_sync(&response, &(), false)
.await
.expect("Failed to process sync");
// Then the room doesn't have the name in the client.
let client_room = client.get_room(room_id).expect("No room found");
@@ -1060,7 +1077,7 @@ mod tests {
set_room_name(&mut room, user_id!("@a:b.c"), "The Name".to_owned());
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
// The name is known.
let client_room = client.get_room(room_id).expect("No room found");
@@ -1081,7 +1098,7 @@ mod tests {
set_room_knocked(&mut room, &user_id);
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
// The room is knocked.
let client_room = client.get_room(room_id).expect("No room found");
@@ -1101,7 +1118,7 @@ mod tests {
set_room_knocked(&mut room, user_id);
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
// The room is invited since the membership event doesn't belong to the current
// user.
@@ -1133,7 +1150,7 @@ mod tests {
room.invite_state = Some(vec![event]);
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
// The room is marked as invited.
let client_room = client.get_room(room_id).expect("No room found");
@@ -1151,15 +1168,17 @@ mod tests {
let mut room = http::response::Room::new();
set_room_joined(&mut room, user_id);
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Joined);
// And then leave with a `required_state` state event…
let mut room = http::response::Room::new();
set_room_left(&mut room, user_id);
let response = response_with_room(room_id, room);
let sync_resp =
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
let sync_resp = client
.process_sliding_sync(&response, &(), false)
.await
.expect("Failed to process sync");
// The room is left.
assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Left);
@@ -1183,7 +1202,10 @@ mod tests {
let mut room = http::response::Room::new();
set_room_joined(&mut room, user_a_id);
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client
.process_sliding_sync(&response, &(), false)
.await
.expect("Failed to process sync");
assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Joined);
// And then get kicked/banned with a `required_state` state event…
@@ -1195,8 +1217,10 @@ mod tests {
None,
));
let response = response_with_room(room_id, room);
let sync_resp =
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
let sync_resp = client
.process_sliding_sync(&response, &(), false)
.await
.expect("Failed to process sync");
match membership {
MembershipState::Leave => {
@@ -1229,14 +1253,14 @@ mod tests {
let mut room = http::response::Room::new();
set_room_joined(&mut room, user_id);
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Joined);
// And then leave with a `timeline` state event…
let mut room = http::response::Room::new();
set_room_left_as_timeline_event(&mut room, user_id);
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
// The room is NOT left because state events from `timeline` must be IGNORED!
assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Joined);
@@ -1255,7 +1279,7 @@ mod tests {
let mut room = http::response::Room::new();
set_room_joined(&mut room, user_id);
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
// (sanity: state is join)
assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Joined);
@@ -1263,7 +1287,7 @@ mod tests {
let mut room = http::response::Room::new();
set_room_left(&mut room, user_id);
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
// (sanity: state is left)
assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Left);
@@ -1271,7 +1295,7 @@ mod tests {
let mut room = http::response::Room::new();
set_room_invited(&mut room, user_id, user_id);
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
// Then the room is in the invite state
assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Invited);
@@ -1388,7 +1412,7 @@ mod tests {
room
};
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
// Then the room in the client has the avatar
let client_room = client.get_room(room_id).expect("No room found");
@@ -1414,7 +1438,7 @@ mod tests {
room
};
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
// Then the room in the client has the avatar
let client_room = client.get_room(room_id).expect("No room found");
@@ -1428,7 +1452,7 @@ mod tests {
// When I send sliding sync response containing no avatar.
let room = http::response::Room::new();
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
// Then the room in the client still has the avatar
let client_room = client.get_room(room_id).expect("No room found");
@@ -1447,7 +1471,7 @@ mod tests {
room
};
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
// Then the room in the client has no more avatar
let client_room = client.get_room(room_id).expect("No room found");
@@ -1464,7 +1488,7 @@ mod tests {
// When I send sliding sync response containing a room with an avatar
let room = room_with_avatar(mxc_uri!("mxc://e.uk/med1"), user_id);
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
// Then the room in the client has the avatar
let client_room = client.get_room(room_id).expect("No room found");
@@ -1485,8 +1509,10 @@ mod tests {
let mut room = http::response::Room::new();
set_room_invited(&mut room, user_id, user_id);
let response = response_with_room(room_id, room);
let sync_resp =
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
let sync_resp = client
.process_sliding_sync(&response, &(), false)
.await
.expect("Failed to process sync");
// Then the room is added to the client
let client_room = client.get_room(room_id).expect("No room found");
@@ -1509,7 +1535,7 @@ mod tests {
let mut room = room_with_avatar(mxc_uri!("mxc://e.uk/med1"), user_id);
set_room_invited(&mut room, user_id, user_id);
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
// Then the room in the client has the avatar
let client_room = client.get_room(room_id).expect("No room found");
@@ -1532,7 +1558,7 @@ mod tests {
let mut room = room_with_canonical_alias(room_alias_id, user_id);
set_room_invited(&mut room, user_id, user_id);
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
// Then the room in the client has the avatar
let client_room = client.get_room(room_id).expect("No room found");
@@ -1552,7 +1578,7 @@ mod tests {
let mut room = room_with_canonical_alias(room_alias_id, user_id);
room.name = Some("This came from the server".to_owned());
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
// Then the room's name is NOT overridden by the server-computed display name.
let client_room = client.get_room(room_id).expect("No room found");
@@ -1581,8 +1607,10 @@ mod tests {
}),
]);
let response = response_with_room(room_id, room);
let _sync_resp =
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
let _sync_resp = client
.process_sliding_sync(&response, &(), false)
.await
.expect("Failed to process sync");
// Then the room appears in the client.
let client_room = client.get_room(room_id).expect("No room found");
@@ -1631,7 +1659,7 @@ mod tests {
let events = &[event_a, event_b.clone()];
let room = room_with_timeline(events);
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
// Then the room holds the latest event
let client_room = client.get_room(room_id).expect("No room found");
@@ -1675,7 +1703,7 @@ mod tests {
let mut room = room_with_timeline(events);
room.required_state.push(Raw::new(&power_levels).unwrap().cast());
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
// Then the room holds the latest knock state event
let client_room = client.get_room(room_id).expect("No room found");
@@ -1720,7 +1748,7 @@ mod tests {
let mut room = room_with_timeline(events);
room.required_state.push(Raw::new(&power_levels).unwrap().cast());
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
// Then the room doesn't hold the knock state event as the latest event
let client_room = client.get_room(room_id).expect("No room found");
@@ -1747,7 +1775,7 @@ mod tests {
let events = &[join_event];
let room = room_with_timeline(events);
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
// Then the room doesn't hold the join state event as the latest event
let client_room = client.get_room(room_id).expect("No room found");
@@ -1770,7 +1798,7 @@ mod tests {
// When the sliding sync response contains a timeline
let room = room_with_timeline(&[event_a]);
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
// Then the room holds the latest event
let client_room = client.get_room(room_id).expect("No room found");
@@ -1791,7 +1819,7 @@ mod tests {
// When a redaction for that event is received
let room = room_with_timeline(&[redaction]);
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
// Then the room still holds the latest event
let client_room = client.get_room(room_id).expect("No room found");
@@ -2081,7 +2109,7 @@ mod tests {
bump_stamp: Some(42u32.into()),
});
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
// Then the room in the client has the recency stamp
let client_room = client.get_room(room_id).expect("No room found");
@@ -2100,7 +2128,10 @@ mod tests {
bump_stamp: Some(42u32.into()),
});
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client
.process_sliding_sync(&response, &(), false)
.await
.expect("Failed to process sync");
// Then the room in the client has the recency stamp
let client_room = client.get_room(room_id).expect("No room found");
@@ -2113,7 +2144,10 @@ mod tests {
bump_stamp: None,
});
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client
.process_sliding_sync(&response, &(), false)
.await
.expect("Failed to process sync");
// Then the room in the client has the previous recency stamp
let client_room = client.get_room(room_id).expect("No room found");
@@ -2127,7 +2161,10 @@ mod tests {
bump_stamp: Some(153u32.into()),
});
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client
.process_sliding_sync(&response, &(), false)
.await
.expect("Failed to process sync");
// Then the room in the client has the recency stamp
let client_room = client.get_room(room_id).expect("No room found");
@@ -2147,7 +2184,7 @@ mod tests {
bump_stamp: Some(42u32.into()),
});
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
// Then a room info notable update is NOT received, because it's the first time
// the room is seen.
@@ -2164,7 +2201,7 @@ mod tests {
bump_stamp: Some(43u32.into()),
});
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
// Then a room info notable update is received.
assert_matches!(
@@ -2186,7 +2223,7 @@ mod tests {
let room_id = room_id!("!r:e.uk");
let room = http::response::Room::new();
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
// Then a room info notable update is NOT received.
assert_matches!(
@@ -2209,7 +2246,7 @@ mod tests {
timeline: events,
});
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
// Then a room info notable update is received.
assert_matches!(
@@ -2231,7 +2268,7 @@ mod tests {
let room_id = room_id!("!r:e.uk");
let room = http::response::Room::new();
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
// Discard first room info update
let _ = room_info_notable_update_stream.recv().await;
@@ -2254,7 +2291,7 @@ mod tests {
required_state: events,
});
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
// Room was already joined, no MEMBERSHIP update should be triggered here
assert_matches!(
@@ -2281,7 +2318,7 @@ mod tests {
required_state: events,
});
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
// Then a room info notable update is received.
let update = room_info_notable_update_stream.recv().await;
@@ -2304,7 +2341,7 @@ mod tests {
let room_id = room_id!("!r:e.uk");
let room = http::response::Room::new();
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
// Then a room info notable update is NOT received.
assert_matches!(
@@ -2332,7 +2369,7 @@ mod tests {
let mut response = response_with_room(room_id, http::response::Room::new());
response.extensions.account_data.rooms.insert(room_id.to_owned(), room_account_data_events);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
// Then a room info notable update is received.
assert_matches!(
@@ -2344,7 +2381,7 @@ mod tests {
);
// But getting it again won't trigger a new notable update…
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
assert_matches!(
room_info_notable_update_stream.recv().await,
@@ -2367,7 +2404,7 @@ mod tests {
)
.unwrap()];
response.extensions.account_data.rooms.insert(room_id.to_owned(), room_account_data_events);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
assert_matches!(
room_info_notable_update_stream.recv().await,
@@ -2389,7 +2426,7 @@ mod tests {
let mut room_response = http::response::Room::new();
set_room_joined(&mut room_response, user_a_id);
let response = response_with_room(room_id, room_response);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
// The newly created room has no pinned event ids
let room = client.get_room(room_id).unwrap();
@@ -2405,7 +2442,7 @@ mod tests {
None,
));
let response = response_with_room(room_id, room_response);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
let pinned_event_ids = room.pinned_event_ids().unwrap_or_default();
assert_eq!(pinned_event_ids.len(), 1);
@@ -2420,7 +2457,7 @@ mod tests {
None,
));
let response = response_with_room(room_id, room_response);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
let pinned_event_ids = room.pinned_event_ids().unwrap();
assert!(pinned_event_ids.is_empty());
}
@@ -2446,7 +2483,7 @@ mod tests {
.account_data
.global
.push(make_global_account_data_event(DirectEventContent(direct_content)));
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
let room_1 = client.get_room(room_id_1).unwrap();
assert!(room_1.is_direct().await.unwrap());
@@ -2455,7 +2492,7 @@ mod tests {
let mut room_response = http::response::Room::new();
set_room_joined(&mut room_response, user_b_id);
let response = response_with_room(room_id_2, room_response);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
let room_2 = client.get_room(room_id_2).unwrap();
assert!(room_2.is_direct().await.unwrap());
@@ -2596,7 +2633,7 @@ mod tests {
let mut response = response_with_room(room_id, room);
set_direct_with(&mut response, their_id.to_owned(), vec![room_id.to_owned()]);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
}
/// Set this user's membership within this room to new_state
@@ -2609,7 +2646,7 @@ mod tests {
let mut room = http::response::Room::new();
room.required_state.push(make_membership_event(user_id, new_state));
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &(), false).await.expect("Failed to process sync");
}
fn set_direct_with(

View File

@@ -427,6 +427,7 @@ impl NotificationClient {
assign!(http::request::AccountData::default(), { enabled: Some(true) }),
)
.add_list(invites)
.ignore_verification_requests()
.build()
.await?;

View File

@@ -34,6 +34,8 @@ pub struct SlidingSyncBuilder {
network_timeout: Duration,
#[cfg(feature = "e2e-encryption")]
share_pos: bool,
#[cfg(feature = "e2e-encryption")]
ignore_verification_requests: bool,
}
impl SlidingSyncBuilder {
@@ -56,6 +58,7 @@ impl SlidingSyncBuilder {
network_timeout: Duration::from_secs(30),
#[cfg(feature = "e2e-encryption")]
share_pos: false,
ignore_verification_requests: false,
})
}
}
@@ -222,6 +225,18 @@ impl SlidingSyncBuilder {
self
}
/// Ignore any verification requests received by this Sliding Sync instance.
///
/// This is especially useful when several instances will be running
/// simultaneously, as only one of them should handle verification requests,
/// otherwise those would be processed several times and the verification
/// flow will be broken.
#[cfg(feature = "e2e-encryption")]
pub fn ignore_verification_requests(mut self) -> Self {
self.ignore_verification_requests = true;
self
}
/// Build the Sliding Sync.
///
/// If `self.storage_key` is `Some(_)`, load the cached data from cold
@@ -291,6 +306,7 @@ impl SlidingSyncBuilder {
poll_timeout: self.poll_timeout,
network_timeout: self.network_timeout,
ignore_verification_requests: self.ignore_verification_requests,
}))
}
}

View File

@@ -137,7 +137,7 @@ impl Client {
&self,
response: &http::Response,
) -> Result<SyncResponse> {
let response = self.base_client().process_sliding_sync(response, &()).await?;
let response = self.base_client().process_sliding_sync(response, &(), false).await?;
tracing::debug!("done processing on base_client");
self.call_sync_response_handlers(&response).await?;
@@ -168,11 +168,22 @@ pub(crate) struct SlidingSyncResponseProcessor<'a> {
to_device_events: Vec<Raw<AnyToDeviceEvent>>,
response: Option<SyncResponse>,
rooms: &'a BTreeMap<OwnedRoomId, SlidingSyncRoom>,
ignore_verification_requests: bool,
}
impl<'a> SlidingSyncResponseProcessor<'a> {
pub fn new(client: Client, rooms: &'a BTreeMap<OwnedRoomId, SlidingSyncRoom>) -> Self {
Self { client, to_device_events: Vec::new(), response: None, rooms }
pub fn new(
client: Client,
rooms: &'a BTreeMap<OwnedRoomId, SlidingSyncRoom>,
ignore_verification_requests: bool,
) -> Self {
Self {
client,
to_device_events: Vec::new(),
response: None,
rooms,
ignore_verification_requests,
}
}
#[cfg(feature = "e2e-encryption")]
@@ -205,7 +216,11 @@ impl<'a> SlidingSyncResponseProcessor<'a> {
self.response = Some(
self.client
.base_client()
.process_sliding_sync(response, &SlidingSyncPreviousEventsProvider(self.rooms))
.process_sliding_sync(
response,
&SlidingSyncPreviousEventsProvider(self.rooms),
self.ignore_verification_requests,
)
.await?,
);
self.post_process().await

View File

@@ -120,6 +120,9 @@ pub(super) struct SlidingSyncInner {
/// Internal channel used to pass messages between Sliding Sync and other
/// types.
internal_channel: Sender<SlidingSyncInternalMessage>,
/// Ignore any verification requests received in the sync.
ignore_verification_requests: bool,
}
impl SlidingSync {
@@ -285,8 +288,11 @@ impl SlidingSync {
let _sync_lock = self.inner.client.base_client().sync_lock().lock().await;
let rooms = &*self.inner.rooms.read().await;
let mut response_processor =
SlidingSyncResponseProcessor::new(self.inner.client.clone(), rooms);
let mut response_processor = SlidingSyncResponseProcessor::new(
self.inner.client.clone(),
rooms,
self.inner.ignore_verification_requests,
);
#[cfg(feature = "e2e-encryption")]
if self.is_e2ee_enabled() {