From 1429c1a06a7a97b496fdec147b54966e656dfd9c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Damir=20Jeli=C4=87?= Date: Mon, 16 Sep 2024 15:22:02 +0200 Subject: [PATCH] test: Confirm that the notification client doesn't create duplicate one-time keys --- crates/matrix-sdk-ui/Cargo.toml | 2 +- .../integration/encryption_sync_service.rs | 252 +++++++++++++++++- 2 files changed, 250 insertions(+), 4 deletions(-) diff --git a/crates/matrix-sdk-ui/Cargo.toml b/crates/matrix-sdk-ui/Cargo.toml index db4bac295..0a757b158 100644 --- a/crates/matrix-sdk-ui/Cargo.toml +++ b/crates/matrix-sdk-ui/Cargo.toml @@ -55,7 +55,7 @@ assert-json-diff = { workspace = true } assert_matches = { workspace = true } assert_matches2 = { workspace = true } eyeball-im-util = { workspace = true } -matrix-sdk = { workspace = true, features = ["testing"] } +matrix-sdk = { workspace = true, features = ["testing", "sqlite"] } matrix-sdk-test = { workspace = true } stream_assert = { workspace = true } tempfile = "3.3.0" diff --git a/crates/matrix-sdk-ui/tests/integration/encryption_sync_service.rs b/crates/matrix-sdk-ui/tests/integration/encryption_sync_service.rs index cc37762c9..e97bd0b93 100644 --- a/crates/matrix-sdk-ui/tests/integration/encryption_sync_service.rs +++ b/crates/matrix-sdk-ui/tests/integration/encryption_sync_service.rs @@ -1,17 +1,35 @@ -use std::sync::{Arc, Mutex}; +use std::{ + collections::{BTreeMap, HashSet}, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Mutex, + }, +}; use futures_util::{pin_mut, StreamExt as _}; -use matrix_sdk::test_utils::logged_in_client_with_server; +use matrix_sdk::{ + config::RequestConfig, + matrix_auth::{MatrixSession, MatrixSessionTokens}, + test_utils::{logged_in_client_with_server, test_client_builder_with_server}, + SessionMeta, +}; use matrix_sdk_base::crypto::store::Changes; use matrix_sdk_test::async_test; use matrix_sdk_ui::encryption_sync_service::{ EncryptionSyncPermit, EncryptionSyncService, WithLocking, }; +use ruma::{device_id, user_id}; +use serde::Deserialize; use serde_json::json; use tokio::sync::Mutex as AsyncMutex; -use wiremock::{Mock, MockGuard, MockServer, Request, ResponseTemplate}; +use tracing::{error, info, trace, warn}; +use wiremock::{ + matchers::{method, path}, + Mock, MockGuard, MockServer, Request, ResponseTemplate, +}; use crate::{ + mock_sync, sliding_sync::{check_requests, PartialSlidingSyncRequest, SlidingSyncMatcher}, sliding_sync_then_assert_request_and_fake_response, }; @@ -320,3 +338,231 @@ async fn test_encryption_sync_always_reloads_todevice_token() -> anyhow::Result< Ok(()) } + +#[async_test] +async fn test_notification_client_does_not_upload_duplicate_one_time_keys() -> anyhow::Result<()> { + use tempfile::tempdir; + + let dir = tempdir().unwrap(); + let user_id = user_id!("@example:morpheus.localhost"); + + let (builder, server) = test_client_builder_with_server().await; + let client = builder + .request_config(RequestConfig::new().disable_retry()) + .sqlite_store(dir.path(), None) + .build() + .await + .unwrap(); + + let session = MatrixSession { + meta: SessionMeta { user_id: user_id.into(), device_id: device_id!("DEVICEID").to_owned() }, + tokens: MatrixSessionTokens { access_token: "1234".to_owned(), refresh_token: None }, + }; + + client.restore_session(session.to_owned()).await.unwrap(); + + info!("Creating the notification client"); + let notification_client = client + .notification_client() + .await + .expect("We should be able to build a notification client"); + + let sync_permit = Arc::new(AsyncMutex::new(EncryptionSyncPermit::new_for_testing())); + let sync_permit_guard = sync_permit.lock_owned().await; + let encryption_sync = + EncryptionSyncService::new("tests".to_owned(), client.clone(), None, WithLocking::Yes) + .await?; + + let stream = encryption_sync.sync(sync_permit_guard); + pin_mut!(stream); + + Mock::given(method("POST")) + .and(path("/_matrix/client/r0/keys/query")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({}))) + .mount(&server) + .await; + + info!("First sync, uploading 50 one-time keys"); + + sliding_sync_then_assert_request_and_fake_response! { + [server, stream] + assert request = { + "conn_id": "encryption", + "extensions": { + "e2ee": { + "enabled": true + }, + "to_device": { + "enabled": true + } + } + }, + respond with = { + "pos": "0", + "extensions": { + "to_device": { + "next_batch": "nb0" + }, + } + }, + }; + + #[derive(Debug, Deserialize)] + struct UploadRequest { + one_time_keys: BTreeMap, + } + + let found_duplicate = Arc::new(AtomicBool::new(false)); + let uploaded_key_ids = Arc::new(Mutex::new(HashSet::new())); + + Mock::given(method("POST")) + .and(path("/_matrix/client/r0/keys/upload")) + .respond_with({ + let found_duplicate = found_duplicate.clone(); + let uploaded_key_ids = uploaded_key_ids.clone(); + + move |request: &Request| { + let request: UploadRequest = request + .body_json() + .expect("The /keys/upload request should contain one-time keys"); + + let mut uploaded_key_ids = uploaded_key_ids.lock().unwrap(); + + let new_key_ids: HashSet = request.one_time_keys.into_keys().collect(); + + warn!(?new_key_ids, "Got a new /keys/upload request"); + + let duplicates: HashSet<_> = uploaded_key_ids.intersection(&new_key_ids).collect(); + + if let Some(duplicate) = duplicates.into_iter().next() { + error!("Duplicate one-time keys were uploaded."); + + found_duplicate.store(true, Ordering::SeqCst); + + ResponseTemplate::new(400).set_body_json(json!({ + "errcode": "M_WAT", + "error:": format!("One time key {duplicate} already exists!") + })) + } else { + trace!("No duplicate one-time keys found."); + uploaded_key_ids.extend(new_key_ids); + + ResponseTemplate::new(200).set_body_json(json!({ + "one_time_key_counts": { + "signed_curve25519": 50 + } + })) + } + } + }) + .expect(4) + .mount(&server) + .await; + + info!("Main sync now gets told that a one-time key has been used up."); + + sliding_sync_then_assert_request_and_fake_response! { + [server, stream] + assert request = { + "conn_id": "encryption", + "extensions": { + "to_device": { + "since": "nb0", + }, + } + }, + respond with = { + "pos": "2", + "extensions": { + "to_device": { + "next_batch": "nb2" + }, + "e2ee": { + "device_one_time_keys_count": { + "signed_curve25519": 49 + } + } + } + }, + }; + + assert!( + !found_duplicate.load(Ordering::SeqCst), + "The main sync should not have caused a duplicate one-time key" + ); + + mock_sync( + &server, + json!({ + "next_batch": "foo", + "device_one_time_keys_count": { + "signed_curve25519": 49 + } + }), + None, + ) + .await; + + info!("The notification client now syncs and tries to upload some one-time keys"); + + notification_client + .sync_once(Default::default()) + .await + .expect("The notification client should be able to sync successfully"); + + info!("Back to the main sync"); + + sliding_sync_then_assert_request_and_fake_response! { + [server, stream] + assert request = { + "conn_id": "encryption", + "extensions": { + "to_device": { + "since": "foo", + }, + } + }, + respond with = { + "pos": "2", + "extensions": { + "to_device": { + "next_batch": "nb4" + }, + "e2ee": { + "device_one_time_keys_count": { + "signed_curve25519": 49 + } + } + } + }, + }; + + sliding_sync_then_assert_request_and_fake_response! { + [server, stream] + assert request = { + "conn_id": "encryption", + "extensions": { + "to_device": { + "since": "nb4", + }, + } + }, + respond with = { + "pos": "2", + "extensions": { + "to_device": { + "next_batch": "nb5" + }, + } + }, + }; + + assert!( + !found_duplicate.load(Ordering::SeqCst), + "Duplicate one-time keys should not have been created" + ); + + server.verify().await; + + Ok(()) +}