fix: Remove support for MSC3575.

This commit is contained in:
Ivan Enderlin
2025-01-14 15:01:34 +01:00
parent 9de6d28270
commit 38e28643f1
19 changed files with 146 additions and 1224 deletions

View File

@@ -1816,7 +1816,6 @@ impl MediaFileHandle {
#[derive(Clone, uniffi::Enum)]
pub enum SlidingSyncVersion {
None,
Proxy { url: String },
Native,
}
@@ -1824,7 +1823,6 @@ impl From<SdkSlidingSyncVersion> for SlidingSyncVersion {
fn from(value: SdkSlidingSyncVersion) -> Self {
match value {
SdkSlidingSyncVersion::None => Self::None,
SdkSlidingSyncVersion::Proxy { url } => Self::Proxy { url: url.to_string() },
SdkSlidingSyncVersion::Native => Self::Native,
}
}
@@ -1836,9 +1834,6 @@ impl TryFrom<SlidingSyncVersion> for SdkSlidingSyncVersion {
fn try_from(value: SlidingSyncVersion) -> Result<Self, Self::Error> {
Ok(match value {
SlidingSyncVersion::None => Self::None,
SlidingSyncVersion::Proxy { url } => Self::Proxy {
url: Url::parse(&url).map_err(|e| ClientError::Generic { msg: e.to_string() })?,
},
SlidingSyncVersion::Native => Self::Native,
})
}

View File

@@ -20,7 +20,6 @@ use matrix_sdk::{
};
use ruma::api::error::{DeserializationError, FromHttpResponseError};
use tracing::{debug, error};
use url::Url;
use zeroize::Zeroizing;
use super::{client::Client, RUNTIME};
@@ -604,22 +603,10 @@ impl ClientBuilder {
inner_builder = inner_builder
.sliding_sync_version_builder(MatrixSlidingSyncVersionBuilder::None)
}
SlidingSyncVersionBuilder::Proxy { url } => {
inner_builder = inner_builder.sliding_sync_version_builder(
MatrixSlidingSyncVersionBuilder::Proxy {
url: Url::parse(&url)
.map_err(|e| ClientBuildError::Generic { message: e.to_string() })?,
},
)
}
SlidingSyncVersionBuilder::Native => {
inner_builder = inner_builder
.sliding_sync_version_builder(MatrixSlidingSyncVersionBuilder::Native)
}
SlidingSyncVersionBuilder::DiscoverProxy => {
inner_builder = inner_builder
.sliding_sync_version_builder(MatrixSlidingSyncVersionBuilder::DiscoverProxy)
}
SlidingSyncVersionBuilder::DiscoverNative => {
inner_builder = inner_builder
.sliding_sync_version_builder(MatrixSlidingSyncVersionBuilder::DiscoverNative)
@@ -748,8 +735,6 @@ pub struct RequestConfig {
#[derive(Clone, uniffi::Enum)]
pub enum SlidingSyncVersionBuilder {
None,
Proxy { url: String },
Native,
DiscoverProxy,
DiscoverNative,
}

View File

@@ -65,7 +65,6 @@ ruma = { workspace = true, features = [
"canonical-json",
"unstable-msc2867",
"unstable-msc3381",
"unstable-msc3575",
"unstable-msc4186",
"rand",
] }

View File

@@ -110,8 +110,8 @@
//! events ids in both sets. As a matter of fact, we have to manually handle
//! this edge case here. I hope that having an event database will help avoid
//! this kind of workaround here later.
//! - In addition to that, and as noted in the timeline code, it seems that the
//! sliding-sync proxy could return the same event multiple times in a sync
//! - In addition to that, and as noted in the timeline code, it seems that
//! sliding sync could return the same event multiple times in a sync
//! timeline, leading to incorrect results. We have to take that into account
//! by resetting the read counts *every* time we see an event that was the
//! target of the latest active read receipt.
@@ -245,10 +245,9 @@ impl RoomReadReceipts {
let mut counting_receipts = false;
for event in events {
// The sliding sync proxy sometimes sends the same event multiple times, so it
// can be at the beginning and end of a batch, for instance. In that
// case, just reset every time we see the event matching the
// receipt. NOTE: SS proxy workaround.
// Sliding sync sometimes sends the same event multiple times, so it can be at
// the beginning and end of a batch, for instance. In that case, just reset
// every time we see the event matching the receipt.
if let Some(event_id) = event.event_id() {
if event_id == receipt_event_id {
// Bingo! Switch over to the counting state, after resetting the

View File

@@ -12,38 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! HTTP types for MSC4186 or MSC3585.
//! HTTP types for MSC4186.
//!
//! This module provides unified namings for types from MSC3575 and
//! MSC4186.
//! It's a re-export of the type from Ruma.
/// HTTP types from MSC3575, renamed to match the MSC4186 namings.
pub mod msc3575 {
use ruma::api::client::sync::sync_events::v4;
pub use v4::{Request, Response};
/// HTTP types related to a `Request`.
pub mod request {
pub use super::v4::{
AccountDataConfig as AccountData, ExtensionsConfig as Extensions,
ReceiptsConfig as Receipts, RoomDetailsConfig as RoomDetails, RoomSubscription,
SyncRequestList as List, SyncRequestListFilters as ListFilters,
ToDeviceConfig as ToDevice, TypingConfig as Typing,
};
}
/// HTTP types related to a `Response`.
pub mod response {
pub use super::v4::{
AccountData, Extensions, Receipts, SlidingSyncRoom as Room,
SlidingSyncRoomHero as RoomHero, SyncList as List, ToDevice, Typing,
};
}
}
/// HTTP types from MSC4186.
pub mod msc4186 {
pub use ruma::api::client::sync::sync_events::v5::*;
}
pub use msc4186::*;
pub use ruma::api::client::sync::sync_events::v5::*;

View File

@@ -12,13 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.
//! Extend `BaseClient` with capabilities to handle MSC3575.
//! Extend `BaseClient` with capabilities to handle MSC4186.
pub mod http;
use std::collections::BTreeMap;
#[cfg(feature = "e2e-encryption")]
use std::ops::Deref;
use std::{borrow::Cow, collections::BTreeMap};
#[cfg(feature = "e2e-encryption")]
use matrix_sdk_common::deserialized_responses::TimelineEvent;
@@ -29,11 +29,11 @@ use ruma::{
AnySyncStateEvent,
},
serde::Raw,
JsOption, OwnedRoomId, RoomId, UInt, UserId,
JsOption, OwnedRoomId, RoomId, UserId,
};
#[cfg(feature = "e2e-encryption")]
use ruma::{api::client::sync::sync_events::v5, events::AnyToDeviceEvent, events::StateEventType};
use tracing::{debug, error, instrument, trace, warn};
use tracing::{instrument, trace, warn};
use super::BaseClient;
use crate::{
@@ -123,14 +123,11 @@ impl BaseClient {
/// sync.
/// * `previous_events_provider` - Timeline events prior to the current
/// sync.
/// * `with_msc4186` - Whether the `response` comes from simplified sliding
/// sync (MSC4186) or sliding sync (MSC3575).
#[instrument(skip_all, level = "trace")]
pub async fn process_sliding_sync<PEP: PreviousEventsProvider>(
&self,
response: &http::Response,
previous_events_provider: &PEP,
with_msc4186: bool,
) -> Result<SyncResponse> {
let http::Response {
// FIXME not yet supported by sliding sync. see
@@ -189,7 +186,6 @@ impl BaseClient {
&mut room_info_notable_updates,
&mut notifications,
&mut ambiguity_cache,
with_msc4186,
)
.await?;
@@ -361,7 +357,6 @@ impl BaseClient {
room_info_notable_updates: &mut BTreeMap<OwnedRoomId, RoomInfoNotableUpdateReasons>,
notifications: &mut BTreeMap<OwnedRoomId, Vec<Notification>>,
ambiguity_cache: &mut AmbiguityCache,
with_msc4186: bool,
) -> Result<(
RoomInfo,
Option<JoinedRoomUpdate>,
@@ -369,11 +364,6 @@ impl BaseClient {
Option<InvitedRoom>,
Option<KnockedRoom>,
)> {
// This method may change `room_data` (see the terrible hack describes below)
// with `timestamp` and `invite_state. We don't want to change the `room_data`
// from outside this method, hence `Cow` is perfectly suited here.
let mut room_data = Cow::Borrowed(room_data);
let (raw_state_events, state_events): (Vec<_>, Vec<_>) = {
// Read state events from the `required_state` field.
let state_events = Self::deserialize_state_events(&room_data.required_state);
@@ -388,40 +378,6 @@ impl BaseClient {
// Find or create the room in the store
let is_new_room = !store.room_exists(room_id);
// Terrible hack I (Ivan) am ashamed to write, but there is a bug in the sliding
// sync proxy… When a user receive an invite to a room, the room has no
// `timestamp` despites having `m.room.create` in `bump_event_types`. The result
// of this is that an invite cannot be sorted. This horrible hack will fix that.
//
// The SDK manipulates MSC4186 `Request` and `Response` though. In MSC4186,
// `bump_stamp` replaces `timestamp`, which does NOT represent a time! This
// hack must really, only, apply to the proxy, so to MSC3575 strictly (hence
// the `with_msc4186` argument).
//
// The proxy uses the `origin_server_ts` event's value to fill the `timestamp`
// room's value (which is a bad idea[^1]). If `timestamp` is `None`, let's find
// out which event in `invite_state` can be a candidate: we take the latest one.
//
// [^1]: using `origin_server_ts` for `timestamp` is a bad idea because
// this value can be forged by a malicious user. Anyway, that's how it works
// in the proxy. MSC4186 has another mechanism which fixes the problem.
if !with_msc4186 && room_data.bump_stamp.is_none() {
if let Some(invite_state) = &room_data.invite_state {
room_data.to_mut().bump_stamp =
invite_state.iter().rev().find_map(|invite_state| {
invite_state.get_field::<UInt>("origin_server_ts").ok().flatten()
});
debug!(
?room_id,
timestamp = ?room_data.bump_stamp,
"Received a room with no `timestamp`; looked for a replacement value"
);
} else {
error!(?room_id, "Received a room with no `timestamp` and no `invite_state`");
}
}
let stripped_state: Option<Vec<(Raw<AnyStrippedStateEvent>, AnyStrippedStateEvent)>> =
room_data
.invite_state
@@ -471,7 +427,7 @@ impl BaseClient {
process_room_properties(
room_id,
room_data.as_ref(),
room_data,
&mut room_info,
is_new_room,
room_info_notable_updates,
@@ -958,10 +914,8 @@ mod tests {
}),
);
let sync_response = client
.process_sliding_sync(&response, &(), true)
.await
.expect("Failed to process sync");
let sync_response =
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// Check it's present in the response.
let room = sync_response.rooms.join.get(room_id).unwrap();
@@ -976,10 +930,7 @@ mod tests {
async fn test_can_process_empty_sliding_sync_response() {
let client = logged_in_base_client(None).await;
let empty_response = http::Response::new("5".to_owned());
client
.process_sliding_sync(&empty_response, &(), true)
.await
.expect("Failed to process sync");
client.process_sliding_sync(&empty_response, &()).await.expect("Failed to process sync");
}
#[async_test]
@@ -993,10 +944,8 @@ mod tests {
let mut room = http::response::Room::new();
room.joined_count = Some(uint!(41));
let response = response_with_room(room_id, room);
let sync_resp = client
.process_sliding_sync(&response, &(), true)
.await
.expect("Failed to process sync");
let sync_resp =
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// Then the room appears in the client (with the same joined count)
let client_room = client.get_room(room_id).expect("No room found");
@@ -1021,10 +970,8 @@ mod tests {
let mut room = http::response::Room::new();
room.name = Some("little room".to_owned());
let response = response_with_room(room_id, room);
let sync_resp = client
.process_sliding_sync(&response, &(), true)
.await
.expect("Failed to process sync");
let sync_resp =
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// No m.room.name event, no heroes, no members => considered an empty room!
let client_room = client.get_room(room_id).expect("No room found");
@@ -1053,7 +1000,7 @@ mod tests {
set_room_name(&mut room, user_id!("@a:b.c"), "The Name".to_owned());
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// The name is known.
let client_room = client.get_room(room_id).expect("No room found");
@@ -1075,10 +1022,8 @@ mod tests {
set_room_invited(&mut room, inviter, user_id);
room.name = Some("name from sliding sync response".to_owned());
let response = response_with_room(room_id, room);
let sync_resp = client
.process_sliding_sync(&response, &(), true)
.await
.expect("Failed to process sync");
let sync_resp =
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// Then the room doesn't have the name in the client.
let client_room = client.get_room(room_id).expect("No room found");
@@ -1114,7 +1059,7 @@ mod tests {
set_room_name(&mut room, user_id!("@a:b.c"), "The Name".to_owned());
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// The name is known.
let client_room = client.get_room(room_id).expect("No room found");
@@ -1135,7 +1080,7 @@ mod tests {
set_room_knocked(&mut room, &user_id);
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// The room is knocked.
let client_room = client.get_room(room_id).expect("No room found");
@@ -1155,7 +1100,7 @@ mod tests {
set_room_knocked(&mut room, user_id);
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// The room is invited since the membership event doesn't belong to the current
// user.
@@ -1187,7 +1132,7 @@ mod tests {
room.invite_state = Some(vec![event]);
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// The room is marked as invited.
let client_room = client.get_room(room_id).expect("No room found");
@@ -1205,17 +1150,15 @@ mod tests {
let mut room = http::response::Room::new();
set_room_joined(&mut room, user_id);
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Joined);
// And then leave with a `required_state` state event…
let mut room = http::response::Room::new();
set_room_left(&mut room, user_id);
let response = response_with_room(room_id, room);
let sync_resp = client
.process_sliding_sync(&response, &(), true)
.await
.expect("Failed to process sync");
let sync_resp =
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// The room is left.
assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Left);
@@ -1239,10 +1182,7 @@ mod tests {
let mut room = http::response::Room::new();
set_room_joined(&mut room, user_a_id);
let response = response_with_room(room_id, room);
client
.process_sliding_sync(&response, &(), true)
.await
.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Joined);
// And then get kicked/banned with a `required_state` state event…
@@ -1254,10 +1194,8 @@ mod tests {
None,
));
let response = response_with_room(room_id, room);
let sync_resp = client
.process_sliding_sync(&response, &(), true)
.await
.expect("Failed to process sync");
let sync_resp =
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
match membership {
MembershipState::Leave => {
@@ -1290,14 +1228,14 @@ mod tests {
let mut room = http::response::Room::new();
set_room_joined(&mut room, user_id);
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Joined);
// And then leave with a `timeline` state event…
let mut room = http::response::Room::new();
set_room_left_as_timeline_event(&mut room, user_id);
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// The room is NOT left because state events from `timeline` must be IGNORED!
assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Joined);
@@ -1316,7 +1254,7 @@ mod tests {
let mut room = http::response::Room::new();
set_room_joined(&mut room, user_id);
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// (sanity: state is join)
assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Joined);
@@ -1324,7 +1262,7 @@ mod tests {
let mut room = http::response::Room::new();
set_room_left(&mut room, user_id);
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// (sanity: state is left)
assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Left);
@@ -1332,7 +1270,7 @@ mod tests {
let mut room = http::response::Room::new();
set_room_invited(&mut room, user_id, user_id);
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// Then the room is in the invite state
assert_eq!(client.get_room(room_id).unwrap().state(), RoomState::Invited);
@@ -1449,7 +1387,7 @@ mod tests {
room
};
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// Then the room in the client has the avatar
let client_room = client.get_room(room_id).expect("No room found");
@@ -1475,7 +1413,7 @@ mod tests {
room
};
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// Then the room in the client has the avatar
let client_room = client.get_room(room_id).expect("No room found");
@@ -1489,7 +1427,7 @@ mod tests {
// When I send sliding sync response containing no avatar.
let room = http::response::Room::new();
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// Then the room in the client still has the avatar
let client_room = client.get_room(room_id).expect("No room found");
@@ -1508,7 +1446,7 @@ mod tests {
room
};
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// Then the room in the client has no more avatar
let client_room = client.get_room(room_id).expect("No room found");
@@ -1525,7 +1463,7 @@ mod tests {
// When I send sliding sync response containing a room with an avatar
let room = room_with_avatar(mxc_uri!("mxc://e.uk/med1"), user_id);
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// Then the room in the client has the avatar
let client_room = client.get_room(room_id).expect("No room found");
@@ -1546,10 +1484,8 @@ mod tests {
let mut room = http::response::Room::new();
set_room_invited(&mut room, user_id, user_id);
let response = response_with_room(room_id, room);
let sync_resp = client
.process_sliding_sync(&response, &(), true)
.await
.expect("Failed to process sync");
let sync_resp =
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// Then the room is added to the client
let client_room = client.get_room(room_id).expect("No room found");
@@ -1561,76 +1497,6 @@ mod tests {
assert!(!sync_resp.rooms.join.contains_key(room_id));
}
#[async_test]
async fn test_invitation_room_receive_a_default_timestamp_on_not_simplified_sliding_sync() {
const NOT_MSC4186: bool = false;
// Given a logged-in client
let client = logged_in_base_client(None).await;
let room_id = room_id!("!r:e.uk");
let user_id = user_id!("@u:e.uk");
// When I send sliding sync response containing an invited room with no
// `origin_server_ts` from any events in `invite_state`
let mut room = http::response::Room::new();
set_room_invited(&mut room, user_id, user_id);
let response = response_with_room(room_id, room);
let _sync_resp = client
.process_sliding_sync(&response, &(), NOT_MSC4186)
.await
.expect("Failed to process sync");
// Then the room has no recency stamp
let client_room = client.get_room(room_id).expect("No room found");
assert!(client_room.recency_stamp().is_none());
// When I send sliding sync response containing an invited room with an
// `origin_server_ts` from an event in `invite_state`
let mut room = http::response::Room::new();
set_room_invited(&mut room, user_id, user_id);
if let Some(invite_state) = room.invite_state.as_mut() {
invite_state.push(
Raw::new(&json!({
"type": "m.room.member",
"sender": user_id,
"content": {
"is_direct": true,
"membership": "invite",
},
"state_key": user_id,
// The `origin_server_ts` is in the middle of other events.
"origin_server_ts": 123456789,
}))
.expect("Failed to make raw event")
.cast(),
);
invite_state.push(
Raw::new(&json!({
"type": "m.room.member",
"sender": user_id,
"content": {
"is_direct": true,
"membership": "invite",
},
"state_key": user_id,
}))
.expect("Failed to make raw event")
.cast(),
);
}
let response = response_with_room(room_id, room);
let _sync_resp = client
.process_sliding_sync(&response, &(), NOT_MSC4186)
.await
.expect("Failed to process sync");
// Then the room has a recency stamp!
let client_room = client.get_room(room_id).expect("No room found");
assert_eq!(client_room.recency_stamp(), Some(123456789));
}
#[async_test]
async fn test_avatar_is_found_in_invitation_room_when_processing_sliding_sync_response() {
// Given a logged-in client
@@ -1642,7 +1508,7 @@ mod tests {
let mut room = room_with_avatar(mxc_uri!("mxc://e.uk/med1"), user_id);
set_room_invited(&mut room, user_id, user_id);
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// Then the room in the client has the avatar
let client_room = client.get_room(room_id).expect("No room found");
@@ -1665,7 +1531,7 @@ mod tests {
let mut room = room_with_canonical_alias(room_alias_id, user_id);
set_room_invited(&mut room, user_id, user_id);
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// Then the room in the client has the avatar
let client_room = client.get_room(room_id).expect("No room found");
@@ -1685,7 +1551,7 @@ mod tests {
let mut room = room_with_canonical_alias(room_alias_id, user_id);
room.name = Some("This came from the server".to_owned());
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// Then the room's name is NOT overridden by the server-computed display name.
let client_room = client.get_room(room_id).expect("No room found");
@@ -1714,10 +1580,8 @@ mod tests {
}),
]);
let response = response_with_room(room_id, room);
let _sync_resp = client
.process_sliding_sync(&response, &(), true)
.await
.expect("Failed to process sync");
let _sync_resp =
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// Then the room appears in the client.
let client_room = client.get_room(room_id).expect("No room found");
@@ -1766,7 +1630,7 @@ mod tests {
let events = &[event_a, event_b.clone()];
let room = room_with_timeline(events);
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// Then the room holds the latest event
let client_room = client.get_room(room_id).expect("No room found");
@@ -1810,7 +1674,7 @@ mod tests {
let mut room = room_with_timeline(events);
room.required_state.push(Raw::new(&power_levels).unwrap().cast());
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// Then the room holds the latest knock state event
let client_room = client.get_room(room_id).expect("No room found");
@@ -1855,7 +1719,7 @@ mod tests {
let mut room = room_with_timeline(events);
room.required_state.push(Raw::new(&power_levels).unwrap().cast());
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// Then the room doesn't hold the knock state event as the latest event
let client_room = client.get_room(room_id).expect("No room found");
@@ -1882,7 +1746,7 @@ mod tests {
let events = &[join_event];
let room = room_with_timeline(events);
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// Then the room doesn't hold the join state event as the latest event
let client_room = client.get_room(room_id).expect("No room found");
@@ -1905,7 +1769,7 @@ mod tests {
// When the sliding sync response contains a timeline
let room = room_with_timeline(&[event_a]);
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// Then the room holds the latest event
let client_room = client.get_room(room_id).expect("No room found");
@@ -1926,7 +1790,7 @@ mod tests {
// When a redaction for that event is received
let room = room_with_timeline(&[redaction]);
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// Then the room still holds the latest event
let client_room = client.get_room(room_id).expect("No room found");
@@ -2216,7 +2080,7 @@ mod tests {
bump_stamp: Some(42u32.into()),
});
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// Then the room in the client has the recency stamp
let client_room = client.get_room(room_id).expect("No room found");
@@ -2235,10 +2099,7 @@ mod tests {
bump_stamp: Some(42u32.into()),
});
let response = response_with_room(room_id, room);
client
.process_sliding_sync(&response, &(), true)
.await
.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// Then the room in the client has the recency stamp
let client_room = client.get_room(room_id).expect("No room found");
@@ -2251,10 +2112,7 @@ mod tests {
bump_stamp: None,
});
let response = response_with_room(room_id, room);
client
.process_sliding_sync(&response, &(), true)
.await
.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// Then the room in the client has the previous recency stamp
let client_room = client.get_room(room_id).expect("No room found");
@@ -2268,10 +2126,7 @@ mod tests {
bump_stamp: Some(153u32.into()),
});
let response = response_with_room(room_id, room);
client
.process_sliding_sync(&response, &(), true)
.await
.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// Then the room in the client has the recency stamp
let client_room = client.get_room(room_id).expect("No room found");
@@ -2291,7 +2146,7 @@ mod tests {
bump_stamp: Some(42u32.into()),
});
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// Then a room info notable update is NOT received, because it's the first time
// the room is seen.
@@ -2308,7 +2163,7 @@ mod tests {
bump_stamp: Some(43u32.into()),
});
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// Then a room info notable update is received.
assert_matches!(
@@ -2330,7 +2185,7 @@ mod tests {
let room_id = room_id!("!r:e.uk");
let room = http::response::Room::new();
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// Then a room info notable update is NOT received.
assert_matches!(
@@ -2353,7 +2208,7 @@ mod tests {
timeline: events,
});
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// Then a room info notable update is received.
assert_matches!(
@@ -2375,7 +2230,7 @@ mod tests {
let room_id = room_id!("!r:e.uk");
let room = http::response::Room::new();
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// Discard first room info update
let _ = room_info_notable_update_stream.recv().await;
@@ -2398,7 +2253,7 @@ mod tests {
required_state: events,
});
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// Room was already joined, no MEMBERSHIP update should be triggered here
assert_matches!(
@@ -2425,7 +2280,7 @@ mod tests {
required_state: events,
});
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// Then a room info notable update is received.
let update = room_info_notable_update_stream.recv().await;
@@ -2448,7 +2303,7 @@ mod tests {
let room_id = room_id!("!r:e.uk");
let room = http::response::Room::new();
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// Then a room info notable update is NOT received.
assert_matches!(
@@ -2476,7 +2331,7 @@ mod tests {
let mut response = response_with_room(room_id, http::response::Room::new());
response.extensions.account_data.rooms.insert(room_id.to_owned(), room_account_data_events);
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// Then a room info notable update is received.
assert_matches!(
@@ -2488,7 +2343,7 @@ mod tests {
);
// But getting it again won't trigger a new notable update…
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
assert_matches!(
room_info_notable_update_stream.recv().await,
@@ -2511,7 +2366,7 @@ mod tests {
)
.unwrap()];
response.extensions.account_data.rooms.insert(room_id.to_owned(), room_account_data_events);
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
assert_matches!(
room_info_notable_update_stream.recv().await,
@@ -2533,7 +2388,7 @@ mod tests {
let mut room_response = http::response::Room::new();
set_room_joined(&mut room_response, user_a_id);
let response = response_with_room(room_id, room_response);
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
// The newly created room has no pinned event ids
let room = client.get_room(room_id).unwrap();
@@ -2549,7 +2404,7 @@ mod tests {
None,
));
let response = response_with_room(room_id, room_response);
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
let pinned_event_ids = room.pinned_event_ids().unwrap_or_default();
assert_eq!(pinned_event_ids.len(), 1);
@@ -2564,7 +2419,7 @@ mod tests {
None,
));
let response = response_with_room(room_id, room_response);
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
let pinned_event_ids = room.pinned_event_ids().unwrap();
assert!(pinned_event_ids.is_empty());
}
@@ -2590,7 +2445,7 @@ mod tests {
.account_data
.global
.push(make_global_account_data_event(DirectEventContent(direct_content)));
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
let room_1 = client.get_room(room_id_1).unwrap();
assert!(room_1.is_direct().await.unwrap());
@@ -2599,7 +2454,7 @@ mod tests {
let mut room_response = http::response::Room::new();
set_room_joined(&mut room_response, user_b_id);
let response = response_with_room(room_id_2, room_response);
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
let room_2 = client.get_room(room_id_2).unwrap();
assert!(room_2.is_direct().await.unwrap());
@@ -2740,7 +2595,7 @@ mod tests {
let mut response = response_with_room(room_id, room);
set_direct_with(&mut response, their_id.to_owned(), vec![room_id.to_owned()]);
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
}
/// Set this user's membership within this room to new_state
@@ -2753,7 +2608,7 @@ mod tests {
let mut room = http::response::Room::new();
room.required_state.push(make_membership_event(user_id, new_state));
let response = response_with_room(room_id, room);
client.process_sliding_sync(&response, &(), true).await.expect("Failed to process sync");
client.process_sliding_sync(&response, &()).await.expect("Failed to process sync");
}
fn set_direct_with(
@@ -2828,7 +2683,7 @@ mod tests {
}
fn set_room_invited(room: &mut http::response::Room, inviter: &UserId, invitee: &UserId) {
// MSC3575 shows an almost-empty event to indicate that we are invited to a
// Sliding Sync shows an almost-empty event to indicate that we are invited to a
// room. Just the type is supplied.
let evt = Raw::new(&json!({
@@ -2856,7 +2711,7 @@ mod tests {
}
fn set_room_knocked(room: &mut http::response::Room, knocker: &UserId) {
// MSC3575 shows an almost-empty event to indicate that we are invited to a
// Sliding Sync shows an almost-empty event to indicate that we are invited to a
// room. Just the type is supplied.
let evt = Raw::new(&json!({

View File

@@ -202,7 +202,7 @@ impl EncryptionSyncService {
match sync.next().await {
Some(Ok(update_summary)) => {
// This API is only concerned with the e2ee and to-device extensions.
// Warn if anything weird has been received from the proxy.
// Warn if anything weird has been received from the homeserver.
if !update_summary.lists.is_empty() {
debug!(?update_summary.lists, "unexpected non-empty list of lists in encryption sync API");
}
@@ -250,7 +250,7 @@ impl EncryptionSyncService {
match self.next_sync_with_lock(&mut sync).await? {
Some(Ok(update_summary)) => {
// This API is only concerned with the e2ee and to-device extensions.
// Warn if anything weird has been received from the proxy.
// Warn if anything weird has been received from the homeserver.
if !update_summary.lists.is_empty() {
debug!(?update_summary.lists, "unexpected non-empty list of lists in encryption sync API");
}

View File

@@ -177,8 +177,8 @@ impl NotificationClient {
//
// Spawn an `EncryptionSync` that runs two iterations of the sliding sync loop:
// - the first iteration allows to get SS events as well as send e2ee requests.
// - the second one let the SS proxy forward events triggered by the sending of
// e2ee requests.
// - the second one let the SS homeserver forward events triggered by the
// sending of e2ee requests.
//
// Keep timeouts small for both, since we might be short on time.

View File

@@ -466,13 +466,10 @@ pub enum SyncIndicator {
mod tests {
use std::future::ready;
use assert_matches::assert_matches;
use futures_util::{pin_mut, StreamExt};
use matrix_sdk::{
authentication::matrix::{MatrixSession, MatrixSessionTokens},
config::RequestConfig,
reqwest::Url,
sliding_sync::Version as SlidingSyncVersion,
Client, SlidingSyncMode,
};
use matrix_sdk_base::SessionMeta;
@@ -520,31 +517,6 @@ mod tests {
}
}
#[async_test]
async fn test_sliding_sync_proxy_url() -> Result<(), Error> {
let (client, _) = new_client().await;
{
let room_list = RoomListService::new(client.clone()).await?;
assert_matches!(room_list.sliding_sync().version(), SlidingSyncVersion::Native);
}
{
let url = Url::parse("https://foo.matrix/").unwrap();
client.set_sliding_sync_version(SlidingSyncVersion::Proxy { url: url.clone() });
let room_list = RoomListService::new(client.clone()).await?;
assert_matches!(
room_list.sliding_sync().version(),
SlidingSyncVersion::Proxy { url: given_url } => {
assert_eq!(&url, given_url);
}
);
}
Ok(())
}
#[async_test]
async fn test_all_rooms_are_declared() -> Result<(), Error> {
let room_list = new_room_list().await?;

View File

@@ -532,7 +532,7 @@ impl MatrixAuth {
};
let request = refresh_token::v3::Request::new(refresh_token);
let res = self.client.send_inner(request, None, None, Default::default()).await;
let res = self.client.send_inner(request, None, Default::default()).await;
match res {
Ok(res) => {

View File

@@ -52,7 +52,6 @@ pub(super) enum UrlScheme {
pub(super) struct HomeserverDiscoveryResult {
pub server: Option<Url>,
pub homeserver: Url,
pub well_known: Option<discover_homeserver::Response>,
pub supported_versions: Option<get_supported_versions::Response>,
}
@@ -68,7 +67,6 @@ impl HomeserverConfig {
HomeserverDiscoveryResult {
server: None, // We can't know the `server` if we only have a `homeserver`.
homeserver,
well_known: None,
supported_versions: None,
}
}
@@ -80,20 +78,19 @@ impl HomeserverConfig {
HomeserverDiscoveryResult {
server: Some(server),
homeserver: Url::parse(&well_known.homeserver.base_url)?,
well_known: Some(well_known),
supported_versions: None,
}
}
Self::ServerNameOrHomeserverUrl(server_name_or_url) => {
let (server, homeserver, well_known, supported_versions) =
let (server, homeserver, supported_versions) =
discover_homeserver_from_server_name_or_url(
server_name_or_url.to_owned(),
http_client,
)
.await?;
HomeserverDiscoveryResult { server, homeserver, well_known, supported_versions }
HomeserverDiscoveryResult { server, homeserver, supported_versions }
}
})
}
@@ -105,15 +102,7 @@ impl HomeserverConfig {
async fn discover_homeserver_from_server_name_or_url(
mut server_name_or_url: String,
http_client: &HttpClient,
) -> Result<
(
Option<Url>,
Url,
Option<discover_homeserver::Response>,
Option<get_supported_versions::Response>,
),
ClientBuildError,
> {
) -> Result<(Option<Url>, Url, Option<get_supported_versions::Response>), ClientBuildError> {
let mut discovery_error: Option<ClientBuildError> = None;
// Attempt discovery as a server name first.
@@ -128,12 +117,7 @@ async fn discover_homeserver_from_server_name_or_url(
match discover_homeserver(server_name, &protocol, http_client).await {
Ok((server, well_known)) => {
return Ok((
Some(server),
Url::parse(&well_known.homeserver.base_url)?,
Some(well_known),
None,
));
return Ok((Some(server), Url::parse(&well_known.homeserver.base_url)?, None));
}
Err(e) => {
debug!(error = %e, "Well-known discovery failed.");
@@ -154,7 +138,7 @@ async fn discover_homeserver_from_server_name_or_url(
// Make sure the URL is definitely for a homeserver.
match get_supported_versions(&homeserver_url, http_client).await {
Ok(response) => {
return Ok((None, homeserver_url, None, Some(response)));
return Ok((None, homeserver_url, Some(response)));
}
Err(e) => {
debug!(error = %e, "Checking supported versions failed.");
@@ -240,7 +224,6 @@ mod tests {
assert_eq!(result.server, None);
assert_eq!(result.homeserver, Url::parse("https://matrix-client.matrix.org").unwrap());
assert!(result.well_known.is_none());
assert!(result.supported_versions.is_none());
}
@@ -272,7 +255,6 @@ mod tests {
assert_eq!(result.server, Some(Url::parse(&server.uri()).unwrap()));
assert_eq!(result.homeserver, Url::parse(&homeserver.uri()).unwrap());
assert!(result.well_known.is_some());
assert!(result.supported_versions.is_none());
}
@@ -301,7 +283,6 @@ mod tests {
assert_eq!(result.server, Some(Url::parse(&server.uri()).unwrap()));
assert_eq!(result.homeserver, Url::parse(&homeserver.uri()).unwrap());
assert!(result.well_known.is_some());
assert!(result.supported_versions.is_none());
}
@@ -327,7 +308,6 @@ mod tests {
assert!(result.server.is_none());
assert_eq!(result.homeserver, Url::parse(&homeserver.uri()).unwrap());
assert!(result.well_known.is_none());
assert!(result.supported_versions.is_some());
}
}

View File

@@ -489,7 +489,7 @@ impl ClientBuilder {
let http_client = HttpClient::new(inner_http_client.clone(), self.request_config);
#[allow(unused_variables)]
let HomeserverDiscoveryResult { server, homeserver, well_known, supported_versions } =
let HomeserverDiscoveryResult { server, homeserver, supported_versions } =
homeserver_cfg.discover(&http_client).await?;
let sliding_sync_version = {
@@ -501,9 +501,7 @@ impl ClientBuilder {
None => None,
};
let version = self
.sliding_sync_version_builder
.build(well_known.as_ref(), supported_versions.as_ref())?;
let version = self.sliding_sync_version_builder.build(supported_versions.as_ref())?;
tracing::info!(?version, "selected sliding sync version");
@@ -763,7 +761,6 @@ pub(crate) mod tests {
use assert_matches::assert_matches;
use matrix_sdk_test::{async_test, test_json};
use serde_json::{json_internal, Value as JsonValue};
use url::Url;
use wiremock::{
matchers::{method, path},
Mock, MockServer, ResponseTemplate,
@@ -854,34 +851,6 @@ pub(crate) mod tests {
assert!(_client.sliding_sync_version().is_native());
}
#[async_test]
async fn test_discovery_direct_legacy_custom_proxy() {
// Given a homeserver without a well-known file and with a custom sliding sync
// proxy injected.
let homeserver = make_mock_homeserver().await;
let mut builder = ClientBuilder::new();
let url = {
let url = Url::parse("https://localhost:1234").unwrap();
builder = builder.sliding_sync_version_builder(SlidingSyncVersionBuilder::Proxy {
url: url.clone(),
});
url
};
// When building a client with the server's URL.
builder = builder.server_name_or_homeserver_url(homeserver.uri());
let client = builder.build().await.unwrap();
// Then a client should be built with support for sliding sync.
assert_matches!(
client.sliding_sync_version(),
SlidingSyncVersion::Proxy { url: given_url } => {
assert_eq!(given_url, url);
}
);
}
#[async_test]
async fn test_discovery_well_known_parse_error() {
// Given a base server with a well-known file that has errors.
@@ -889,7 +858,7 @@ pub(crate) mod tests {
let homeserver = make_mock_homeserver().await;
let mut builder = ClientBuilder::new();
let well_known = make_well_known_json(&homeserver.uri(), None);
let well_known = make_well_known_json(&homeserver.uri());
let bad_json = well_known.to_string().replace(',', "");
Mock::given(method("GET"))
.and(path("/.well-known/matrix/client"))
@@ -919,8 +888,7 @@ pub(crate) mod tests {
Mock::given(method("GET"))
.and(path("/.well-known/matrix/client"))
.respond_with(
ResponseTemplate::new(200)
.set_body_json(make_well_known_json(&homeserver.uri(), None)),
ResponseTemplate::new(200).set_body_json(make_well_known_json(&homeserver.uri())),
)
.mount(&server)
.await;
@@ -934,110 +902,6 @@ pub(crate) mod tests {
assert!(client.sliding_sync_version().is_native());
}
#[async_test]
async fn test_discovery_well_known_with_sliding_sync() {
// Given a base server with a well-known file that points to a homeserver with a
// sliding sync proxy.
let server = MockServer::start().await;
let homeserver = make_mock_homeserver().await;
let mut builder = ClientBuilder::new();
Mock::given(method("GET"))
.and(path("/.well-known/matrix/client"))
.respond_with(ResponseTemplate::new(200).set_body_json(make_well_known_json(
&homeserver.uri(),
Some("https://localhost:1234"),
)))
.mount(&server)
.await;
// When building a client with the base server, with sliding sync to
// auto-discover the proxy.
builder = builder
.server_name_or_homeserver_url(server.uri())
.sliding_sync_version_builder(SlidingSyncVersionBuilder::DiscoverProxy);
let client = builder.build().await.unwrap();
// Then a client should be built with support for sliding sync.
assert_matches!(
client.sliding_sync_version(),
SlidingSyncVersion::Proxy { url } => {
assert_eq!(url, Url::parse("https://localhost:1234").unwrap());
}
);
}
#[async_test]
async fn test_discovery_well_known_with_sliding_sync_override() {
// Given a base server with a well-known file that points to a homeserver with a
// sliding sync proxy.
let server = MockServer::start().await;
let homeserver = make_mock_homeserver().await;
let mut builder = ClientBuilder::new();
Mock::given(method("GET"))
.and(path("/.well-known/matrix/client"))
.respond_with(ResponseTemplate::new(200).set_body_json(make_well_known_json(
&homeserver.uri(),
Some("https://localhost:1234"),
)))
.mount(&server)
.await;
// When building a client with the base server and a custom sliding sync proxy
// set.
let url = Url::parse("https://localhost:9012").unwrap();
builder = builder
.sliding_sync_version_builder(SlidingSyncVersionBuilder::Proxy { url: url.clone() })
.server_name_or_homeserver_url(server.uri());
let client = builder.build().await.unwrap();
// Then a client should be built and configured with the custom sliding sync
// proxy.
assert_matches!(
client.sliding_sync_version(),
SlidingSyncVersion::Proxy { url: given_url } => {
assert_eq!(url, given_url);
}
);
}
#[async_test]
async fn test_sliding_sync_discover_proxy() {
// Given a homeserver with a `.well-known` file.
let homeserver = make_mock_homeserver().await;
let mut builder = ClientBuilder::new();
let expected_url = Url::parse("https://localhost:1234").unwrap();
Mock::given(method("GET"))
.and(path("/.well-known/matrix/client"))
.respond_with(ResponseTemplate::new(200).set_body_json(make_well_known_json(
&homeserver.uri(),
Some(expected_url.as_str()),
)))
.mount(&homeserver)
.await;
// When building the client with sliding sync to auto-discover the
// proxy version.
builder = builder
.server_name_or_homeserver_url(homeserver.uri())
.sliding_sync_version_builder(SlidingSyncVersionBuilder::DiscoverProxy);
let client = builder.build().await.unwrap();
// Then, sliding sync has the correct proxy URL.
assert_matches!(
client.sliding_sync_version(),
SlidingSyncVersion::Proxy { url } => {
assert_eq!(url, expected_url);
}
);
}
#[async_test]
async fn test_sliding_sync_discover_native() {
// Given a homeserver with a `/versions` file.
@@ -1104,10 +968,7 @@ pub(crate) mod tests {
homeserver
}
fn make_well_known_json(
homeserver_url: &str,
sliding_sync_proxy_url: Option<&str>,
) -> JsonValue {
fn make_well_known_json(homeserver_url: &str) -> JsonValue {
::serde_json::Value::Object({
let mut object = ::serde_json::Map::new();
let _ = object.insert(
@@ -1117,15 +978,6 @@ pub(crate) mod tests {
}),
);
if let Some(sliding_sync_proxy_url) = sliding_sync_proxy_url {
let _ = object.insert(
"org.matrix.msc3575.proxy".into(),
json_internal!({
"url": sliding_sync_proxy_url
}),
);
}
object
})
}

View File

@@ -46,7 +46,6 @@ use crate::{
#[allow(missing_debug_implementations)]
pub struct SendRequest<R> {
pub(crate) client: Client,
pub(crate) homeserver_override: Option<String>,
pub(crate) request: R,
pub(crate) config: Option<RequestConfig>,
pub(crate) send_progress: SharedObservable<TransmissionProgress>,
@@ -67,15 +66,6 @@ impl<R> SendRequest<R> {
self
}
/// Replace this request's target (homeserver) with a custom one.
///
/// This is useful at the moment because the current sliding sync
/// implementation uses a proxy server.
pub fn with_homeserver_override(mut self, homeserver_override: Option<String>) -> Self {
self.homeserver_override = homeserver_override;
self
}
/// Use the given [`RequestConfig`] for this send request, instead of the
/// one provided by default.
pub fn with_request_config(mut self, request_config: impl Into<Option<RequestConfig>>) -> Self {
@@ -101,16 +91,11 @@ where
boxed_into_future!();
fn into_future(self) -> Self::IntoFuture {
let Self { client, request, config, send_progress, homeserver_override } = self;
let Self { client, request, config, send_progress } = self;
Box::pin(async move {
let res = Box::pin(client.send_inner(
request.clone(),
config,
homeserver_override.clone(),
send_progress.clone(),
))
.await;
let res =
Box::pin(client.send_inner(request.clone(), config, send_progress.clone())).await;
// An `M_UNKNOWN_TOKEN` error can potentially be fixed with a token refresh.
if let Err(Some(ErrorKind::UnknownToken { soft_logout })) =
@@ -172,13 +157,7 @@ where
}
} else {
trace!("Token refresh: Refresh succeeded, retrying request.");
return Box::pin(client.send_inner(
request,
config,
homeserver_override,
send_progress,
))
.await;
return Box::pin(client.send_inner(request, config, send_progress)).await;
}
}

View File

@@ -1620,7 +1620,6 @@ impl Client {
request,
config: None,
send_progress: Default::default(),
homeserver_override: None,
}
}
@@ -1628,18 +1627,13 @@ impl Client {
&self,
request: Request,
config: Option<RequestConfig>,
homeserver_override: Option<String>,
send_progress: SharedObservable<TransmissionProgress>,
) -> HttpResult<Request::IncomingResponse>
where
Request: OutgoingRequest + Debug,
HttpError: From<FromHttpResponseError<Request::EndpointError>>,
{
let homeserver = match homeserver_override {
Some(hs) => hs,
None => self.homeserver().to_string(),
};
let homeserver = self.homeserver().to_string();
let access_token = self.access_token();
self.inner

View File

@@ -261,11 +261,8 @@ impl RoomEventCacheInner {
for raw_event in account_data {
match raw_event.deserialize() {
Ok(AnyRoomAccountDataEvent::FullyRead(ev)) => {
// Sometimes the sliding sync proxy sends many duplicates of the read marker
// event. Don't forward it multiple times to avoid clutter
// the update channel.
//
// NOTE: SS proxy workaround.
// If duplicated, do not forward read marker multiple times
// to avoid clutter the update channel.
if handled_read_marker {
continue;
}

View File

@@ -30,11 +30,6 @@ To create a new Sliding Sync session, one must query an existing
Typically one configures the custom homeserver endpoint, although it's
automatically detected using the `.well-known` endpoint, if configured.
At the time of writing, no Matrix server natively supports Sliding Sync;
a sidecar called the [Sliding Sync Proxy][proxy] is needed. As that
typically runs on a separate domain, it can be configured on the
[`SlidingSyncBuilder`].
A unique identifier, less than 16 chars long, is required for each instance
of Sliding Sync, and must be provided when getting a builder:
@@ -46,7 +41,7 @@ of Sliding Sync, and must be provided when getting a builder:
# let client = Client::new(homeserver).await?;
let sliding_sync_builder = client
.sliding_sync("main-sync")?
.version(Version::Proxy { url: Url::parse("http://sliding-sync.example.org")? });
.version(Version::Native);
# anyhow::Ok(())
# };
@@ -183,9 +178,7 @@ data.
To allow for a quick startup, client might want to request only a very low
`timeline_limit` (maybe 1 or even 0) at first and update the count later on
the list or room subscription (see [reactive api](#reactive-api)), Since
`0.99.0-rc1` the [sliding sync proxy][proxy] will then "paginate back" and
resent the now larger number of events. All this is handled transparently.
the list or room subscription (see [reactive api](#reactive-api)).
## Long Polling
@@ -346,7 +339,7 @@ let full_sync_list_name = "full-sync".to_owned();
let active_list_name = "active-list".to_owned();
let sliding_sync_builder = client
.sliding_sync("main-sync")?
.version(Version::Proxy { url: Url::parse("http://sliding-sync.example.org")? }) // our proxy server
.version(Version::Native)
.with_account_data_extension(
assign!(http::request::AccountData::default(), { enabled: Some(true) }),
) // we enable the account-data extension
@@ -412,5 +405,4 @@ loop {
```
[MSC]: https://github.com/matrix-org/matrix-spec-proposals/pull/3575
[proxy]: https://github.com/matrix-org/sliding-sync
[ruma-types]: https://docs.rs/ruma/latest/ruma/api/client/sync/sync_events/v4/index.html

View File

@@ -271,7 +271,6 @@ impl SlidingSyncBuilder {
Ok(SlidingSync::new(SlidingSyncInner {
id: self.id,
version,
client,
storage_key: self.storage_key,

View File

@@ -1,22 +1,15 @@
use std::collections::BTreeMap;
use as_variant::as_variant;
use imbl::Vector;
use matrix_sdk_base::{sliding_sync::http, sync::SyncResponse, PreviousEventsProvider};
use ruma::{
api::{
client::discovery::{discover_homeserver, get_supported_versions},
MatrixVersion,
},
events::AnyToDeviceEvent,
serde::Raw,
api::client::discovery::get_supported_versions, events::AnyToDeviceEvent, serde::Raw,
OwnedRoomId,
};
use tracing::error;
use url::Url;
use super::{SlidingSync, SlidingSyncBuilder};
use crate::{config::RequestConfig, http_client::HttpClient, Client, Result, SlidingSyncRoom};
use crate::{Client, Result, SlidingSyncRoom};
/// A sliding sync version.
#[derive(Clone, Debug)]
@@ -25,25 +18,16 @@ pub enum Version {
/// example, and that the version is unknown.
None,
/// Use the version of the sliding sync proxy, i.e. MSC3575.
Proxy {
/// URL to the proxy.
url: Url,
},
/// Use the version of the sliding sync implementation inside Synapse, i.e.
/// MSC4186.
Native,
}
impl Version {
#[cfg(test)]
pub(crate) fn is_native(&self) -> bool {
matches!(self, Self::Native)
}
pub(crate) fn overriding_url(&self) -> Option<&Url> {
as_variant!(self, Self::Proxy { url } => url)
}
}
/// An error when building a version.
@@ -53,14 +37,6 @@ pub enum VersionBuilderError {
#[error("`.well-known` is not set")]
WellKnownNotSet,
/// `.well-known` does not contain a `sliding_sync_proxy` entry.
#[error("`.well-known` does not contain a `sliding_sync_proxy` entry")]
NoSlidingSyncInWellKnown,
/// The `sliding_sync_proxy` URL in .well-known` is not valid ({0}).
#[error("the `sliding_sync_proxy` URL in .well-known` is not valid ({0})")]
UnparsableSlidingSyncUrl(url::ParseError),
/// The `/versions` response is not set.
#[error("The `/versions` response is not set")]
MissingVersionsResponse,
@@ -77,20 +53,9 @@ pub enum VersionBuilder {
/// Build a [`Version::None`].
None,
/// Build a [`Version::Proxy`].
Proxy {
/// Coerced URL to the proxy.
url: Url,
},
/// Build a [`Version::Native`].
Native,
/// Build a [`Version::Proxy`] by auto-discovering it.
///
/// It is available if the server enables it via `.well-known`.
DiscoverProxy,
/// Build a [`Version::Native`] by auto-discovering it.
///
/// It is available if the server enables it via `/versions`.
@@ -104,35 +69,17 @@ impl VersionBuilder {
/// Build a [`Version`].
///
/// It can fail if auto-discovering fails, e.g. if `.well-known`
/// or `/versions` do contain invalid data.
/// It can fail if auto-discovering fails, e.g. if `/versions` do contain
/// invalid data.
pub fn build(
self,
well_known: Option<&discover_homeserver::Response>,
versions: Option<&get_supported_versions::Response>,
) -> Result<Version, VersionBuilderError> {
Ok(match self {
Self::None => Version::None,
Self::Proxy { url } => Version::Proxy { url },
Self::Native => Version::Native,
Self::DiscoverProxy => {
let Some(well_known) = well_known else {
return Err(VersionBuilderError::WellKnownNotSet);
};
let Some(info) = &well_known.sliding_sync_proxy else {
return Err(VersionBuilderError::NoSlidingSyncInWellKnown);
};
let url =
Url::parse(&info.url).map_err(VersionBuilderError::UnparsableSlidingSyncUrl)?;
Version::Proxy { url }
}
Self::DiscoverNative => {
let Some(versions) = versions else {
return Err(VersionBuilderError::MissingVersionsResponse);
@@ -156,55 +103,6 @@ impl Client {
/// If `.well-known` or `/versions` is unreachable, it will simply move
/// potential sliding sync versions aside. No error will be reported.
pub async fn available_sliding_sync_versions(&self) -> Vec<Version> {
async fn discover_homeserver(
http_client: &HttpClient,
server: Option<String>,
) -> Option<discover_homeserver::Response> {
if let Some(server) = server {
http_client
.send(
discover_homeserver::Request::new(),
Some(RequestConfig::short_retry()),
server,
None,
&[MatrixVersion::V1_0],
Default::default(),
)
.await
.ok()
} else {
None
}
}
let http_client = &self.inner.http_client;
// Discover the homeserver by using:
//
// * the server if any,
// * by using the user ID's server name (if any) with `https://`,
// * by using the user ID's server name (if any) with `http://`.
//
// Otherwise, `well_known` is `None`.
let well_known = if let Some(well_known) =
discover_homeserver(http_client, self.server().map(ToString::to_string)).await
{
Some(well_known)
} else if let Some(well_known) = discover_homeserver(
http_client,
self.user_id().map(|user_id| format!("https://{}", user_id.server_name())),
)
.await
{
Some(well_known)
} else {
discover_homeserver(
http_client,
self.user_id().map(|user_id| format!("http://{}", user_id.server_name())),
)
.await
};
let supported_versions = self.unstable_features().await.ok().map(|unstable_features| {
let mut response = get_supported_versions::Response::new(vec![]);
response.unstable_features = unstable_features;
@@ -212,11 +110,9 @@ impl Client {
response
});
[VersionBuilder::DiscoverNative, VersionBuilder::DiscoverProxy]
[VersionBuilder::DiscoverNative]
.into_iter()
.filter_map(|version_builder| {
version_builder.build(well_known.as_ref(), supported_versions.as_ref()).ok()
})
.filter_map(|version_builder| version_builder.build(supported_versions.as_ref()).ok())
.collect()
}
@@ -239,10 +135,7 @@ impl Client {
&self,
response: &http::Response,
) -> Result<SyncResponse> {
let response = self
.base_client()
.process_sliding_sync(response, &(), self.sliding_sync_version().is_native())
.await?;
let response = self.base_client().process_sliding_sync(response, &()).await?;
tracing::debug!("done processing on base_client");
self.call_sync_response_handlers(&response).await?;
@@ -306,19 +199,11 @@ impl<'a> SlidingSyncResponseProcessor<'a> {
Ok(())
}
pub async fn handle_room_response(
&mut self,
response: &http::Response,
with_msc4186: bool,
) -> Result<()> {
pub async fn handle_room_response(&mut self, response: &http::Response) -> Result<()> {
self.response = Some(
self.client
.base_client()
.process_sliding_sync(
response,
&SlidingSyncPreviousEventsProvider(self.rooms),
with_msc4186,
)
.process_sliding_sync(response, &SlidingSyncPreviousEventsProvider(self.rooms))
.await?,
);
self.post_process().await
@@ -366,101 +251,31 @@ mod tests {
use std::collections::BTreeMap;
use assert_matches::assert_matches;
use matrix_sdk_base::{notification_settings::RoomNotificationMode, SessionMeta};
use matrix_sdk_base::notification_settings::RoomNotificationMode;
use matrix_sdk_test::async_test;
use ruma::{
api::MatrixVersion, assign, owned_device_id, room_id, serde::Raw, OwnedUserId, ServerName,
};
use ruma::{assign, room_id, serde::Raw};
use serde_json::json;
use url::Url;
use wiremock::{
matchers::{method, path},
Mock, MockServer, ResponseTemplate,
Mock, ResponseTemplate,
};
use super::{discover_homeserver, get_supported_versions, Version, VersionBuilder};
use super::{get_supported_versions, Version, VersionBuilder};
use crate::{
authentication::matrix::{MatrixSession, MatrixSessionTokens},
error::Result,
sliding_sync::{http, VersionBuilderError},
test_utils::logged_in_client_with_server,
Client, SlidingSyncList, SlidingSyncMode,
SlidingSyncList, SlidingSyncMode,
};
#[test]
fn test_version_builder_none() {
assert_matches!(VersionBuilder::None.build(None, None), Ok(Version::None));
}
#[test]
fn test_version_builder_proxy() {
let expected_url = Url::parse("https://matrix.org:1234").unwrap();
assert_matches!(
VersionBuilder::Proxy { url: expected_url.clone() }.build(None, None),
Ok(Version::Proxy { url }) => {
assert_eq!(url, expected_url);
}
);
assert_matches!(VersionBuilder::None.build(None), Ok(Version::None));
}
#[test]
fn test_version_builder_native() {
assert_matches!(VersionBuilder::Native.build(None, None), Ok(Version::Native));
}
#[test]
fn test_version_builder_discover_proxy() {
let expected_url = Url::parse("https://matrix.org:1234").unwrap();
let mut response = discover_homeserver::Response::new(
discover_homeserver::HomeserverInfo::new("matrix.org".to_owned()),
);
response.sliding_sync_proxy =
Some(discover_homeserver::SlidingSyncProxyInfo::new(expected_url.to_string()));
assert_matches!(
VersionBuilder::DiscoverProxy.build(Some(&response), None),
Ok(Version::Proxy { url }) => {
assert_eq!(url, expected_url);
}
);
}
#[test]
fn test_version_builder_discover_proxy_no_well_known() {
assert_matches!(
VersionBuilder::DiscoverProxy.build(None, None),
Err(VersionBuilderError::WellKnownNotSet)
);
}
#[test]
fn test_version_builder_discover_proxy_no_sliding_sync_proxy_in_well_known() {
let mut response = discover_homeserver::Response::new(
discover_homeserver::HomeserverInfo::new("matrix-client.matrix.org".to_owned()),
);
response.sliding_sync_proxy = None; // already `None` but the test is clearer now.
assert_matches!(
VersionBuilder::DiscoverProxy.build(Some(&response), None),
Err(VersionBuilderError::NoSlidingSyncInWellKnown)
);
}
#[test]
fn test_version_builder_discover_proxy_invalid_sliding_sync_proxy_in_well_known() {
let mut response = discover_homeserver::Response::new(
discover_homeserver::HomeserverInfo::new("matrix-client.matrix.org".to_owned()),
);
response.sliding_sync_proxy =
Some(discover_homeserver::SlidingSyncProxyInfo::new("💥".to_owned()));
assert_matches!(
VersionBuilder::DiscoverProxy.build(Some(&response), None),
Err(VersionBuilderError::UnparsableSlidingSyncUrl(err)) => {
assert_eq!(err.to_string(), "relative URL without a base");
}
);
assert_matches!(VersionBuilder::Native.build(None), Ok(Version::Native));
}
#[test]
@@ -468,16 +283,13 @@ mod tests {
let mut response = get_supported_versions::Response::new(vec![]);
response.unstable_features = [("org.matrix.simplified_msc3575".to_owned(), true)].into();
assert_matches!(
VersionBuilder::DiscoverNative.build(None, Some(&response)),
Ok(Version::Native)
);
assert_matches!(VersionBuilder::DiscoverNative.build(Some(&response)), Ok(Version::Native));
}
#[test]
fn test_version_builder_discover_native_no_supported_versions() {
assert_matches!(
VersionBuilder::DiscoverNative.build(None, None),
VersionBuilder::DiscoverNative.build(None),
Err(VersionBuilderError::MissingVersionsResponse)
);
}
@@ -488,7 +300,7 @@ mod tests {
response.unstable_features = [("org.matrix.simplified_msc3575".to_owned(), false)].into();
assert_matches!(
VersionBuilder::DiscoverNative.build(None, Some(&response)),
VersionBuilder::DiscoverNative.build(Some(&response)),
Err(VersionBuilderError::NativeVersionIsUnset)
);
}
@@ -503,102 +315,6 @@ mod tests {
assert!(available_versions.is_empty());
}
#[async_test]
async fn test_available_sliding_sync_versions_proxy_with_server() {
let server = MockServer::start().await;
let homeserver = format!("https://{}/homeserver", server.address());
let proxy = format!("https://{}/sliding-sync-proxy", server.address());
Mock::given(method("GET"))
.and(path("/.well-known/matrix/client"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"m.homeserver": {
"base_url": homeserver,
},
"org.matrix.msc3575.proxy": {
"url": proxy,
},
})))
.mount(&server)
.await;
// The server knows the server.
let client = Client::builder()
.insecure_server_name_no_tls(
<&ServerName>::try_from(server.address().to_string().as_str()).unwrap(),
)
.server_versions([MatrixVersion::V1_0])
.build()
.await
.unwrap();
let available_versions = client.available_sliding_sync_versions().await;
// `.well-known` is available.
assert_eq!(available_versions.len(), 1);
assert_matches!(
&available_versions[0],
Version::Proxy { url } => {
assert_eq!(url, &Url::parse(&proxy).unwrap());
}
);
}
#[async_test]
async fn test_available_sliding_sync_versions_proxy_with_user_id() {
let server = MockServer::start().await;
let homeserver = format!("https://{}/homeserver", server.address());
let proxy = format!("https://{}/sliding-sync-proxy", server.address());
Mock::given(method("GET"))
.and(path("/.well-known/matrix/client"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({
"m.homeserver": {
"base_url": homeserver,
},
"org.matrix.msc3575.proxy": {
"url": proxy,
},
})))
.mount(&server)
.await;
// The client doesn't know the server.
let client = Client::builder()
.homeserver_url(homeserver)
.server_versions([MatrixVersion::V1_0])
.build()
.await
.unwrap();
// The client knows a user.
client
.matrix_auth()
.restore_session(MatrixSession {
meta: SessionMeta {
user_id: OwnedUserId::try_from(format!("@alice:{}", server.address())).unwrap(),
device_id: owned_device_id!("DEVICEID"),
},
tokens: MatrixSessionTokens {
access_token: "1234".to_owned(),
refresh_token: None,
},
})
.await
.unwrap();
let available_versions = client.available_sliding_sync_versions().await;
// `.well-known` is available.
assert_eq!(available_versions.len(), 1);
assert_matches!(
&available_versions[0],
Version::Proxy { url } => {
assert_eq!(url, &Url::parse(&proxy).unwrap());
}
);
}
#[async_test]
async fn test_available_sliding_sync_versions_native() {
let (client, server) = logged_in_client_with_server().await;

View File

@@ -25,7 +25,7 @@ mod sticky_parameters;
mod utils;
use std::{
collections::{btree_map::Entry, BTreeMap, HashSet},
collections::{btree_map::Entry, BTreeMap},
fmt::Debug,
future::Future,
sync::{Arc, RwLock as StdRwLock},
@@ -37,10 +37,7 @@ pub use client::{Version, VersionBuilder};
use futures_core::stream::Stream;
pub use matrix_sdk_base::sliding_sync::http;
use matrix_sdk_common::{deserialized_responses::TimelineEvent, executor::spawn, timer};
use ruma::{
api::{client::error::ErrorKind, OutgoingRequest},
assign, OwnedEventId, OwnedRoomId, RoomId,
};
use ruma::{api::client::error::ErrorKind, assign, OwnedRoomId, RoomId};
use serde::{Deserialize, Serialize};
use tokio::{
select,
@@ -56,7 +53,7 @@ use self::{
client::SlidingSyncResponseProcessor,
sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager, StickyData},
};
use crate::{config::RequestConfig, Client, HttpError, Result};
use crate::{config::RequestConfig, Client, Result};
/// The Sliding Sync instance.
///
@@ -71,17 +68,13 @@ pub struct SlidingSync {
pub(super) struct SlidingSyncInner {
/// A unique identifier for this instance of sliding sync.
///
/// Used to distinguish different connections to the sliding sync proxy.
/// Used to distinguish different connections to sliding sync.
id: String,
/// Either an overridden sliding sync [`Version`], or one inherited from the
/// client.
version: Version,
/// The HTTP Matrix client.
client: Client,
/// Long-polling timeout that appears the sliding sync proxy request.
/// Long-polling timeout that appears in sliding sync request.
poll_timeout: Duration,
/// Extra duration for the sliding sync request to timeout. This is added to
@@ -268,7 +261,7 @@ impl SlidingSync {
#[instrument(skip_all)]
async fn handle_response(
&self,
mut sliding_sync_response: http::Response,
sliding_sync_response: http::Response,
position: &mut SlidingSyncPositionMarkers,
) -> Result<UpdateSummary, crate::Error> {
let pos = Some(sliding_sync_response.pos.clone());
@@ -277,13 +270,6 @@ impl SlidingSync {
trace!(yes = must_process_rooms_response, "Must process rooms response?");
// Compute `limited` for the SS proxy only, if we're interested in a room list
// query.
if !self.inner.version.is_native() && must_process_rooms_response {
let known_rooms = self.inner.rooms.read().await;
compute_limited(&known_rooms, &mut sliding_sync_response.rooms);
}
// Transform a Sliding Sync Response to a `SyncResponse`.
//
// We may not need the `sync_response` in the future (once `SyncResponse` will
@@ -306,15 +292,9 @@ impl SlidingSync {
}
// Only handle the room's subsection of the response, if this sliding sync was
// configured to do so. That's because even when not requesting it,
// sometimes the current (2023-07-20) proxy will forward room events
// unrelated to the current connection's parameters.
//
// NOTE: SS proxy workaround.
// configured to do so.
if must_process_rooms_response {
response_processor
.handle_room_response(&sliding_sync_response, self.inner.version.is_native())
.await?;
response_processor.handle_room_response(&sliding_sync_response).await?;
}
response_processor.process_and_take_response().await?
@@ -491,23 +471,16 @@ impl SlidingSync {
Span::current().record("pos", &pos);
// There is a non-negligible difference MSC3575 and MSC4186 in how
// the `e2ee` extension works. When the client sends a request with
// no `pos`:
//
// * MSC3575 returns all device lists updates since the last request from the
// device that asked for device lists (this works similarly to to-device
// message handling),
// * MSC4186 returns no device lists updates, as it only returns changes since
// the provided `pos` (which is `null` in this case); this is in line with
// sync v2.
// When the client sends a request with no `pos`, MSC4186 returns no device
// lists updates, as it only returns changes since the provided `pos`
// (which is `null` in this case); this is in line with sync v2.
//
// Therefore, with MSC4186, the device list cache must be marked as to be
// re-downloaded if the `since` token is `None`, otherwise it's easy to miss
// device lists updates that happened between the previous request and the new
// “initial” request.
#[cfg(feature = "e2e-encryption")]
if pos.is_none() && self.inner.version.is_native() && self.is_e2ee_enabled() {
if pos.is_none() && self.is_e2ee_enabled() {
info!("Marking all tracked users as dirty");
let olm_machine = self.inner.client.olm_machine().await;
@@ -558,33 +531,17 @@ impl SlidingSync {
/// Send a sliding sync request.
///
/// This method contains the sending logic. It takes a generic `Request`
/// because it can be an MSC4186 or an MSC3575 `Request`.
async fn send_sync_request<Request>(
/// This method contains the sending logic.
async fn send_sync_request(
&self,
request: Request,
request: http::Request,
request_config: RequestConfig,
mut position_guard: OwnedMutexGuard<SlidingSyncPositionMarkers>,
) -> Result<UpdateSummary>
where
Request: OutgoingRequest + Clone + Debug + Send + Sync + 'static,
Request::IncomingResponse: Send
+ Sync
+
// This is required to get back an MSC4186 `Response` whatever the
// `Request` type.
Into<http::Response>,
HttpError: From<ruma::api::error::FromHttpResponseError<Request::EndpointError>>,
{
) -> Result<UpdateSummary> {
debug!("Sending request");
// Prepare the request.
let request = self
.inner
.client
.send(request)
.with_request_config(request_config)
.with_homeserver_override(self.inner.version.overriding_url().map(ToString::to_string));
let request = self.inner.client.send(request).with_request_config(request_config);
// Send the request and get a response with end-to-end encryption support.
//
@@ -643,11 +600,6 @@ impl SlidingSync {
#[cfg(not(feature = "e2e-encryption"))]
let response = request.await?;
// The code manipulates `Request` and `Response` from MSC4186 because it's the
// future standard. But this function may have received a `Request` from MSC4186
// or MSC3575. We need to get back an MSC4186 `Response`.
let response = Into::<http::msc4186::Response>::into(response);
debug!("Received response");
// At this point, the request has been sent, and a response has been received.
@@ -711,19 +663,8 @@ impl SlidingSync {
let (request, request_config, position_guard) =
self.generate_sync_request(&mut LazyTransactionId::new()).await?;
// The code manipulates `Request` and `Response` from MSC4186 because it's
// the future standard (at the time of writing: 2024-09-09). Let's check if
// the generated request must be transformed into an MSC3575 `Request`.
let summaries = if !self.inner.version.is_native() {
self.send_sync_request(
Into::<http::msc3575::Request>::into(request),
request_config,
position_guard,
)
.await?
} else {
self.send_sync_request(request, request_config, position_guard).await?
};
// Send the request, kaboom.
let summaries = self.send_sync_request(request, request_config, position_guard).await?;
// Notify a new sync was received
self.inner.client.inner.sync_beat.notify(usize::MAX);
@@ -880,11 +821,6 @@ impl SlidingSync {
position_lock.pos = Some(new_pos);
}
/// Get the sliding sync version used by this instance.
pub fn version(&self) -> &Version {
&self.inner.version
}
/// Read the static extension configuration for this Sliding Sync.
///
/// Note: this is not the next content of the sticky parameters, but rightly
@@ -1015,82 +951,6 @@ impl StickyData for SlidingSyncStickyParameters {
}
}
/// As of 2023-07-13, the sliding sync proxy doesn't provide us with `limited`
/// correctly, so we cheat and "correct" it using heuristics here.
/// TODO remove this workaround as soon as support of the `limited` flag is
/// properly implemented in the open-source proxy: https://github.com/matrix-org/sliding-sync/issues/197
// NOTE: SS proxy workaround.
fn compute_limited(
local_rooms: &BTreeMap<OwnedRoomId, SlidingSyncRoom>,
remote_rooms: &mut BTreeMap<OwnedRoomId, http::response::Room>,
) {
for (room_id, remote_room) in remote_rooms {
// Only rooms marked as initially loaded are subject to the fixup.
let initial = remote_room.initial.unwrap_or(false);
if !initial {
continue;
}
if remote_room.limited {
// If the room was already marked as limited, the server knew more than we do.
continue;
}
let remote_events = &remote_room.timeline;
if remote_events.is_empty() {
trace!(?room_id, "no timeline updates in the response => not limited");
continue;
}
let Some(local_room) = local_rooms.get(room_id) else {
trace!(?room_id, "room isn't known locally => not limited");
continue;
};
let local_events = local_room.timeline_queue();
if local_events.is_empty() {
trace!(?room_id, "local timeline had no events => not limited");
continue;
}
// If the local room had some timeline events, consider it's a `limited` if
// there's absolutely no overlap between the known events and the new
// events in the timeline.
// Gather all the known event IDs. Ignore events that don't have an event ID.
let num_local_events = local_events.len();
let local_events_with_ids: HashSet<OwnedEventId> =
HashSet::from_iter(local_events.into_iter().filter_map(|event| event.event_id()));
// There's overlap if, and only if, there's at least one event in the response's
// timeline that matches an event id we've seen before.
let mut num_remote_events_missing_ids = 0;
let overlap = remote_events.iter().any(|remote_event| {
if let Some(remote_event_id) =
remote_event.get_field::<OwnedEventId>("event_id").ok().flatten()
{
local_events_with_ids.contains(&remote_event_id)
} else {
num_remote_events_missing_ids += 1;
false
}
});
remote_room.limited = !overlap;
trace!(
?room_id,
num_events_response = remote_events.len(),
num_local_events,
num_local_events_with_ids = local_events_with_ids.len(),
num_remote_events_missing_ids,
room_limited = remote_room.limited,
"done"
);
}
}
#[cfg(all(test, not(target_family = "wasm")))]
#[allow(clippy::dbg_macro)]
mod tests {
@@ -1105,7 +965,6 @@ mod tests {
use assert_matches::assert_matches;
use event_listener::Listener;
use futures_util::{future::join_all, pin_mut, StreamExt};
use matrix_sdk_common::deserialized_responses::TimelineEvent;
use matrix_sdk_test::async_test;
use ruma::{
api::client::error::ErrorKind, assign, owned_room_id, room_id, serde::Raw, uint,
@@ -1113,14 +972,13 @@ mod tests {
};
use serde::Deserialize;
use serde_json::json;
use url::Url;
use wiremock::{http::Method, Match, Mock, MockServer, Request, ResponseTemplate};
use super::{
compute_limited, http,
http,
sticky_parameters::{LazyTransactionId, SlidingSyncStickyManager},
FrozenSlidingSync, SlidingSync, SlidingSyncList, SlidingSyncListBuilder, SlidingSyncMode,
SlidingSyncRoom, SlidingSyncStickyParameters, Version,
SlidingSyncStickyParameters,
};
use crate::{
sliding_sync::cache::restore_sliding_sync_state, test_utils::logged_in_client, Result,
@@ -2212,226 +2070,6 @@ mod tests {
Ok(())
}
#[async_test]
async fn test_sliding_sync_version() -> Result<()> {
let server = MockServer::start().await;
let client = logged_in_client(Some(server.uri())).await;
// By default, sliding sync inherits its version from the client, which is
// `Native`.
{
let sync = client.sliding_sync("default")?.build().await?;
assert_matches!(sync.version(), Version::Native);
}
// Sliding sync can override the configuration from the client.
{
let url = Url::parse("https://bar.matrix/").unwrap();
let sync = client
.sliding_sync("own-proxy")?
.version(Version::Proxy { url: url.clone() })
.build()
.await?;
assert_matches!(
sync.version(),
Version::Proxy { url: given_url } => {
assert_eq!(&url, given_url);
}
);
}
// Sliding sync inherits from the client…
let url = Url::parse("https://foo.matrix/").unwrap();
client.set_sliding_sync_version(Version::Proxy { url: url.clone() });
{
// The sliding sync inherits the client's sliding sync proxy URL.
let sync = client.sliding_sync("client-proxy")?.build().await?;
assert_matches!(
sync.version(),
Version::Proxy { url: given_url } => {
assert_eq!(&url, given_url);
}
);
}
{
// …unless we override it afterwards.
let sync = client.sliding_sync("own-proxy")?.version(Version::Native).build().await?;
assert_matches!(sync.version(), Version::Native);
}
Ok(())
}
#[async_test]
async fn test_limited_flag_computation() {
let make_event = |event_id: &str| -> TimelineEvent {
TimelineEvent::new(
Raw::from_json_string(
json!({
"event_id": event_id,
"sender": "@johnmastodon:example.org",
"origin_server_ts": 1337424242,
"type": "m.room.message",
"room_id": "!meaningless:example.org",
"content": {
"body": "Hello, world!",
"msgtype": "m.text"
},
})
.to_string(),
)
.unwrap(),
)
};
let event_a = make_event("$a");
let event_b = make_event("$b");
let event_c = make_event("$c");
let event_d = make_event("$d");
let not_initial = room_id!("!croissant:example.org");
let no_overlap = room_id!("!omelette:example.org");
let partial_overlap = room_id!("!fromage:example.org");
let complete_overlap = room_id!("!baguette:example.org");
let no_remote_events = room_id!("!pain:example.org");
let no_local_events = room_id!("!crepe:example.org");
let already_limited = room_id!("!paris:example.org");
let response_timeline = vec![event_c.raw().clone(), event_d.raw().clone()];
let local_rooms = BTreeMap::from_iter([
(
// This has no events overlapping with the response timeline, hence limited, but
// it's not marked as initial in the response.
not_initial.to_owned(),
SlidingSyncRoom::new(
no_overlap.to_owned(),
None,
vec![event_a.clone(), event_b.clone()],
),
),
(
// This has no events overlapping with the response timeline, hence limited.
no_overlap.to_owned(),
SlidingSyncRoom::new(
no_overlap.to_owned(),
None,
vec![event_a.clone(), event_b.clone()],
),
),
(
// This has event_c in common with the response timeline.
partial_overlap.to_owned(),
SlidingSyncRoom::new(
partial_overlap.to_owned(),
None,
vec![event_a.clone(), event_b.clone(), event_c.clone()],
),
),
(
// This has all events in common with the response timeline.
complete_overlap.to_owned(),
SlidingSyncRoom::new(
partial_overlap.to_owned(),
None,
vec![event_c.clone(), event_d.clone()],
),
),
(
// We locally have events for this room, and receive none in the response: not
// limited.
no_remote_events.to_owned(),
SlidingSyncRoom::new(
no_remote_events.to_owned(),
None,
vec![event_c.clone(), event_d.clone()],
),
),
(
// We don't have events for this room locally, and even if the remote room contains
// some events, it's not a limited sync.
no_local_events.to_owned(),
SlidingSyncRoom::new(no_local_events.to_owned(), None, vec![]),
),
(
// Already limited, but would be marked limited if the flag wasn't ignored (same as
// partial overlap).
already_limited.to_owned(),
SlidingSyncRoom::new(
already_limited.to_owned(),
None,
vec![event_a, event_b, event_c.clone()],
),
),
]);
let mut remote_rooms = BTreeMap::from_iter([
(
not_initial.to_owned(),
assign!(http::response::Room::default(), { timeline: response_timeline }),
),
(
no_overlap.to_owned(),
assign!(http::response::Room::default(), {
initial: Some(true),
timeline: vec![event_c.raw().clone(), event_d.raw().clone()],
}),
),
(
partial_overlap.to_owned(),
assign!(http::response::Room::default(), {
initial: Some(true),
timeline: vec![event_c.raw().clone(), event_d.raw().clone()],
}),
),
(
complete_overlap.to_owned(),
assign!(http::response::Room::default(), {
initial: Some(true),
timeline: vec![event_c.raw().clone(), event_d.raw().clone()],
}),
),
(
no_remote_events.to_owned(),
assign!(http::response::Room::default(), {
initial: Some(true),
timeline: vec![],
}),
),
(
no_local_events.to_owned(),
assign!(http::response::Room::default(), {
initial: Some(true),
timeline: vec![event_c.raw().clone(), event_d.raw().clone()],
}),
),
(
already_limited.to_owned(),
assign!(http::response::Room::default(), {
initial: Some(true),
limited: true,
timeline: vec![event_c.into_raw(), event_d.into_raw()],
}),
),
]);
compute_limited(&local_rooms, &mut remote_rooms);
assert!(!remote_rooms.get(not_initial).unwrap().limited);
assert!(remote_rooms.get(no_overlap).unwrap().limited);
assert!(!remote_rooms.get(partial_overlap).unwrap().limited);
assert!(!remote_rooms.get(complete_overlap).unwrap().limited);
assert!(!remote_rooms.get(no_remote_events).unwrap().limited);
assert!(!remote_rooms.get(no_local_events).unwrap().limited);
assert!(remote_rooms.get(already_limited).unwrap().limited);
}
#[async_test]
async fn test_process_read_receipts() -> Result<()> {
let room = owned_room_id!("!pony:example.org");