mirror of
https://github.com/matrix-org/matrix-rust-sdk.git
synced 2026-05-07 15:33:45 -04:00
Merge branch 'client-arc'
This commit is contained in:
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::{net::ToSocketAddrs, result::Result as StdResult};
|
||||
use std::net::ToSocketAddrs;
|
||||
|
||||
use matrix_sdk::{
|
||||
bytes::Bytes,
|
||||
@@ -168,7 +168,7 @@ mod handlers {
|
||||
_user_id: String,
|
||||
appservice: AppService,
|
||||
request: http::Request<Bytes>,
|
||||
) -> StdResult<impl warp::Reply, Rejection> {
|
||||
) -> Result<impl warp::Reply, Rejection> {
|
||||
if let Some(user_exists) = appservice.event_handler.users.lock().await.as_mut() {
|
||||
let request =
|
||||
query_user::IncomingRequest::try_from_http_request(request).map_err(Error::from)?;
|
||||
@@ -185,7 +185,7 @@ mod handlers {
|
||||
_room_id: String,
|
||||
appservice: AppService,
|
||||
request: http::Request<Bytes>,
|
||||
) -> StdResult<impl warp::Reply, Rejection> {
|
||||
) -> Result<impl warp::Reply, Rejection> {
|
||||
if let Some(room_exists) = appservice.event_handler.rooms.lock().await.as_mut() {
|
||||
let request =
|
||||
query_room::IncomingRequest::try_from_http_request(request).map_err(Error::from)?;
|
||||
@@ -202,7 +202,7 @@ mod handlers {
|
||||
_txn_id: String,
|
||||
appservice: AppService,
|
||||
request: http::Request<Bytes>,
|
||||
) -> StdResult<impl warp::Reply, Rejection> {
|
||||
) -> Result<impl warp::Reply, Rejection> {
|
||||
let incoming_transaction: ruma::api::appservice::event::push_events::v1::IncomingRequest =
|
||||
ruma::api::IncomingRequest::try_from_http_request(request).map_err(Error::from)?;
|
||||
|
||||
@@ -224,7 +224,7 @@ struct ErrorMessage {
|
||||
message: String,
|
||||
}
|
||||
|
||||
pub async fn handle_rejection(err: Rejection) -> std::result::Result<impl Reply, Rejection> {
|
||||
pub async fn handle_rejection(err: Rejection) -> Result<impl Reply, Rejection> {
|
||||
if err.find::<Unauthorized>().is_some() || err.find::<warp::reject::InvalidQuery>().is_some() {
|
||||
let code = http::StatusCode::UNAUTHORIZED;
|
||||
let message = "UNAUTHORIZED";
|
||||
|
||||
@@ -50,7 +50,7 @@ default-features = false
|
||||
features = ["sync", "fs"]
|
||||
|
||||
[dev-dependencies]
|
||||
futures = { version = "0.3.15", default-features = false }
|
||||
futures = { version = "0.3.15", default-features = false, features = ["executor"] }
|
||||
http = "0.2.4"
|
||||
matrix-sdk-test = { version = "0.4.0", path = "../matrix-sdk-test" }
|
||||
|
||||
|
||||
@@ -33,7 +33,7 @@ version = "0.1.9"
|
||||
features = ["now"]
|
||||
|
||||
[target.'cfg(target_arch = "wasm32")'.dependencies]
|
||||
futures-util = { version = "0.3.15", default-features = false }
|
||||
futures-locks = { version = "0.6.0", default-features = false }
|
||||
async-lock = "2.4.0"
|
||||
futures-util = { version = "0.3.15", default-features = false, features = ["channel"] }
|
||||
wasm-bindgen-futures = "0.4.24"
|
||||
uuid = { version = "0.8.2", default-features = false, features = ["v4", "wasm-bindgen"] }
|
||||
|
||||
@@ -1,8 +1,4 @@
|
||||
// could switch to futures-lock completely at some point, blocker:
|
||||
// https://github.com/asomers/futures-locks/issues/34
|
||||
// https://www.reddit.com/r/rust/comments/f4zldz/i_audited_3_different_implementation_of_async/
|
||||
|
||||
#[cfg(target_arch = "wasm32")]
|
||||
pub use futures_locks::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
pub use async_lock::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
pub use tokio::sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
|
||||
|
||||
@@ -56,7 +56,7 @@ criterion = { version = "0.3.4", features = [
|
||||
"async_tokio",
|
||||
"html_reports",
|
||||
] }
|
||||
futures = { version = "0.3.15", default-features = false }
|
||||
futures = { version = "0.3.15", default-features = false, features = ["executor"] }
|
||||
http = "0.2.4"
|
||||
indoc = "1.0.3"
|
||||
matches = "0.1.8"
|
||||
|
||||
@@ -108,7 +108,7 @@ features = ["wasm-bindgen"]
|
||||
[dev-dependencies]
|
||||
anyhow = "1.0"
|
||||
dirs = "3.0.2"
|
||||
futures = { version = "0.3.15", default-features = false }
|
||||
futures = { version = "0.3.15", default-features = false, features = ["executor"] }
|
||||
lazy_static = "1.4.0"
|
||||
matches = "0.1.8"
|
||||
matrix-sdk-test = { version = "0.4.0", path = "../matrix-sdk-test" }
|
||||
|
||||
@@ -26,15 +26,14 @@ use std::{
|
||||
use anymap2::any::CloneAnySendSync;
|
||||
use dashmap::DashMap;
|
||||
use futures_core::stream::Stream;
|
||||
use futures_timer::Delay as sleep;
|
||||
use matrix_sdk_base::{
|
||||
deserialized_responses::{JoinedRoom, LeftRoom, SyncResponse},
|
||||
deserialized_responses::SyncResponse,
|
||||
media::{MediaEventContent, MediaFormat, MediaRequest, MediaThumbnailSize, MediaType},
|
||||
BaseClient, Session, Store,
|
||||
};
|
||||
use matrix_sdk_common::{
|
||||
instant::{Duration, Instant},
|
||||
locks::{Mutex, RwLock},
|
||||
locks::{Mutex, RwLock, RwLockReadGuard},
|
||||
};
|
||||
use mime::{self, Mime};
|
||||
use ruma::{
|
||||
@@ -109,27 +108,31 @@ pub enum LoopCtrl {
|
||||
/// All of the state is held in an `Arc` so the `Client` can be cloned freely.
|
||||
#[derive(Clone)]
|
||||
pub struct Client {
|
||||
pub(crate) inner: Arc<ClientInner>,
|
||||
}
|
||||
|
||||
pub(crate) struct ClientInner {
|
||||
/// The URL of the homeserver to connect to.
|
||||
homeserver: Arc<RwLock<Url>>,
|
||||
/// The underlying HTTP client.
|
||||
http_client: HttpClient,
|
||||
/// User session data.
|
||||
pub(crate) base_client: BaseClient,
|
||||
base_client: BaseClient,
|
||||
/// Locks making sure we only have one group session sharing request in
|
||||
/// flight per room.
|
||||
#[cfg(feature = "encryption")]
|
||||
pub(crate) group_session_locks: Arc<DashMap<RoomId, Arc<Mutex<()>>>>,
|
||||
pub(crate) group_session_locks: DashMap<RoomId, Arc<Mutex<()>>>,
|
||||
#[cfg(feature = "encryption")]
|
||||
/// Lock making sure we're only doing one key claim request at a time.
|
||||
pub(crate) key_claim_lock: Arc<Mutex<()>>,
|
||||
pub(crate) members_request_locks: Arc<DashMap<RoomId, Arc<Mutex<()>>>>,
|
||||
pub(crate) typing_notice_times: Arc<DashMap<RoomId, Instant>>,
|
||||
pub(crate) key_claim_lock: Mutex<()>,
|
||||
pub(crate) members_request_locks: DashMap<RoomId, Arc<Mutex<()>>>,
|
||||
pub(crate) typing_notice_times: DashMap<RoomId, Instant>,
|
||||
/// Event handlers. See `register_event_handler`.
|
||||
pub(crate) event_handlers: Arc<RwLock<EventHandlerMap>>,
|
||||
event_handlers: RwLock<EventHandlerMap>,
|
||||
/// Custom event handler context. See `register_event_handler_context`.
|
||||
pub(crate) event_handler_data: Arc<StdRwLock<AnyMap>>,
|
||||
event_handler_data: StdRwLock<AnyMap>,
|
||||
/// Notification handlers. See `register_notification_handler`.
|
||||
notification_handlers: Arc<RwLock<Vec<NotificationHandlerFn>>>,
|
||||
notification_handlers: RwLock<Vec<NotificationHandlerFn>>,
|
||||
/// Whether the client should operate in application service style mode.
|
||||
/// This is low-level functionality. For an high-level API check the
|
||||
/// `matrix_sdk_appservice` crate.
|
||||
@@ -142,7 +145,7 @@ pub struct Client {
|
||||
/// synchronization, e.g. if we send out a request to create a room, we can
|
||||
/// wait for the sync to get the data to fetch a room object from the state
|
||||
/// store.
|
||||
pub(crate) sync_beat: Arc<event_listener::Event>,
|
||||
pub(crate) sync_beat: event_listener::Event,
|
||||
}
|
||||
|
||||
#[cfg(not(tarpaulin_include))]
|
||||
@@ -186,7 +189,7 @@ impl Client {
|
||||
let http_client =
|
||||
HttpClient::new(client, homeserver.clone(), session, config.request_config);
|
||||
|
||||
Ok(Self {
|
||||
let inner = Arc::new(ClientInner {
|
||||
homeserver,
|
||||
http_client,
|
||||
base_client,
|
||||
@@ -201,8 +204,10 @@ impl Client {
|
||||
notification_handlers: Default::default(),
|
||||
appservice_mode: config.appservice_mode,
|
||||
use_discovery_response: config.use_discovery_response,
|
||||
sync_beat: event_listener::Event::new().into(),
|
||||
})
|
||||
sync_beat: event_listener::Event::new(),
|
||||
});
|
||||
|
||||
Ok(Self { inner })
|
||||
}
|
||||
|
||||
/// Create a new [`Client`] using homeserver auto discovery.
|
||||
@@ -226,12 +231,12 @@ impl Client {
|
||||
/// // First let's try to construct an user id, presumably from user input.
|
||||
/// let alice = UserId::try_from("@alice:example.org")?;
|
||||
///
|
||||
/// // Now let's try to discover the homeserver and create client object.
|
||||
/// // Now let's try to discover the homeserver and create a client object.
|
||||
/// let client = Client::new_from_user_id(&alice).await?;
|
||||
///
|
||||
/// // Finally let's try to login.
|
||||
/// client.login(alice, "password", None, None).await?;
|
||||
/// # matrix_sdk::Result::Ok(()) });
|
||||
/// # Result::<_, matrix_sdk::Error>::Ok(()) });
|
||||
/// ```
|
||||
///
|
||||
/// [spec]: https://spec.matrix.org/unstable/client-server-api/#well-known-uri
|
||||
@@ -268,6 +273,24 @@ impl Client {
|
||||
Ok(client)
|
||||
}
|
||||
|
||||
pub(crate) fn base_client(&self) -> &BaseClient {
|
||||
&self.inner.base_client
|
||||
}
|
||||
|
||||
#[cfg(feature = "encryption")]
|
||||
pub(crate) async fn olm_machine(&self) -> Option<matrix_sdk_base::crypto::OlmMachine> {
|
||||
self.base_client().olm_machine().await
|
||||
}
|
||||
|
||||
#[cfg(feature = "encryption")]
|
||||
pub(crate) async fn mark_request_as_sent(
|
||||
&self,
|
||||
request_id: &matrix_sdk_base::uuid::Uuid,
|
||||
response: impl Into<matrix_sdk_base::crypto::IncomingResponse<'_>>,
|
||||
) -> Result<(), matrix_sdk_base::Error> {
|
||||
self.base_client().mark_request_as_sent(request_id, response).await
|
||||
}
|
||||
|
||||
fn homeserver_from_user_id(user_id: &UserId) -> Result<Url> {
|
||||
let homeserver = format!("https://{}", user_id.server_name());
|
||||
#[allow(unused_mut)]
|
||||
@@ -290,7 +313,7 @@ impl Client {
|
||||
///
|
||||
/// * `homeserver_url` - The new URL to use.
|
||||
pub async fn set_homeserver(&self, homeserver_url: Url) {
|
||||
let mut homeserver = self.homeserver.write().await;
|
||||
let mut homeserver = self.inner.homeserver.write().await;
|
||||
*homeserver = homeserver_url;
|
||||
}
|
||||
|
||||
@@ -324,23 +347,23 @@ impl Client {
|
||||
|
||||
/// Is the client logged in.
|
||||
pub async fn logged_in(&self) -> bool {
|
||||
self.base_client.logged_in().await
|
||||
self.inner.base_client.logged_in().await
|
||||
}
|
||||
|
||||
/// The Homeserver of the client.
|
||||
pub async fn homeserver(&self) -> Url {
|
||||
self.homeserver.read().await.clone()
|
||||
self.inner.homeserver.read().await.clone()
|
||||
}
|
||||
|
||||
/// Get the user id of the current owner of the client.
|
||||
pub async fn user_id(&self) -> Option<UserId> {
|
||||
let session = self.base_client.session().read().await;
|
||||
let session = self.inner.base_client.session().read().await;
|
||||
session.as_ref().cloned().map(|s| s.user_id)
|
||||
}
|
||||
|
||||
/// Get the device id that identifies the current session.
|
||||
pub async fn device_id(&self) -> Option<DeviceIdBox> {
|
||||
let session = self.base_client.session().read().await;
|
||||
let session = self.inner.base_client.session().read().await;
|
||||
session.as_ref().map(|s| s.device_id.clone())
|
||||
}
|
||||
|
||||
@@ -351,7 +374,7 @@ impl Client {
|
||||
/// Can be used with [`Client::restore_login`] to restore a previously
|
||||
/// logged in session.
|
||||
pub async fn session(&self) -> Option<Session> {
|
||||
self.base_client.session().read().await.clone()
|
||||
self.inner.base_client.session().read().await.clone()
|
||||
}
|
||||
|
||||
/// Fetches the display name of the owner of the client.
|
||||
@@ -469,7 +492,7 @@ impl Client {
|
||||
|
||||
/// Get a reference to the store.
|
||||
pub fn store(&self) -> &Store {
|
||||
self.base_client.store()
|
||||
self.inner.base_client.store()
|
||||
}
|
||||
|
||||
/// Sets the mxc avatar url of the client's owner. The avatar gets unset if
|
||||
@@ -611,32 +634,41 @@ impl Client {
|
||||
<H::Future as Future>::Output: EventHandlerResult,
|
||||
{
|
||||
let event_type = H::ID.1;
|
||||
self.event_handlers.write().await.entry(H::ID).or_default().push(Box::new(move |data| {
|
||||
let maybe_fut = serde_json::from_str(data.raw.get())
|
||||
.map(|ev| handler.clone().handle_event(ev, data));
|
||||
self.inner.event_handlers.write().await.entry(H::ID).or_default().push(Box::new(
|
||||
move |data| {
|
||||
let maybe_fut = serde_json::from_str(data.raw.get())
|
||||
.map(|ev| handler.clone().handle_event(ev, data));
|
||||
|
||||
Box::pin(async move {
|
||||
match maybe_fut {
|
||||
Ok(Some(fut)) => {
|
||||
fut.await.print_error(event_type);
|
||||
}
|
||||
Ok(None) => {
|
||||
error!("Event handler for {} has an invalid context argument", event_type);
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"Failed to deserialize `{}` event, skipping event handler.\n\
|
||||
Box::pin(async move {
|
||||
match maybe_fut {
|
||||
Ok(Some(fut)) => {
|
||||
fut.await.print_error(event_type);
|
||||
}
|
||||
Ok(None) => {
|
||||
error!(
|
||||
"Event handler for {} has an invalid context argument",
|
||||
event_type
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"Failed to deserialize `{}` event, skipping event handler.\n\
|
||||
Deserialization error: {}",
|
||||
event_type, e,
|
||||
);
|
||||
event_type, e,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}));
|
||||
})
|
||||
},
|
||||
));
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
pub(crate) async fn event_handlers(&self) -> RwLockReadGuard<'_, EventHandlerMap> {
|
||||
self.inner.event_handlers.read().await
|
||||
}
|
||||
|
||||
/// Add an arbitrary value for use as event handler context.
|
||||
///
|
||||
/// The value can be obtained in an event handler by adding an argument of
|
||||
@@ -678,10 +710,18 @@ impl Client {
|
||||
where
|
||||
T: Clone + Send + Sync + 'static,
|
||||
{
|
||||
self.event_handler_data.write().unwrap().insert(ctx);
|
||||
self.inner.event_handler_data.write().unwrap().insert(ctx);
|
||||
self
|
||||
}
|
||||
|
||||
pub(crate) fn event_handler_context<T>(&self) -> Option<T>
|
||||
where
|
||||
T: Clone + Send + Sync + 'static,
|
||||
{
|
||||
let map = self.inner.event_handler_data.read().unwrap();
|
||||
map.get::<T>().cloned()
|
||||
}
|
||||
|
||||
/// Register a handler for a notification.
|
||||
///
|
||||
/// Similar to [`Client::register_event_handler`], but only allows functions
|
||||
@@ -692,13 +732,19 @@ impl Client {
|
||||
H: Fn(Notification, room::Room, Client) -> Fut + Send + Sync + 'static,
|
||||
Fut: Future<Output = ()> + Send + 'static,
|
||||
{
|
||||
self.notification_handlers.write().await.push(Box::new(
|
||||
self.inner.notification_handlers.write().await.push(Box::new(
|
||||
move |notification, room, client| Box::pin((handler)(notification, room, client)),
|
||||
));
|
||||
|
||||
self
|
||||
}
|
||||
|
||||
pub(crate) async fn notification_handlers(
|
||||
&self,
|
||||
) -> RwLockReadGuard<'_, Vec<NotificationHandlerFn>> {
|
||||
self.inner.notification_handlers.read().await
|
||||
}
|
||||
|
||||
/// Get all the rooms the client knows about.
|
||||
///
|
||||
/// This will return the list of joined, invited, and left rooms.
|
||||
@@ -849,7 +895,7 @@ impl Client {
|
||||
/// "Logged in as {}, got device_id {} and access_token {}",
|
||||
/// user, response.device_id, response.access_token
|
||||
/// );
|
||||
/// # matrix_sdk::Result::Ok(()) });
|
||||
/// # Result::<_, matrix_sdk::Error>::Ok(()) });
|
||||
/// ```
|
||||
///
|
||||
/// [`restore_login`]: #method.restore_login
|
||||
@@ -1166,7 +1212,7 @@ impl Client {
|
||||
///
|
||||
/// * `response` - A successful login response.
|
||||
async fn receive_login_response(&self, response: &login::Response) -> Result<()> {
|
||||
if self.use_discovery_response {
|
||||
if self.inner.use_discovery_response {
|
||||
if let Some(well_known) = &response.well_known {
|
||||
if let Ok(homeserver) = Url::parse(&well_known.homeserver.base_url) {
|
||||
self.set_homeserver(homeserver).await;
|
||||
@@ -1174,7 +1220,7 @@ impl Client {
|
||||
}
|
||||
}
|
||||
|
||||
self.base_client.receive_login_response(response).await?;
|
||||
self.inner.base_client.receive_login_response(response).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -1237,7 +1283,7 @@ impl Client {
|
||||
///
|
||||
/// [`login`]: #method.login
|
||||
pub async fn restore_login(&self, session: Session) -> Result<()> {
|
||||
Ok(self.base_client.restore_login(session).await?)
|
||||
Ok(self.inner.base_client.restore_login(session).await?)
|
||||
}
|
||||
|
||||
/// Register a user to the server.
|
||||
@@ -1284,8 +1330,8 @@ impl Client {
|
||||
let homeserver = self.homeserver().await;
|
||||
info!("Registering to {}", homeserver);
|
||||
|
||||
let config = if self.appservice_mode {
|
||||
Some(self.http_client.request_config.force_auth())
|
||||
let config = if self.inner.appservice_mode {
|
||||
Some(self.inner.http_client.request_config.force_auth())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
@@ -1350,14 +1396,14 @@ impl Client {
|
||||
filter_name: &str,
|
||||
definition: FilterDefinition<'_>,
|
||||
) -> Result<String> {
|
||||
if let Some(filter) = self.base_client.get_filter(filter_name).await? {
|
||||
if let Some(filter) = self.inner.base_client.get_filter(filter_name).await? {
|
||||
Ok(filter)
|
||||
} else {
|
||||
let user_id = self.user_id().await.ok_or(Error::AuthenticationRequired)?;
|
||||
let request = FilterUploadRequest::new(&user_id, definition);
|
||||
let response = self.send(request, None).await?;
|
||||
|
||||
self.base_client.receive_filter_upload(filter_name, &response).await?;
|
||||
self.inner.base_client.receive_filter_upload(filter_name, &response).await?;
|
||||
|
||||
Ok(response.filter_id)
|
||||
}
|
||||
@@ -1511,7 +1557,7 @@ impl Client {
|
||||
/// for room in response.chunk {
|
||||
/// println!("Found room {:?}", room);
|
||||
/// }
|
||||
/// # matrix_sdk::Result::Ok(()) });
|
||||
/// # Result::<_, matrix_sdk::Error>::Ok(()) });
|
||||
/// ```
|
||||
pub async fn public_rooms_filtered(
|
||||
&self,
|
||||
@@ -1569,8 +1615,8 @@ impl Client {
|
||||
content_type: Some(content_type.essence_str()),
|
||||
});
|
||||
|
||||
let request_config = self.http_client.request_config.timeout(timeout);
|
||||
Ok(self.http_client.upload(request, Some(request_config)).await?)
|
||||
let request_config = self.inner.http_client.request_config.timeout(timeout);
|
||||
Ok(self.inner.http_client.upload(request, Some(request_config)).await?)
|
||||
}
|
||||
|
||||
/// Send an arbitrary request to the server, without updating client state.
|
||||
@@ -1611,7 +1657,7 @@ impl Client {
|
||||
///
|
||||
/// // Check the corresponding Response struct to find out what types are
|
||||
/// // returned
|
||||
/// # matrix_sdk::Result::Ok(()) });
|
||||
/// # Result::<_, matrix_sdk::Error>::Ok(()) });
|
||||
/// ```
|
||||
pub async fn send<Request>(
|
||||
&self,
|
||||
@@ -1622,7 +1668,7 @@ impl Client {
|
||||
Request: OutgoingRequest + Debug,
|
||||
HttpError: From<FromHttpResponseError<Request::EndpointError>>,
|
||||
{
|
||||
Ok(self.http_client.send(request, config).await?)
|
||||
Ok(self.inner.http_client.send(request, config).await?)
|
||||
}
|
||||
|
||||
/// Get information of all our own devices.
|
||||
@@ -1646,7 +1692,7 @@ impl Client {
|
||||
/// device.display_name.as_deref().unwrap_or("")
|
||||
/// );
|
||||
/// }
|
||||
/// # matrix_sdk::Result::Ok(()) });
|
||||
/// # Result::<_, matrix_sdk::Error>::Ok(()) });
|
||||
/// ```
|
||||
pub async fn devices(&self) -> HttpResult<get_devices::Response> {
|
||||
let request = get_devices::Request::new();
|
||||
@@ -1699,7 +1745,7 @@ impl Client {
|
||||
/// .await?;
|
||||
/// }
|
||||
/// }
|
||||
/// # matrix_sdk::Result::Ok(()) });
|
||||
/// # Result::<_, matrix_sdk::Error>::Ok(()) });
|
||||
pub async fn delete_devices(
|
||||
&self,
|
||||
devices: &[DeviceIdBox],
|
||||
@@ -1711,135 +1757,6 @@ impl Client {
|
||||
self.send(request, None).await
|
||||
}
|
||||
|
||||
pub(crate) async fn process_sync(
|
||||
&self,
|
||||
response: sync_events::Response,
|
||||
) -> Result<SyncResponse> {
|
||||
let response = self.base_client.receive_sync_response(response).await?;
|
||||
let SyncResponse {
|
||||
next_batch: _,
|
||||
rooms,
|
||||
presence,
|
||||
account_data,
|
||||
to_device: _,
|
||||
device_lists: _,
|
||||
device_one_time_keys_count: _,
|
||||
ambiguity_changes: _,
|
||||
notifications,
|
||||
} = &response;
|
||||
|
||||
self.handle_sync_events(EventKind::GlobalAccountData, &None, &account_data.events).await?;
|
||||
self.handle_sync_events(EventKind::Presence, &None, &presence.events).await?;
|
||||
|
||||
for (room_id, room_info) in &rooms.join {
|
||||
let room = self.get_room(room_id);
|
||||
if room.is_none() {
|
||||
error!("Can't call event handler, room {} not found", room_id);
|
||||
continue;
|
||||
}
|
||||
|
||||
let JoinedRoom { unread_notifications: _, timeline, state, account_data, ephemeral } =
|
||||
room_info;
|
||||
|
||||
self.handle_sync_events(EventKind::EphemeralRoomData, &room, &ephemeral.events).await?;
|
||||
self.handle_sync_events(EventKind::RoomAccountData, &room, &account_data.events)
|
||||
.await?;
|
||||
self.handle_sync_state_events(&room, &state.events).await?;
|
||||
self.handle_sync_timeline_events(&room, &timeline.events).await?;
|
||||
}
|
||||
|
||||
for (room_id, room_info) in &rooms.leave {
|
||||
let room = self.get_room(room_id);
|
||||
if room.is_none() {
|
||||
error!("Can't call event handler, room {} not found", room_id);
|
||||
continue;
|
||||
}
|
||||
|
||||
let LeftRoom { timeline, state, account_data } = room_info;
|
||||
|
||||
self.handle_sync_events(EventKind::RoomAccountData, &room, &account_data.events)
|
||||
.await?;
|
||||
self.handle_sync_state_events(&room, &state.events).await?;
|
||||
self.handle_sync_timeline_events(&room, &timeline.events).await?;
|
||||
}
|
||||
|
||||
for (room_id, room_info) in &rooms.invite {
|
||||
let room = self.get_room(room_id);
|
||||
if room.is_none() {
|
||||
error!("Can't call event handler, room {} not found", room_id);
|
||||
continue;
|
||||
}
|
||||
|
||||
// FIXME: Destructure room_info
|
||||
self.handle_sync_events(
|
||||
EventKind::StrippedState,
|
||||
&room,
|
||||
&room_info.invite_state.events,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
// Construct notification event handler futures
|
||||
let mut futures = Vec::new();
|
||||
for handler in &*self.notification_handlers.read().await {
|
||||
for (room_id, room_notifications) in notifications {
|
||||
let room = match self.get_room(room_id) {
|
||||
Some(room) => room,
|
||||
None => {
|
||||
warn!("Can't call notification handler, room {} not found", room_id);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
futures.extend(room_notifications.iter().map(|notification| {
|
||||
(handler)(notification.clone(), room.clone(), self.clone())
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
// Run the notification handler futures with the
|
||||
// `self.notification_handlers` lock no longer being held, in order.
|
||||
for fut in futures {
|
||||
fut.await;
|
||||
}
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
async fn sync_loop_helper(
|
||||
&self,
|
||||
sync_settings: &mut crate::config::SyncSettings<'_>,
|
||||
) -> Result<SyncResponse> {
|
||||
let response = self.sync_once(sync_settings.clone()).await;
|
||||
|
||||
match response {
|
||||
Ok(r) => {
|
||||
sync_settings.token = Some(r.next_batch.clone());
|
||||
Ok(r)
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Received an invalid response: {}", e);
|
||||
sleep::new(Duration::from_secs(1)).await;
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn delay_sync(last_sync_time: &mut Option<Instant>) {
|
||||
let now = Instant::now();
|
||||
|
||||
// If the last sync happened less than a second ago, sleep for a
|
||||
// while to not hammer out requests if the server doesn't respect
|
||||
// the sync timeout.
|
||||
if let Some(t) = last_sync_time {
|
||||
if now - *t <= Duration::from_secs(1) {
|
||||
sleep::new(Duration::from_secs(1)).await;
|
||||
}
|
||||
}
|
||||
|
||||
*last_sync_time = Some(now);
|
||||
}
|
||||
|
||||
/// Synchronize the client's state with the latest state on the server.
|
||||
///
|
||||
/// ## Syncing Events
|
||||
@@ -1917,7 +1834,7 @@ impl Client {
|
||||
/// // Now keep on syncing forever. `sync()` will use the stored sync token
|
||||
/// // from our `sync_once()` call automatically.
|
||||
/// client.sync(SyncSettings::default()).await;
|
||||
/// # matrix_sdk::Result::Ok(()) });
|
||||
/// # Result::<_, matrix_sdk::Error>::Ok(()) });
|
||||
/// ```
|
||||
///
|
||||
/// [`sync`]: #method.sync
|
||||
@@ -1944,9 +1861,9 @@ impl Client {
|
||||
timeout: sync_settings.timeout,
|
||||
});
|
||||
|
||||
let request_config = self.http_client.request_config.timeout(
|
||||
let request_config = self.inner.http_client.request_config.timeout(
|
||||
sync_settings.timeout.unwrap_or_else(|| Duration::from_secs(0))
|
||||
+ self.http_client.request_config.timeout,
|
||||
+ self.inner.http_client.request_config.timeout,
|
||||
);
|
||||
|
||||
let response = self.send(request, Some(request_config)).await?;
|
||||
@@ -1957,7 +1874,7 @@ impl Client {
|
||||
error!(error =? e, "Error while sending outgoing E2EE requests");
|
||||
};
|
||||
|
||||
self.sync_beat.notify(usize::MAX);
|
||||
self.inner.sync_beat.notify(usize::MAX);
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
@@ -2008,7 +1925,7 @@ impl Client {
|
||||
/// // Now keep on syncing forever. `sync()` will use the latest sync token
|
||||
/// // automatically.
|
||||
/// client.sync(SyncSettings::default()).await;
|
||||
/// # matrix_sdk::Result::Ok(()) });
|
||||
/// # Result::<_, matrix_sdk::Error>::Ok(()) });
|
||||
/// ```
|
||||
///
|
||||
/// [argument docs]: #method.sync_once
|
||||
@@ -2137,7 +2054,7 @@ impl Client {
|
||||
/// }
|
||||
/// }
|
||||
///
|
||||
/// # matrix_sdk::Result::Ok(()) });
|
||||
/// # Result::<_, matrix_sdk::Error>::Ok(()) });
|
||||
/// ```
|
||||
#[instrument]
|
||||
pub async fn sync_stream<'a>(
|
||||
@@ -2162,7 +2079,7 @@ impl Client {
|
||||
/// Get the current, if any, sync token of the client.
|
||||
/// This will be None if the client didn't sync at least once.
|
||||
pub async fn sync_token(&self) -> Option<String> {
|
||||
self.base_client.sync_token().await
|
||||
self.inner.base_client.sync_token().await
|
||||
}
|
||||
|
||||
/// Get a media file's content.
|
||||
@@ -2181,7 +2098,7 @@ impl Client {
|
||||
use_cache: bool,
|
||||
) -> Result<Vec<u8>> {
|
||||
let content = if use_cache {
|
||||
self.base_client.store().get_media_content(request).await?
|
||||
self.inner.base_client.store().get_media_content(request).await?
|
||||
} else {
|
||||
None
|
||||
};
|
||||
@@ -2225,7 +2142,7 @@ impl Client {
|
||||
};
|
||||
|
||||
if use_cache {
|
||||
self.base_client.store().add_media_content(request, content.clone()).await?;
|
||||
self.inner.base_client.store().add_media_content(request, content.clone()).await?;
|
||||
}
|
||||
|
||||
Ok(content)
|
||||
@@ -2238,7 +2155,7 @@ impl Client {
|
||||
///
|
||||
/// * `request` - The `MediaRequest` of the content.
|
||||
pub async fn remove_media_content(&self, request: &MediaRequest) -> Result<()> {
|
||||
Ok(self.base_client.store().remove_media_content(request).await?)
|
||||
Ok(self.inner.base_client.store().remove_media_content(request).await?)
|
||||
}
|
||||
|
||||
/// Delete all the media content corresponding to the given
|
||||
@@ -2248,7 +2165,7 @@ impl Client {
|
||||
///
|
||||
/// * `uri` - The `MxcUri` of the files.
|
||||
pub async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
|
||||
Ok(self.base_client.store().remove_media_content_for_uri(uri).await?)
|
||||
Ok(self.inner.base_client.store().remove_media_content_for_uri(uri).await?)
|
||||
}
|
||||
|
||||
/// Get the file of the given media event content.
|
||||
@@ -2767,7 +2684,7 @@ pub(crate) mod test {
|
||||
.add_state_event(EventsJson::PowerLevels)
|
||||
.build_sync_response();
|
||||
|
||||
client.base_client.receive_sync_response(response).await.unwrap();
|
||||
client.inner.base_client.receive_sync_response(response).await.unwrap();
|
||||
let room_id = room_id!("!SVkFJHzfwvuaIEawgC:localhost");
|
||||
|
||||
assert_eq!(client.homeserver().await, Url::parse(&mockito::server_url()).unwrap());
|
||||
|
||||
@@ -89,7 +89,7 @@ impl ClientConfig {
|
||||
/// let client_config = ClientConfig::new()
|
||||
/// .proxy("http://localhost:8080")?;
|
||||
///
|
||||
/// # matrix_sdk::Result::Ok(())
|
||||
/// # Result::<_, matrix_sdk::Error>::Ok(())
|
||||
/// ```
|
||||
#[cfg(not(target_arch = "wasm32"))]
|
||||
pub fn proxy(mut self, proxy: &str) -> Result<Self> {
|
||||
@@ -104,7 +104,7 @@ impl ClientConfig {
|
||||
}
|
||||
|
||||
/// Set a custom HTTP user agent for the client.
|
||||
pub fn user_agent(mut self, user_agent: &str) -> std::result::Result<Self, InvalidHeaderValue> {
|
||||
pub fn user_agent(mut self, user_agent: &str) -> Result<Self, InvalidHeaderValue> {
|
||||
self.user_agent = Some(HeaderValue::from_str(user_agent)?);
|
||||
Ok(self)
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::{ops::Deref, result::Result as StdResult};
|
||||
use std::ops::Deref;
|
||||
|
||||
use matrix_sdk_base::crypto::{
|
||||
store::CryptoStoreError, Device as BaseDevice, LocalTrust, ReadOnlyDevice,
|
||||
@@ -250,7 +250,7 @@ impl Device {
|
||||
/// }
|
||||
/// # anyhow::Result::<()>::Ok(()) });
|
||||
/// ```
|
||||
pub async fn verify(&self) -> std::result::Result<(), ManualVerifyError> {
|
||||
pub async fn verify(&self) -> Result<(), ManualVerifyError> {
|
||||
let request = self.inner.verify().await?;
|
||||
self.client.send(request, None).await?;
|
||||
|
||||
@@ -391,10 +391,7 @@ impl Device {
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `trust_state` - The new trust state that should be set for the device.
|
||||
pub async fn set_local_trust(
|
||||
&self,
|
||||
trust_state: LocalTrust,
|
||||
) -> StdResult<(), CryptoStoreError> {
|
||||
pub async fn set_local_trust(&self, trust_state: LocalTrust) -> Result<(), CryptoStoreError> {
|
||||
self.inner.set_local_trust(trust_state).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use std::{result::Result, sync::Arc};
|
||||
use std::sync::Arc;
|
||||
|
||||
use matrix_sdk_base::{
|
||||
crypto::{
|
||||
|
||||
@@ -295,7 +295,7 @@ impl Client {
|
||||
/// called the fingerprint of the device.
|
||||
#[cfg(feature = "encryption")]
|
||||
pub async fn ed25519_key(&self) -> Option<String> {
|
||||
self.base_client.olm_machine().await.map(|o| o.identity_keys().ed25519().to_owned())
|
||||
self.olm_machine().await.map(|o| o.identity_keys().ed25519().to_owned())
|
||||
}
|
||||
|
||||
/// Get the status of the private cross signing keys.
|
||||
@@ -304,7 +304,7 @@ impl Client {
|
||||
/// stored locally.
|
||||
#[cfg(feature = "encryption")]
|
||||
pub async fn cross_signing_status(&self) -> Option<CrossSigningStatus> {
|
||||
if let Some(machine) = self.base_client.olm_machine().await {
|
||||
if let Some(machine) = self.olm_machine().await {
|
||||
Some(machine.cross_signing_status().await)
|
||||
} else {
|
||||
None
|
||||
@@ -317,13 +317,13 @@ impl Client {
|
||||
/// capable devices up to date.
|
||||
#[cfg(feature = "encryption")]
|
||||
pub async fn tracked_users(&self) -> HashSet<UserId> {
|
||||
self.base_client.olm_machine().await.map(|o| o.tracked_users()).unwrap_or_default()
|
||||
self.olm_machine().await.map(|o| o.tracked_users()).unwrap_or_default()
|
||||
}
|
||||
|
||||
/// Get a verification object with the given flow id.
|
||||
#[cfg(feature = "encryption")]
|
||||
pub async fn get_verification(&self, user_id: &UserId, flow_id: &str) -> Option<Verification> {
|
||||
let olm = self.base_client.olm_machine().await?;
|
||||
let olm = self.olm_machine().await?;
|
||||
olm.get_verification(user_id, flow_id).map(|v| match v {
|
||||
matrix_sdk_base::crypto::Verification::SasV1(s) => {
|
||||
SasVerification { inner: s, client: self.clone() }.into()
|
||||
@@ -343,7 +343,7 @@ impl Client {
|
||||
user_id: &UserId,
|
||||
flow_id: impl AsRef<str>,
|
||||
) -> Option<VerificationRequest> {
|
||||
let olm = self.base_client.olm_machine().await?;
|
||||
let olm = self.olm_machine().await?;
|
||||
|
||||
olm.get_verification_request(user_id, flow_id)
|
||||
.map(|r| VerificationRequest { inner: r, client: self.clone() })
|
||||
@@ -388,7 +388,7 @@ impl Client {
|
||||
user_id: &UserId,
|
||||
device_id: &DeviceId,
|
||||
) -> StdResult<Option<Device>, CryptoStoreError> {
|
||||
let device = self.base_client.get_device(user_id, device_id).await?;
|
||||
let device = self.base_client().get_device(user_id, device_id).await?;
|
||||
|
||||
Ok(device.map(|d| Device { inner: d, client: self.clone() }))
|
||||
}
|
||||
@@ -425,7 +425,7 @@ impl Client {
|
||||
&self,
|
||||
user_id: &UserId,
|
||||
) -> StdResult<UserDevices, CryptoStoreError> {
|
||||
let devices = self.base_client.get_user_devices(user_id).await?;
|
||||
let devices = self.base_client().get_user_devices(user_id).await?;
|
||||
|
||||
Ok(UserDevices { inner: devices, client: self.clone() })
|
||||
}
|
||||
@@ -468,7 +468,7 @@ impl Client {
|
||||
) -> StdResult<Option<crate::encryption::identities::UserIdentity>, CryptoStoreError> {
|
||||
use crate::encryption::identities::UserIdentity;
|
||||
|
||||
if let Some(olm) = self.base_client.olm_machine().await {
|
||||
if let Some(olm) = self.olm_machine().await {
|
||||
let identity = olm.get_identity(user_id).await?;
|
||||
|
||||
Ok(identity.map(|i| match i {
|
||||
@@ -526,7 +526,7 @@ impl Client {
|
||||
/// # anyhow::Result::<()>::Ok(()) });
|
||||
#[cfg(feature = "encryption")]
|
||||
pub async fn bootstrap_cross_signing(&self, auth_data: Option<AuthData<'_>>) -> Result<()> {
|
||||
let olm = self.base_client.olm_machine().await.ok_or(Error::AuthenticationRequired)?;
|
||||
let olm = self.olm_machine().await.ok_or(Error::AuthenticationRequired)?;
|
||||
|
||||
let (request, signature_request) = olm.bootstrap_cross_signing(false).await?;
|
||||
|
||||
@@ -601,7 +601,7 @@ impl Client {
|
||||
passphrase: &str,
|
||||
predicate: impl FnMut(&matrix_sdk_base::crypto::olm::InboundGroupSession) -> bool,
|
||||
) -> Result<()> {
|
||||
let olm = self.base_client.olm_machine().await.ok_or(Error::AuthenticationRequired)?;
|
||||
let olm = self.olm_machine().await.ok_or(Error::AuthenticationRequired)?;
|
||||
|
||||
let keys = olm.export_keys(predicate).await?;
|
||||
let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
|
||||
@@ -661,7 +661,7 @@ impl Client {
|
||||
path: PathBuf,
|
||||
passphrase: &str,
|
||||
) -> StdResult<RoomKeyImportResult, RoomKeyImportError> {
|
||||
let olm = self.base_client.olm_machine().await.ok_or(RoomKeyImportError::StoreClosed)?;
|
||||
let olm = self.olm_machine().await.ok_or(RoomKeyImportError::StoreClosed)?;
|
||||
let passphrase = zeroize::Zeroizing::new(passphrase.to_owned());
|
||||
|
||||
let decrypt = move || {
|
||||
@@ -684,7 +684,7 @@ impl Client {
|
||||
) -> serde_json::Result<RoomEvent> {
|
||||
use ruma::serde::JsonObject;
|
||||
|
||||
if let Some(machine) = self.base_client.olm_machine().await {
|
||||
if let Some(machine) = self.olm_machine().await {
|
||||
if let AnyRoomEvent::Message(event) = event {
|
||||
if let AnyMessageEvent::RoomEncrypted(_) = event {
|
||||
let room_id = event.room_id();
|
||||
@@ -727,7 +727,7 @@ impl Client {
|
||||
let request = assign!(get_keys::Request::new(), { device_keys });
|
||||
|
||||
let response = self.send(request, None).await?;
|
||||
self.base_client.mark_request_as_sent(request_id, &response).await?;
|
||||
self.mark_request_as_sent(request_id, &response).await?;
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
@@ -844,7 +844,7 @@ impl Client {
|
||||
if let Some(room) = self.get_joined_room(&response.room_id) {
|
||||
Ok(Some(room))
|
||||
} else {
|
||||
self.sync_beat.listen().wait_timeout(SYNC_WAIT_TIME);
|
||||
self.inner.sync_beat.listen().wait_timeout(SYNC_WAIT_TIME);
|
||||
Ok(self.get_joined_room(&response.room_id))
|
||||
}
|
||||
}
|
||||
@@ -860,11 +860,11 @@ impl Client {
|
||||
&self,
|
||||
users: impl Iterator<Item = &UserId>,
|
||||
) -> Result<()> {
|
||||
let _lock = self.key_claim_lock.lock().await;
|
||||
let _lock = self.inner.key_claim_lock.lock().await;
|
||||
|
||||
if let Some((request_id, request)) = self.base_client.get_missing_sessions(users).await? {
|
||||
if let Some((request_id, request)) = self.base_client().get_missing_sessions(users).await? {
|
||||
let response = self.send(request, None).await?;
|
||||
self.base_client.mark_request_as_sent(&request_id, &response).await?;
|
||||
self.mark_request_as_sent(&request_id, &response).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -893,7 +893,7 @@ impl Client {
|
||||
);
|
||||
|
||||
let response = self.send(request.clone(), None).await?;
|
||||
self.base_client.mark_request_as_sent(request_id, &response).await?;
|
||||
self.mark_request_as_sent(request_id, &response).await?;
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
@@ -971,23 +971,23 @@ impl Client {
|
||||
}
|
||||
OutgoingRequests::ToDeviceRequest(request) => {
|
||||
let response = self.send_to_device(request).await?;
|
||||
self.base_client.mark_request_as_sent(r.request_id(), &response).await?;
|
||||
self.mark_request_as_sent(r.request_id(), &response).await?;
|
||||
}
|
||||
OutgoingRequests::SignatureUpload(request) => {
|
||||
let response = self.send(request.clone(), None).await?;
|
||||
self.base_client.mark_request_as_sent(r.request_id(), &response).await?;
|
||||
self.mark_request_as_sent(r.request_id(), &response).await?;
|
||||
}
|
||||
OutgoingRequests::RoomMessage(request) => {
|
||||
let response = self.room_send_helper(request).await?;
|
||||
self.base_client.mark_request_as_sent(r.request_id(), &response).await?;
|
||||
self.mark_request_as_sent(r.request_id(), &response).await?;
|
||||
}
|
||||
OutgoingRequests::KeysClaim(request) => {
|
||||
let response = self.send(request.clone(), None).await?;
|
||||
self.base_client.mark_request_as_sent(r.request_id(), &response).await?;
|
||||
self.mark_request_as_sent(r.request_id(), &response).await?;
|
||||
}
|
||||
OutgoingRequests::KeysBackup(request) => {
|
||||
let response = self.send_backup_request(request).await?;
|
||||
self.base_client.mark_request_as_sent(r.request_id(), &response).await?;
|
||||
self.mark_request_as_sent(r.request_id(), &response).await?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1015,7 +1015,7 @@ impl Client {
|
||||
warn!("Error while claiming one-time keys {:?}", e);
|
||||
}
|
||||
|
||||
let outgoing_requests = stream::iter(self.base_client.outgoing_requests().await?)
|
||||
let outgoing_requests = stream::iter(self.base_client().outgoing_requests().await?)
|
||||
.map(|r| self.send_outgoing_request(r));
|
||||
|
||||
let requests = outgoing_requests.buffer_unordered(MAX_CONCURRENT_REQUESTS);
|
||||
|
||||
@@ -71,7 +71,7 @@ impl QrVerification {
|
||||
///
|
||||
/// The [`to_bytes()`](#method.to_bytes) method can be used to instead
|
||||
/// output the raw bytes that should be encoded as a QR code.
|
||||
pub fn to_qr_code(&self) -> std::result::Result<QrCode, EncodingError> {
|
||||
pub fn to_qr_code(&self) -> Result<QrCode, EncodingError> {
|
||||
self.inner.to_qr_code()
|
||||
}
|
||||
|
||||
@@ -80,7 +80,7 @@ impl QrVerification {
|
||||
///
|
||||
/// The [`to_qr_code()`](#method.to_qr_code) method can be used to instead
|
||||
/// output a `QrCode` object that can be rendered.
|
||||
pub fn to_bytes(&self) -> std::result::Result<Vec<u8>, EncodingError> {
|
||||
pub fn to_bytes(&self) -> Result<Vec<u8>, EncodingError> {
|
||||
self.inner.to_bytes()
|
||||
}
|
||||
|
||||
|
||||
@@ -40,7 +40,7 @@ use thiserror::Error;
|
||||
use url::ParseError as UrlParseError;
|
||||
|
||||
/// Result type of the matrix-sdk.
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||
|
||||
/// Result type of a pure HTTP request.
|
||||
pub type HttpResult<T> = std::result::Result<T, HttpError>;
|
||||
|
||||
@@ -186,8 +186,7 @@ pub struct Ctx<T>(pub T);
|
||||
|
||||
impl<T: Clone + Send + Sync + 'static> EventHandlerContext for Ctx<T> {
|
||||
fn from_data(data: &EventHandlerData<'_>) -> Option<Self> {
|
||||
let anymap = data.client.event_handler_data.read().unwrap();
|
||||
Some(Ctx(anymap.get::<T>()?.clone()))
|
||||
data.client.event_handler_context::<T>().map(Ctx)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -330,8 +329,7 @@ impl Client {
|
||||
|
||||
// Construct event handler futures
|
||||
let futures: Vec<_> = self
|
||||
.event_handlers
|
||||
.read()
|
||||
.event_handlers()
|
||||
.await
|
||||
.get(&event_handler_id)
|
||||
.into_iter()
|
||||
|
||||
@@ -53,8 +53,8 @@ pub mod event_handler;
|
||||
mod http_client;
|
||||
/// High-level room API
|
||||
pub mod room;
|
||||
/// High-level room API
|
||||
mod room_member;
|
||||
mod sync;
|
||||
|
||||
#[cfg(feature = "encryption")]
|
||||
pub mod encryption;
|
||||
|
||||
@@ -168,14 +168,17 @@ impl Common {
|
||||
|
||||
pub(crate) async fn request_members(&self) -> Result<Option<MembersResponse>> {
|
||||
if let Some(mutex) =
|
||||
self.client.members_request_locks.get(self.inner.room_id()).map(|m| m.clone())
|
||||
self.client.inner.members_request_locks.get(self.inner.room_id()).map(|m| m.clone())
|
||||
{
|
||||
mutex.lock().await;
|
||||
|
||||
Ok(None)
|
||||
} else {
|
||||
let mutex = Arc::new(Mutex::new(()));
|
||||
self.client.members_request_locks.insert(self.inner.room_id().clone(), mutex.clone());
|
||||
self.client
|
||||
.inner
|
||||
.members_request_locks
|
||||
.insert(self.inner.room_id().clone(), mutex.clone());
|
||||
|
||||
let _guard = mutex.lock().await;
|
||||
|
||||
@@ -183,9 +186,9 @@ impl Common {
|
||||
let response = self.client.send(request, None).await?;
|
||||
|
||||
let response =
|
||||
self.client.base_client.receive_members(self.inner.room_id(), &response).await?;
|
||||
self.client.base_client().receive_members(self.inner.room_id(), &response).await?;
|
||||
|
||||
self.client.members_request_locks.remove(self.inner.room_id());
|
||||
self.client.inner.members_request_locks.remove(self.inner.room_id());
|
||||
|
||||
Ok(Some(response))
|
||||
}
|
||||
|
||||
@@ -170,37 +170,39 @@ impl Joined {
|
||||
/// if let Some(room) = client.get_joined_room(&room_id) {
|
||||
/// room.typing_notice(true).await?
|
||||
/// }
|
||||
/// # matrix_sdk::Result::Ok(()) });
|
||||
/// # Result::<_, matrix_sdk::Error>::Ok(()) });
|
||||
/// ```
|
||||
pub async fn typing_notice(&self, typing: bool) -> Result<()> {
|
||||
// Only send a request to the homeserver if the old timeout has elapsed
|
||||
// or the typing notice changed state within the
|
||||
// TYPING_NOTICE_TIMEOUT
|
||||
let send =
|
||||
if let Some(typing_time) = self.client.typing_notice_times.get(self.inner.room_id()) {
|
||||
if typing_time.elapsed() > TYPING_NOTICE_RESEND_TIMEOUT {
|
||||
// We always reactivate the typing notice if typing is true or
|
||||
// we may need to deactivate it if it's
|
||||
// currently active if typing is false
|
||||
typing || typing_time.elapsed() <= TYPING_NOTICE_TIMEOUT
|
||||
} else {
|
||||
// Only send a request when we need to deactivate typing
|
||||
!typing
|
||||
}
|
||||
let send = if let Some(typing_time) =
|
||||
self.client.inner.typing_notice_times.get(self.inner.room_id())
|
||||
{
|
||||
if typing_time.elapsed() > TYPING_NOTICE_RESEND_TIMEOUT {
|
||||
// We always reactivate the typing notice if typing is true or
|
||||
// we may need to deactivate it if it's
|
||||
// currently active if typing is false
|
||||
typing || typing_time.elapsed() <= TYPING_NOTICE_TIMEOUT
|
||||
} else {
|
||||
// Typing notice is currently deactivated, therefore, send a request
|
||||
// only when it's about to be activated
|
||||
typing
|
||||
};
|
||||
// Only send a request when we need to deactivate typing
|
||||
!typing
|
||||
}
|
||||
} else {
|
||||
// Typing notice is currently deactivated, therefore, send a request
|
||||
// only when it's about to be activated
|
||||
typing
|
||||
};
|
||||
|
||||
if send {
|
||||
let typing = if typing {
|
||||
self.client
|
||||
.inner
|
||||
.typing_notice_times
|
||||
.insert(self.inner.room_id().clone(), Instant::now());
|
||||
Typing::Yes(TYPING_NOTICE_TIMEOUT)
|
||||
} else {
|
||||
self.client.typing_notice_times.remove(self.inner.room_id());
|
||||
self.client.inner.typing_notice_times.remove(self.inner.room_id());
|
||||
Typing::No
|
||||
};
|
||||
|
||||
@@ -278,7 +280,7 @@ impl Joined {
|
||||
/// if let Some(room) = client.get_joined_room(&room_id) {
|
||||
/// room.enable_encryption().await?
|
||||
/// }
|
||||
/// # matrix_sdk::Result::Ok(()) });
|
||||
/// # Result::<_, matrix_sdk::Error>::Ok(()) });
|
||||
/// ```
|
||||
pub async fn enable_encryption(&self) -> Result<()> {
|
||||
use ruma::{
|
||||
@@ -294,7 +296,7 @@ impl Joined {
|
||||
// TODO do we want to return an error here if we time out? This
|
||||
// could be quite useful if someone wants to enable encryption and
|
||||
// send a message right after it's enabled.
|
||||
self.client.sync_beat.listen().wait_timeout(SYNC_WAIT_TIME);
|
||||
self.client.inner.sync_beat.listen().wait_timeout(SYNC_WAIT_TIME);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -311,7 +313,7 @@ impl Joined {
|
||||
// TODO expose this publicly so people can pre-share a group session if
|
||||
// e.g. a user starts to type a message for a room.
|
||||
if let Some(mutex) =
|
||||
self.client.group_session_locks.get(self.inner.room_id()).map(|m| m.clone())
|
||||
self.client.inner.group_session_locks.get(self.inner.room_id()).map(|m| m.clone())
|
||||
{
|
||||
// If a group session share request is already going on,
|
||||
// await the release of the lock.
|
||||
@@ -320,7 +322,10 @@ impl Joined {
|
||||
// Otherwise create a new lock and share the group
|
||||
// session.
|
||||
let mutex = Arc::new(Mutex::new(()));
|
||||
self.client.group_session_locks.insert(self.inner.room_id().clone(), mutex.clone());
|
||||
self.client
|
||||
.inner
|
||||
.group_session_locks
|
||||
.insert(self.inner.room_id().clone(), mutex.clone());
|
||||
|
||||
let _guard = mutex.lock().await;
|
||||
|
||||
@@ -334,13 +339,13 @@ impl Joined {
|
||||
|
||||
let response = self.share_group_session().await;
|
||||
|
||||
self.client.group_session_locks.remove(self.inner.room_id());
|
||||
self.client.inner.group_session_locks.remove(self.inner.room_id());
|
||||
|
||||
// If one of the responses failed invalidate the group
|
||||
// session as using it would end up in undecryptable
|
||||
// messages.
|
||||
if let Err(r) = response {
|
||||
self.client.base_client.invalidate_group_session(self.inner.room_id()).await?;
|
||||
self.client.base_client().invalidate_group_session(self.inner.room_id()).await?;
|
||||
return Err(r);
|
||||
}
|
||||
}
|
||||
@@ -357,12 +362,12 @@ impl Joined {
|
||||
#[cfg(feature = "encryption")]
|
||||
async fn share_group_session(&self) -> Result<()> {
|
||||
let mut requests =
|
||||
self.client.base_client.share_group_session(self.inner.room_id()).await?;
|
||||
self.client.base_client().share_group_session(self.inner.room_id()).await?;
|
||||
|
||||
for request in requests.drain(..) {
|
||||
let response = self.client.send_to_device(&request).await?;
|
||||
|
||||
self.client.base_client.mark_request_as_sent(&request.txn_id, &response).await?;
|
||||
self.client.mark_request_as_sent(&request.txn_id, &response).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -449,7 +454,7 @@ impl Joined {
|
||||
/// if let Some(room) = client.get_joined_room(&room_id) {
|
||||
/// room.send(content, Some(txn_id)).await?;
|
||||
/// }
|
||||
/// # matrix_sdk::Result::Ok(()) });
|
||||
/// # Result::<_, matrix_sdk::Error>::Ok(()) });
|
||||
/// ```
|
||||
///
|
||||
/// [`SyncMessageEvent`]: ruma::events::SyncMessageEvent
|
||||
@@ -517,7 +522,7 @@ impl Joined {
|
||||
/// if let Some(room) = client.get_joined_room(&room_id) {
|
||||
/// room.send_raw(content, "m.room.message", None).await?;
|
||||
/// }
|
||||
/// # matrix_sdk::Result::Ok(()) });
|
||||
/// # Result::<_, matrix_sdk::Error>::Ok(()) });
|
||||
/// ```
|
||||
///
|
||||
/// [`SyncMessageEvent`]: ruma::events::SyncMessageEvent
|
||||
@@ -554,8 +559,7 @@ impl Joined {
|
||||
|
||||
self.preshare_group_session().await?;
|
||||
|
||||
let olm =
|
||||
self.client.base_client.olm_machine().await.expect("Olm machine wasn't started");
|
||||
let olm = self.client.olm_machine().await.expect("Olm machine wasn't started");
|
||||
|
||||
let encrypted_content =
|
||||
olm.encrypt_raw(self.inner.room_id(), content, event_type).await?;
|
||||
@@ -631,7 +635,7 @@ impl Joined {
|
||||
/// None,
|
||||
/// ).await?;
|
||||
/// }
|
||||
/// # matrix_sdk::Result::Ok(()) });
|
||||
/// # Result::<_, matrix_sdk::Error>::Ok(()) });
|
||||
/// ```
|
||||
pub async fn send_attachment<R: Read>(
|
||||
&self,
|
||||
@@ -698,7 +702,7 @@ impl Joined {
|
||||
/// if let Some(room) = client.get_joined_room(&room_id) {
|
||||
/// room.send_state_event(content, "").await?;
|
||||
/// }
|
||||
/// # matrix_sdk::Result::Ok(()) });
|
||||
/// # Result::<_, matrix_sdk::Error>::Ok(()) });
|
||||
/// ```
|
||||
pub async fn send_state_event(
|
||||
&self,
|
||||
@@ -791,7 +795,7 @@ impl Joined {
|
||||
/// let reason = Some("Indecent material");
|
||||
/// room.redact(&event_id, reason, None).await?;
|
||||
/// }
|
||||
/// # matrix_sdk::Result::Ok(()) });
|
||||
/// # Result::<_, matrix_sdk::Error>::Ok(()) });
|
||||
/// ```
|
||||
pub async fn redact(
|
||||
&self,
|
||||
@@ -834,7 +838,7 @@ impl Joined {
|
||||
///
|
||||
/// room.set_tag("u.work", tag_info ).await?;
|
||||
/// }
|
||||
/// # matrix_sdk::Result::Ok(()) });
|
||||
/// # Result::<_, matrix_sdk::Error>::Ok(()) });
|
||||
/// ```
|
||||
pub async fn set_tag(&self, tag: &str, tag_info: TagInfo) -> HttpResult<create_tag::Response> {
|
||||
let user_id = self.client.user_id().await.ok_or(HttpError::AuthenticationRequired)?;
|
||||
|
||||
144
crates/matrix-sdk/src/sync.rs
Normal file
144
crates/matrix-sdk/src/sync.rs
Normal file
@@ -0,0 +1,144 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use futures_timer::Delay as sleep;
|
||||
use matrix_sdk_base::{
|
||||
deserialized_responses::{JoinedRoom, LeftRoom, SyncResponse},
|
||||
instant::Instant,
|
||||
};
|
||||
use ruma::api::client::r0::sync::sync_events;
|
||||
use tracing::{error, warn};
|
||||
|
||||
use crate::{event_handler::EventKind, Client, Result};
|
||||
|
||||
/// Internal functionality related to getting events from the server
|
||||
/// (`sync_events` endpoint)
|
||||
impl Client {
|
||||
pub(crate) async fn process_sync(
|
||||
&self,
|
||||
response: sync_events::Response,
|
||||
) -> Result<SyncResponse> {
|
||||
let response = self.base_client().receive_sync_response(response).await?;
|
||||
let SyncResponse {
|
||||
next_batch: _,
|
||||
rooms,
|
||||
presence,
|
||||
account_data,
|
||||
to_device: _,
|
||||
device_lists: _,
|
||||
device_one_time_keys_count: _,
|
||||
ambiguity_changes: _,
|
||||
notifications,
|
||||
} = &response;
|
||||
|
||||
self.handle_sync_events(EventKind::GlobalAccountData, &None, &account_data.events).await?;
|
||||
self.handle_sync_events(EventKind::Presence, &None, &presence.events).await?;
|
||||
|
||||
for (room_id, room_info) in &rooms.join {
|
||||
let room = self.get_room(room_id);
|
||||
if room.is_none() {
|
||||
error!("Can't call event handler, room {} not found", room_id);
|
||||
continue;
|
||||
}
|
||||
|
||||
let JoinedRoom { unread_notifications: _, timeline, state, account_data, ephemeral } =
|
||||
room_info;
|
||||
|
||||
self.handle_sync_events(EventKind::EphemeralRoomData, &room, &ephemeral.events).await?;
|
||||
self.handle_sync_events(EventKind::RoomAccountData, &room, &account_data.events)
|
||||
.await?;
|
||||
self.handle_sync_state_events(&room, &state.events).await?;
|
||||
self.handle_sync_timeline_events(&room, &timeline.events).await?;
|
||||
}
|
||||
|
||||
for (room_id, room_info) in &rooms.leave {
|
||||
let room = self.get_room(room_id);
|
||||
if room.is_none() {
|
||||
error!("Can't call event handler, room {} not found", room_id);
|
||||
continue;
|
||||
}
|
||||
|
||||
let LeftRoom { timeline, state, account_data } = room_info;
|
||||
|
||||
self.handle_sync_events(EventKind::RoomAccountData, &room, &account_data.events)
|
||||
.await?;
|
||||
self.handle_sync_state_events(&room, &state.events).await?;
|
||||
self.handle_sync_timeline_events(&room, &timeline.events).await?;
|
||||
}
|
||||
|
||||
for (room_id, room_info) in &rooms.invite {
|
||||
let room = self.get_room(room_id);
|
||||
if room.is_none() {
|
||||
error!("Can't call event handler, room {} not found", room_id);
|
||||
continue;
|
||||
}
|
||||
|
||||
// FIXME: Destructure room_info
|
||||
self.handle_sync_events(
|
||||
EventKind::StrippedState,
|
||||
&room,
|
||||
&room_info.invite_state.events,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
// Construct notification event handler futures
|
||||
let mut futures = Vec::new();
|
||||
for handler in &*self.notification_handlers().await {
|
||||
for (room_id, room_notifications) in notifications {
|
||||
let room = match self.get_room(room_id) {
|
||||
Some(room) => room,
|
||||
None => {
|
||||
warn!("Can't call notification handler, room {} not found", room_id);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
futures.extend(room_notifications.iter().map(|notification| {
|
||||
(handler)(notification.clone(), room.clone(), self.clone())
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
// Run the notification handler futures with the
|
||||
// `self.notification_handlers` lock no longer being held, in order.
|
||||
for fut in futures {
|
||||
fut.await;
|
||||
}
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
|
||||
pub(crate) async fn sync_loop_helper(
|
||||
&self,
|
||||
sync_settings: &mut crate::config::SyncSettings<'_>,
|
||||
) -> Result<SyncResponse> {
|
||||
let response = self.sync_once(sync_settings.clone()).await;
|
||||
|
||||
match response {
|
||||
Ok(r) => {
|
||||
sync_settings.token = Some(r.next_batch.clone());
|
||||
Ok(r)
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Received an invalid response: {}", e);
|
||||
sleep::new(Duration::from_secs(1)).await;
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) async fn delay_sync(last_sync_time: &mut Option<Instant>) {
|
||||
let now = Instant::now();
|
||||
|
||||
// If the last sync happened less than a second ago, sleep for a
|
||||
// while to not hammer out requests if the server doesn't respect
|
||||
// the sync timeout.
|
||||
if let Some(t) = last_sync_time {
|
||||
if now - *t <= Duration::from_secs(1) {
|
||||
sleep::new(Duration::from_secs(1)).await;
|
||||
}
|
||||
}
|
||||
|
||||
*last_sync_time = Some(now);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user