test(send queue): add more tests for cancellation

This commit is contained in:
Benjamin Bouvier
2024-11-13 18:00:16 +01:00
parent b7d4be9b65
commit 9b6de4e436

View File

@@ -44,7 +44,7 @@ use wiremock::{Request, ResponseTemplate};
/// Queues an attachment whenever the actual data/mime type etc. don't matter.
///
/// Returns the filename, for sanity check purposes.
async fn queue_attachment_no_thumbnail(q: &RoomSendQueue) -> &'static str {
async fn queue_attachment_no_thumbnail(q: &RoomSendQueue) -> (SendHandle, &'static str) {
let filename = "surprise.jpeg.exe";
let content_type = mime::IMAGE_JPEG;
let data = b"hello world".to_vec();
@@ -54,10 +54,11 @@ async fn queue_attachment_no_thumbnail(q: &RoomSendQueue) -> &'static str {
size: Some(uint!(42)),
blurhash: None,
}));
q.send_attachment(filename, content_type, data, config)
let handle = q
.send_attachment(filename, content_type, data, config)
.await
.expect("queuing the attachment works");
filename
(handle, filename)
}
/// Queues an attachment whenever the actual data/mime type etc. don't matter,
@@ -2006,7 +2007,7 @@ async fn test_media_upload_retry() {
// Send the media.
assert!(watch.is_empty());
let filename = queue_attachment_no_thumbnail(&q).await;
let (_handle, filename) = queue_attachment_no_thumbnail(&q).await;
// Observe the local echo.
let (event_txn, _send_handle, content) = assert_update!(watch => local echo event);
@@ -2075,7 +2076,7 @@ async fn test_unwedging_media_upload() {
// Send the media.
assert!(watch.is_empty());
let filename = queue_attachment_no_thumbnail(&q).await;
let (_handle, filename) = queue_attachment_no_thumbnail(&q).await;
// Observe the local echo.
let (event_txn, send_handle, content) = assert_update!(watch => local echo event);
@@ -2196,7 +2197,7 @@ async fn test_media_event_is_sent_in_order() {
mock.mock_room_send().ok(event_id!("$text")).mock_once().mount().await;
// 2. Queue the media.
let filename = queue_attachment_no_thumbnail(&q).await;
let (_handle, filename) = queue_attachment_no_thumbnail(&q).await;
// 3. Queue the message.
q.send(RoomMessageEventContent::text_plain("hello world").into()).await.unwrap();
@@ -2311,12 +2312,8 @@ async fn test_cancel_upload_with_thumbnail_active() {
// Have the thumbnail upload take forever and time out, if continued. This will
// be interrupted when aborting, so this will never have to complete.
mock.mock_upload()
.respond_with(ResponseTemplate::new(200).set_delay(Duration::from_secs(60)).set_body_json(
json!({
"mxc_id": "mxc://sdk.rs/unreachable"
}),
))
.expect(0)
.respond_with(ResponseTemplate::new(200).set_delay(Duration::from_secs(60)))
.expect(1)
.mount()
.await;
@@ -2329,6 +2326,9 @@ async fn test_cancel_upload_with_thumbnail_active() {
assert_let!(MessageType::Image(img_content) = content.msgtype);
assert_eq!(img_content.filename(), filename);
// Let the upload request start.
sleep(Duration::from_millis(500)).await;
// Abort the upload.
abort_and_verify(&client, &mut watch, img_content, upload_handle, upload_txn).await;
@@ -2341,3 +2341,234 @@ async fn test_cancel_upload_with_thumbnail_active() {
// That's all, folks!
assert!(watch.is_empty());
}
#[async_test]
async fn test_cancel_upload_with_uploaded_thumbnail_and_file_active() {
let mock = MatrixMockServer::new().await;
// Mark the room as joined.
let room_id = room_id!("!a:b.c");
let client = mock.client_builder().build().await;
let room = mock.sync_joined_room(&client, room_id).await;
let q = room.send_queue();
let (local_echoes, mut watch) = q.subscribe().await.unwrap();
assert!(local_echoes.is_empty());
// Prepare endpoints.
mock.mock_room_state_encryption().plain().mount().await;
mock.mock_room_send().ok(event_id!("$msg")).mock_once().named("send event").mount().await;
// Have the thumbnail upload finish early.
mock.mock_upload()
.ok(mxc_uri!("mxc://sdk.rs/thumbnail"))
.mock_once()
.named("thumbnail upload")
.mount()
.await;
// Have the file upload take forever and time out, if continued. This will
// be interrupted when aborting, so this will never have to complete.
mock.mock_upload()
.respond_with(ResponseTemplate::new(200).set_delay(Duration::from_secs(60)))
.expect(1)
.named("file upload")
.mount()
.await;
// Send the media.
assert!(watch.is_empty());
let (upload_handle, filename) = queue_attachment_with_thumbnail(&q).await;
let (upload_txn, _send_handle, content) = assert_update!(watch => local echo event);
assert_let!(MessageType::Image(img_content) = content.msgtype);
assert_eq!(img_content.filename(), filename);
// The thumbnail uploads just fine.
assert_update!(watch => uploaded { related_to = upload_txn, mxc = mxc_uri!("mxc://sdk.rs/thumbnail") });
// Let the file upload request start.
sleep(Duration::from_millis(500)).await;
// Abort the upload.
abort_and_verify(&client, &mut watch, img_content, upload_handle, upload_txn).await;
// To prove we're not waiting for the upload to finish, send a message and
// observe it's immediately sent.
q.send(RoomMessageEventContent::text_plain("hi").into()).await.unwrap();
let (msg_txn, _handle) = assert_update!(watch => local echo { body = "hi" });
assert_update!(watch => sent { txn = msg_txn, });
// That's all, folks!
assert!(watch.is_empty());
}
#[async_test]
async fn test_cancel_upload_only_file_with_file_active() {
let mock = MatrixMockServer::new().await;
// Mark the room as joined.
let room_id = room_id!("!a:b.c");
let client = mock.client_builder().build().await;
let room = mock.sync_joined_room(&client, room_id).await;
let q = room.send_queue();
let (local_echoes, mut watch) = q.subscribe().await.unwrap();
assert!(local_echoes.is_empty());
// Prepare endpoints.
mock.mock_room_state_encryption().plain().mount().await;
mock.mock_room_send().ok(event_id!("$msg")).mock_once().named("send event").mount().await;
// Have the file upload take forever and time out, if continued. This will
// be interrupted when aborting, so this will never have to complete.
mock.mock_upload()
.respond_with(ResponseTemplate::new(200).set_delay(Duration::from_secs(60)))
.expect(1)
.named("file upload")
.mount()
.await;
// Send the media.
assert!(watch.is_empty());
let (upload_handle, filename) = queue_attachment_no_thumbnail(&q).await;
let (upload_txn, _send_handle, content) = assert_update!(watch => local echo event);
assert_let!(MessageType::Image(img_content) = content.msgtype);
assert_eq!(img_content.filename(), filename);
// Let the upload request start.
sleep(Duration::from_millis(500)).await;
// Abort the upload.
let aborted = upload_handle.abort().await.unwrap();
assert!(aborted, "upload must have been aborted");
assert_update!(watch => cancelled { txn = upload_txn });
// The event cache doesn't contain the medias anymore.
client
.media()
.get_media_content(
&MediaRequestParameters { source: img_content.source, format: MediaFormat::File },
true,
)
.await
.unwrap_err();
// To prove we're not waiting for the upload to finish, send a message and
// observe it's immediately sent.
q.send(RoomMessageEventContent::text_plain("hi").into()).await.unwrap();
let (msg_txn, _handle) = assert_update!(watch => local echo { body = "hi" });
assert_update!(watch => sent { txn = msg_txn, });
// That's all, folks!
assert!(watch.is_empty());
}
#[async_test]
async fn test_cancel_upload_while_sending_event() {
let mock = MatrixMockServer::new().await;
// Mark the room as joined.
let room_id = room_id!("!a:b.c");
let client = mock.client_builder().build().await;
let room = mock.sync_joined_room(&client, room_id).await;
let q = room.send_queue();
let (local_echoes, mut watch) = q.subscribe().await.unwrap();
assert!(local_echoes.is_empty());
// Prepare endpoints.
mock.mock_room_state_encryption().plain().mount().await;
// File upload will succeed immediately.
mock.mock_upload()
.ok(mxc_uri!("mxc://sdk.rs/media"))
.mock_once()
.named("file upload")
.mount()
.await;
// Sending of the media event will take 1 second, so we can abort it while it's
// happening.
mock.mock_room_send()
.respond_with(ResponseTemplate::new(200).set_delay(Duration::from_secs(1)).set_body_json(
json!({
"event_id": "$media"
}),
))
.mock_once()
.named("send event")
.mock_once()
.mount()
.await;
// A redaction will happen because the abort happens after the event is getting
// sent.
mock.mock_room_redact().ok(event_id!("$redaction")).mock_once().mount().await;
// Send the media.
assert!(watch.is_empty());
let (upload_handle, filename) = queue_attachment_no_thumbnail(&q).await;
let (upload_txn, _send_handle, content) = assert_update!(watch => local echo event);
assert_let!(MessageType::Image(local_content) = content.msgtype);
assert_eq!(local_content.filename(), filename);
assert_update!(watch => uploaded { related_to = upload_txn, mxc = mxc_uri!("mxc://sdk.rs/media") });
let edit_msg = assert_update!(watch => edit local echo { txn = upload_txn });
assert_let!(MessageType::Image(remote_content) = edit_msg.msgtype);
assert_let!(MediaSource::Plain(new_uri) = &remote_content.source);
assert_eq!(new_uri, mxc_uri!("mxc://sdk.rs/media"));
// Let the upload request start.
sleep(Duration::from_millis(250)).await;
// Abort the upload.
let aborted = upload_handle.abort().await.unwrap();
assert!(aborted, "upload must have been aborted");
// We get a local echo for the cancelled media event…
assert_update!(watch => cancelled { txn = upload_txn });
// …But the event is still sent, before getting redacted.
assert_update!(watch => sent { txn = upload_txn, });
// The event cache doesn't contain the media with the local URI.
client
.media()
.get_media_content(
&MediaRequestParameters { source: local_content.source, format: MediaFormat::File },
true,
)
.await
.unwrap_err();
// But it does contain the media with the remote URI, which hasn't been removed
// from the remote server.
client
.media()
.get_media_content(
&MediaRequestParameters { source: remote_content.source, format: MediaFormat::File },
true,
)
.await
.unwrap();
// Let things settle (and the redaction endpoint be called).
sleep(Duration::from_secs(1)).await;
// Trying to abort after it's been sent/redacted is a no-op
let aborted = upload_handle.abort().await.unwrap();
assert!(aborted.not());
// That's all, folks!
assert!(watch.is_empty());
}