feat(crypto): Handle the failures field in the /keys/claim response

This commit is contained in:
Damir Jelić
2023-01-03 12:32:52 +01:00
parent 8db84a3c9e
commit 0e4f4d69b8

View File

@@ -25,8 +25,8 @@ use ruma::{
},
assign,
events::dummy::ToDeviceDummyEventContent,
DeviceId, DeviceKeyAlgorithm, OwnedDeviceId, OwnedTransactionId, OwnedUserId,
SecondsSinceUnixEpoch, TransactionId, UserId,
DeviceId, DeviceKeyAlgorithm, OwnedDeviceId, OwnedServerName, OwnedTransactionId, OwnedUserId,
SecondsSinceUnixEpoch, ServerName, TransactionId, UserId,
};
use tracing::{debug, error, info, warn};
use vodozemac::Curve25519PublicKey;
@@ -39,6 +39,7 @@ use crate::{
requests::{OutgoingRequest, ToDeviceRequest},
store::{Changes, Result as StoreResult, Store},
types::{events::EventType, EventEncryptionAlgorithm},
utilities::FailuresCache,
ReadOnlyDevice,
};
@@ -55,6 +56,7 @@ pub(crate) struct SessionManager {
key_request_machine: GossipMachine,
outgoing_to_device_requests: Arc<DashMap<OwnedTransactionId, OutgoingRequest>>,
keys_query_listener: KeysQueryListener,
failures: FailuresCache<OwnedServerName>,
}
impl SessionManager {
@@ -77,6 +79,7 @@ impl SessionManager {
wedged_devices: Default::default(),
outgoing_to_device_requests: Default::default(),
keys_query_listener,
failures: Default::default(),
}
}
@@ -226,7 +229,7 @@ impl SessionManager {
// Add the list of devices that the user wishes to establish sessions
// right now.
for user_id in users {
for user_id in users.filter(|u| !self.failures.contains(u.server_name())) {
let user_devices = self.get_user_devices(user_id).await?;
for (device_id, device) in user_devices {
@@ -300,6 +303,16 @@ impl SessionManager {
pub async fn receive_keys_claim_response(&self, response: &KeysClaimResponse) -> OlmResult<()> {
debug!(failures = ?response.failures, "Received a `/keys/claim` response");
let failed_servers = response
.failures
.keys()
.filter_map(|s| ServerName::parse(s).ok())
.filter(|s| s != self.account.user_id().server_name());
let successful_servers = response.one_time_keys.keys().map(|u| u.server_name());
self.failures.extend(failed_servers);
self.failures.remove(successful_servers);
struct SessionInfo {
session_id: String,
algorithm: EventEncryptionAlgorithm,
@@ -393,11 +406,12 @@ mod tests {
use dashmap::DashMap;
use matrix_sdk_common::locks::Mutex;
use matrix_sdk_test::async_test;
use matrix_sdk_test::{async_test, response_from_file};
use ruma::{
api::client::keys::claim_keys::v3::Response as KeyClaimResponse, device_id, user_id,
DeviceId, UserId,
api::{client::keys::claim_keys::v3::Response as KeyClaimResponse, IncomingResponse},
device_id, user_id, DeviceId, UserId,
};
use serde_json::json;
use super::SessionManager;
use crate::{
@@ -421,6 +435,33 @@ mod tests {
ReadOnlyAccount::new(user_id!("@bob:localhost"), device_id!("BOBDEVICE"))
}
fn keys_claim_with_failure() -> KeyClaimResponse {
let response = json!({
"one_time_keys": {},
"failures": {
"example.org": {
"errcode": "M_RESOURCE_LIMIT_EXCEEDED",
"error": "Not yet ready to retry",
}
}
});
let response = response_from_file(&response);
KeyClaimResponse::try_from_http_response(response).unwrap()
}
fn keys_claim_without_failure() -> KeyClaimResponse {
let response = json!({
"one_time_keys": {
"@alice:example.org": {},
},
"failures": {},
});
let response = response_from_file(&response);
KeyClaimResponse::try_from_http_response(response).unwrap()
}
async fn session_manager() -> SessionManager {
let user_id = user_id();
let device_id = device_id();
@@ -546,4 +587,25 @@ mod tests {
assert!(manager.get_missing_sessions(iter::once(bob.user_id())).await.unwrap().is_none());
assert!(!manager.outgoing_to_device_requests.is_empty())
}
#[async_test]
async fn failure_handling() {
let alice = user_id!("@alice:example.org");
let alice_account = ReadOnlyAccount::new(alice, "DEVICEID".into());
let alice_device = ReadOnlyDevice::from_account(&alice_account).await;
let manager = session_manager().await;
manager.store.save_devices(&[alice_device]).await.unwrap();
let (_, users_for_key_claim) =
manager.get_missing_sessions(iter::once(alice)).await.unwrap().unwrap();
assert!(users_for_key_claim.one_time_keys.contains_key(alice));
manager.receive_keys_claim_response(&keys_claim_with_failure()).await.unwrap();
assert!(manager.get_missing_sessions(iter::once(alice)).await.unwrap().is_none());
manager.receive_keys_claim_response(&keys_claim_without_failure()).await.unwrap();
assert!(users_for_key_claim.one_time_keys.contains_key(alice));
}
}