From 0e4f4d69b89b1ff402ab5a8f5a4ee08df0141836 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Tue, 3 Jan 2023 12:32:52 +0100 Subject: [PATCH] feat(crypto): Handle the failures field in the /keys/claim response --- .../src/session_manager/sessions.rs | 74 +++++++++++++++++-- 1 file changed, 68 insertions(+), 6 deletions(-) diff --git a/crates/matrix-sdk-crypto/src/session_manager/sessions.rs b/crates/matrix-sdk-crypto/src/session_manager/sessions.rs index 8e84875a4..4940ed0f9 100644 --- a/crates/matrix-sdk-crypto/src/session_manager/sessions.rs +++ b/crates/matrix-sdk-crypto/src/session_manager/sessions.rs @@ -25,8 +25,8 @@ use ruma::{ }, assign, events::dummy::ToDeviceDummyEventContent, - DeviceId, DeviceKeyAlgorithm, OwnedDeviceId, OwnedTransactionId, OwnedUserId, - SecondsSinceUnixEpoch, TransactionId, UserId, + DeviceId, DeviceKeyAlgorithm, OwnedDeviceId, OwnedServerName, OwnedTransactionId, OwnedUserId, + SecondsSinceUnixEpoch, ServerName, TransactionId, UserId, }; use tracing::{debug, error, info, warn}; use vodozemac::Curve25519PublicKey; @@ -39,6 +39,7 @@ use crate::{ requests::{OutgoingRequest, ToDeviceRequest}, store::{Changes, Result as StoreResult, Store}, types::{events::EventType, EventEncryptionAlgorithm}, + utilities::FailuresCache, ReadOnlyDevice, }; @@ -55,6 +56,7 @@ pub(crate) struct SessionManager { key_request_machine: GossipMachine, outgoing_to_device_requests: Arc>, keys_query_listener: KeysQueryListener, + failures: FailuresCache, } impl SessionManager { @@ -77,6 +79,7 @@ impl SessionManager { wedged_devices: Default::default(), outgoing_to_device_requests: Default::default(), keys_query_listener, + failures: Default::default(), } } @@ -226,7 +229,7 @@ impl SessionManager { // Add the list of devices that the user wishes to establish sessions // right now. - for user_id in users { + for user_id in users.filter(|u| !self.failures.contains(u.server_name())) { let user_devices = self.get_user_devices(user_id).await?; for (device_id, device) in user_devices { @@ -300,6 +303,16 @@ impl SessionManager { pub async fn receive_keys_claim_response(&self, response: &KeysClaimResponse) -> OlmResult<()> { debug!(failures = ?response.failures, "Received a `/keys/claim` response"); + let failed_servers = response + .failures + .keys() + .filter_map(|s| ServerName::parse(s).ok()) + .filter(|s| s != self.account.user_id().server_name()); + let successful_servers = response.one_time_keys.keys().map(|u| u.server_name()); + + self.failures.extend(failed_servers); + self.failures.remove(successful_servers); + struct SessionInfo { session_id: String, algorithm: EventEncryptionAlgorithm, @@ -393,11 +406,12 @@ mod tests { use dashmap::DashMap; use matrix_sdk_common::locks::Mutex; - use matrix_sdk_test::async_test; + use matrix_sdk_test::{async_test, response_from_file}; use ruma::{ - api::client::keys::claim_keys::v3::Response as KeyClaimResponse, device_id, user_id, - DeviceId, UserId, + api::{client::keys::claim_keys::v3::Response as KeyClaimResponse, IncomingResponse}, + device_id, user_id, DeviceId, UserId, }; + use serde_json::json; use super::SessionManager; use crate::{ @@ -421,6 +435,33 @@ mod tests { ReadOnlyAccount::new(user_id!("@bob:localhost"), device_id!("BOBDEVICE")) } + fn keys_claim_with_failure() -> KeyClaimResponse { + let response = json!({ + "one_time_keys": {}, + "failures": { + "example.org": { + "errcode": "M_RESOURCE_LIMIT_EXCEEDED", + "error": "Not yet ready to retry", + } + } + }); + let response = response_from_file(&response); + + KeyClaimResponse::try_from_http_response(response).unwrap() + } + + fn keys_claim_without_failure() -> KeyClaimResponse { + let response = json!({ + "one_time_keys": { + "@alice:example.org": {}, + }, + "failures": {}, + }); + let response = response_from_file(&response); + + KeyClaimResponse::try_from_http_response(response).unwrap() + } + async fn session_manager() -> SessionManager { let user_id = user_id(); let device_id = device_id(); @@ -546,4 +587,25 @@ mod tests { assert!(manager.get_missing_sessions(iter::once(bob.user_id())).await.unwrap().is_none()); assert!(!manager.outgoing_to_device_requests.is_empty()) } + + #[async_test] + async fn failure_handling() { + let alice = user_id!("@alice:example.org"); + let alice_account = ReadOnlyAccount::new(alice, "DEVICEID".into()); + let alice_device = ReadOnlyDevice::from_account(&alice_account).await; + + let manager = session_manager().await; + + manager.store.save_devices(&[alice_device]).await.unwrap(); + + let (_, users_for_key_claim) = + manager.get_missing_sessions(iter::once(alice)).await.unwrap().unwrap(); + assert!(users_for_key_claim.one_time_keys.contains_key(alice)); + + manager.receive_keys_claim_response(&keys_claim_with_failure()).await.unwrap(); + assert!(manager.get_missing_sessions(iter::once(alice)).await.unwrap().is_none()); + + manager.receive_keys_claim_response(&keys_claim_without_failure()).await.unwrap(); + assert!(users_for_key_claim.one_time_keys.contains_key(alice)); + } }