test: Confirm that the notification client doesn't create duplicate one-time keys

This commit is contained in:
Damir Jelić
2024-09-16 15:22:02 +02:00
parent b7ce2dc7e6
commit 1429c1a06a
2 changed files with 250 additions and 4 deletions

View File

@@ -55,7 +55,7 @@ assert-json-diff = { workspace = true }
assert_matches = { workspace = true }
assert_matches2 = { workspace = true }
eyeball-im-util = { workspace = true }
matrix-sdk = { workspace = true, features = ["testing"] }
matrix-sdk = { workspace = true, features = ["testing", "sqlite"] }
matrix-sdk-test = { workspace = true }
stream_assert = { workspace = true }
tempfile = "3.3.0"

View File

@@ -1,17 +1,35 @@
use std::sync::{Arc, Mutex};
use std::{
collections::{BTreeMap, HashSet},
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
},
};
use futures_util::{pin_mut, StreamExt as _};
use matrix_sdk::test_utils::logged_in_client_with_server;
use matrix_sdk::{
config::RequestConfig,
matrix_auth::{MatrixSession, MatrixSessionTokens},
test_utils::{logged_in_client_with_server, test_client_builder_with_server},
SessionMeta,
};
use matrix_sdk_base::crypto::store::Changes;
use matrix_sdk_test::async_test;
use matrix_sdk_ui::encryption_sync_service::{
EncryptionSyncPermit, EncryptionSyncService, WithLocking,
};
use ruma::{device_id, user_id};
use serde::Deserialize;
use serde_json::json;
use tokio::sync::Mutex as AsyncMutex;
use wiremock::{Mock, MockGuard, MockServer, Request, ResponseTemplate};
use tracing::{error, info, trace, warn};
use wiremock::{
matchers::{method, path},
Mock, MockGuard, MockServer, Request, ResponseTemplate,
};
use crate::{
mock_sync,
sliding_sync::{check_requests, PartialSlidingSyncRequest, SlidingSyncMatcher},
sliding_sync_then_assert_request_and_fake_response,
};
@@ -320,3 +338,231 @@ async fn test_encryption_sync_always_reloads_todevice_token() -> anyhow::Result<
Ok(())
}
#[async_test]
async fn test_notification_client_does_not_upload_duplicate_one_time_keys() -> anyhow::Result<()> {
use tempfile::tempdir;
let dir = tempdir().unwrap();
let user_id = user_id!("@example:morpheus.localhost");
let (builder, server) = test_client_builder_with_server().await;
let client = builder
.request_config(RequestConfig::new().disable_retry())
.sqlite_store(dir.path(), None)
.build()
.await
.unwrap();
let session = MatrixSession {
meta: SessionMeta { user_id: user_id.into(), device_id: device_id!("DEVICEID").to_owned() },
tokens: MatrixSessionTokens { access_token: "1234".to_owned(), refresh_token: None },
};
client.restore_session(session.to_owned()).await.unwrap();
info!("Creating the notification client");
let notification_client = client
.notification_client()
.await
.expect("We should be able to build a notification client");
let sync_permit = Arc::new(AsyncMutex::new(EncryptionSyncPermit::new_for_testing()));
let sync_permit_guard = sync_permit.lock_owned().await;
let encryption_sync =
EncryptionSyncService::new("tests".to_owned(), client.clone(), None, WithLocking::Yes)
.await?;
let stream = encryption_sync.sync(sync_permit_guard);
pin_mut!(stream);
Mock::given(method("POST"))
.and(path("/_matrix/client/r0/keys/query"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({})))
.mount(&server)
.await;
info!("First sync, uploading 50 one-time keys");
sliding_sync_then_assert_request_and_fake_response! {
[server, stream]
assert request = {
"conn_id": "encryption",
"extensions": {
"e2ee": {
"enabled": true
},
"to_device": {
"enabled": true
}
}
},
respond with = {
"pos": "0",
"extensions": {
"to_device": {
"next_batch": "nb0"
},
}
},
};
#[derive(Debug, Deserialize)]
struct UploadRequest {
one_time_keys: BTreeMap<String, serde_json::Value>,
}
let found_duplicate = Arc::new(AtomicBool::new(false));
let uploaded_key_ids = Arc::new(Mutex::new(HashSet::new()));
Mock::given(method("POST"))
.and(path("/_matrix/client/r0/keys/upload"))
.respond_with({
let found_duplicate = found_duplicate.clone();
let uploaded_key_ids = uploaded_key_ids.clone();
move |request: &Request| {
let request: UploadRequest = request
.body_json()
.expect("The /keys/upload request should contain one-time keys");
let mut uploaded_key_ids = uploaded_key_ids.lock().unwrap();
let new_key_ids: HashSet<String> = request.one_time_keys.into_keys().collect();
warn!(?new_key_ids, "Got a new /keys/upload request");
let duplicates: HashSet<_> = uploaded_key_ids.intersection(&new_key_ids).collect();
if let Some(duplicate) = duplicates.into_iter().next() {
error!("Duplicate one-time keys were uploaded.");
found_duplicate.store(true, Ordering::SeqCst);
ResponseTemplate::new(400).set_body_json(json!({
"errcode": "M_WAT",
"error:": format!("One time key {duplicate} already exists!")
}))
} else {
trace!("No duplicate one-time keys found.");
uploaded_key_ids.extend(new_key_ids);
ResponseTemplate::new(200).set_body_json(json!({
"one_time_key_counts": {
"signed_curve25519": 50
}
}))
}
}
})
.expect(4)
.mount(&server)
.await;
info!("Main sync now gets told that a one-time key has been used up.");
sliding_sync_then_assert_request_and_fake_response! {
[server, stream]
assert request = {
"conn_id": "encryption",
"extensions": {
"to_device": {
"since": "nb0",
},
}
},
respond with = {
"pos": "2",
"extensions": {
"to_device": {
"next_batch": "nb2"
},
"e2ee": {
"device_one_time_keys_count": {
"signed_curve25519": 49
}
}
}
},
};
assert!(
!found_duplicate.load(Ordering::SeqCst),
"The main sync should not have caused a duplicate one-time key"
);
mock_sync(
&server,
json!({
"next_batch": "foo",
"device_one_time_keys_count": {
"signed_curve25519": 49
}
}),
None,
)
.await;
info!("The notification client now syncs and tries to upload some one-time keys");
notification_client
.sync_once(Default::default())
.await
.expect("The notification client should be able to sync successfully");
info!("Back to the main sync");
sliding_sync_then_assert_request_and_fake_response! {
[server, stream]
assert request = {
"conn_id": "encryption",
"extensions": {
"to_device": {
"since": "foo",
},
}
},
respond with = {
"pos": "2",
"extensions": {
"to_device": {
"next_batch": "nb4"
},
"e2ee": {
"device_one_time_keys_count": {
"signed_curve25519": 49
}
}
}
},
};
sliding_sync_then_assert_request_and_fake_response! {
[server, stream]
assert request = {
"conn_id": "encryption",
"extensions": {
"to_device": {
"since": "nb4",
},
}
},
respond with = {
"pos": "2",
"extensions": {
"to_device": {
"next_batch": "nb5"
},
}
},
};
assert!(
!found_duplicate.load(Ordering::SeqCst),
"Duplicate one-time keys should not have been created"
);
server.verify().await;
Ok(())
}