fix(send queue): when adding a local reaction, look for media events in dependent requests too

This commit is contained in:
Benjamin Bouvier
2024-11-19 14:27:21 +01:00
parent f20401c657
commit 8a6ced0e8f
4 changed files with 113 additions and 18 deletions

View File

@@ -367,6 +367,27 @@ pub struct DependentQueuedRequest {
pub parent_key: Option<SentRequestKey>,
}
impl DependentQueuedRequest {
/// Does the dependent request represent a new event that is *not*
/// aggregated, aka it is going to be its own item in a timeline?
pub fn is_own_event(&self) -> bool {
match self.kind {
DependentQueuedRequestKind::EditEvent { .. }
| DependentQueuedRequestKind::RedactEvent
| DependentQueuedRequestKind::ReactEvent { .. }
| DependentQueuedRequestKind::UploadFileWithThumbnail { .. } => {
// These are all aggregated events, or non-visible items (file upload producing
// a new MXC ID).
false
}
DependentQueuedRequestKind::FinishUpload { .. } => {
// This one graduates into a new media event.
true
}
}
}
}
#[cfg(not(tarpaulin_include))]
impl fmt::Debug for QueuedRequest {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {

View File

@@ -505,7 +505,7 @@ impl<P: RoomDataProvider> TimelineController<P> {
let Some(prev_status) = prev_status else {
match &item.kind {
EventTimelineItemKind::Local(local) => {
if let Some(send_handle) = local.send_handle.clone() {
if let Some(send_handle) = &local.send_handle {
if send_handle
.react(key.to_owned())
.await

View File

@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{fs::File, io::Write as _, time::Duration};
use std::{fs::File, io::Write as _, path::PathBuf, time::Duration};
use assert_matches::assert_matches;
use assert_matches2::assert_let;
@@ -35,6 +35,24 @@ use tempfile::TempDir;
use tokio::time::sleep;
use wiremock::ResponseTemplate;
fn create_temporary_file(filename: &str) -> (TempDir, PathBuf) {
let tmp_dir = TempDir::new().unwrap();
let file_path = tmp_dir.path().join(filename);
let mut file = File::create(&file_path).unwrap();
file.write_all(b"hello world").unwrap();
(tmp_dir, file_path)
}
fn get_filename_and_caption(msg: &MessageType) -> (&str, Option<&str>) {
match msg {
MessageType::File(event) => (event.filename(), event.caption()),
MessageType::Image(event) => (event.filename(), event.caption()),
MessageType::Video(event) => (event.filename(), event.caption()),
MessageType::Audio(event) => (event.filename(), event.caption()),
_ => panic!("unexpected message type"),
}
}
#[async_test]
async fn test_send_attachment() {
let mock = MatrixMockServer::new().await;
@@ -48,6 +66,7 @@ async fn test_send_attachment() {
let (items, mut timeline_stream) =
timeline.subscribe_filter_map(|item| item.as_event().cloned()).await;
assert!(items.is_empty());
let f = EventFactory::new();
@@ -66,13 +85,7 @@ async fn test_send_attachment() {
assert!(timeline_stream.next().now_or_never().is_none());
// Store a file in a temporary directory.
let tmp_dir = TempDir::new().unwrap();
let file_path = tmp_dir.path().join("test.bin");
{
let mut file = File::create(&file_path).unwrap();
file.write_all(b"hello world").unwrap();
}
let (_tmp_dir, file_path) = create_temporary_file("test.bin");
// Set up mocks for the file upload.
mock.mock_upload()
@@ -94,16 +107,14 @@ async fn test_send_attachment() {
{
assert_let_timeout!(Some(VectorDiff::PushBack { value: item }) = timeline_stream.next());
assert_matches!(item.send_state(), Some(EventSendState::NotSentYet));
assert_let!(TimelineItemContent::Message(msg) = item.content());
// Body is the caption, because there's both a caption and filename.
assert_eq!(msg.body(), "caption");
assert_let!(MessageType::File(file) = msg.msgtype());
assert_eq!(file.filename(), "test.bin");
assert_eq!(file.caption(), Some("caption"));
assert_eq!(get_filename_and_caption(msg.msgtype()), ("test.bin", Some("caption")));
// The URI refers to the local cache.
assert_let!(MessageType::File(file) = msg.msgtype());
assert_let!(MediaSource::Plain(uri) = &file.source);
assert!(uri.to_string().contains("localhost"));
}
@@ -116,12 +127,11 @@ async fn test_send_attachment() {
Some(VectorDiff::Set { index: 1, value: item }) = timeline_stream.next()
);
assert_let!(TimelineItemContent::Message(msg) = item.content());
assert_let!(MessageType::File(file) = msg.msgtype());
assert_eq!(file.filename(), "test.bin");
assert_eq!(file.caption(), Some("caption"));
assert_matches!(item.send_state(), Some(EventSendState::NotSentYet));
assert_eq!(get_filename_and_caption(msg.msgtype()), ("test.bin", Some("caption")));
// The URI now refers to the final MXC URI.
assert_let!(MessageType::File(file) = msg.msgtype());
assert_let!(MediaSource::Plain(uri) = &file.source);
assert_eq!(uri.to_string(), "mxc://sdk.rs/media");
}
@@ -139,3 +149,57 @@ async fn test_send_attachment() {
// That's all, folks!
assert!(timeline_stream.next().now_or_never().is_none());
}
#[async_test]
async fn test_react_to_local_media() {
let mock = MatrixMockServer::new().await;
let client = mock.client_builder().build().await;
// Disable the sending queue, to simulate offline mode.
client.send_queue().set_enabled(false).await;
mock.mock_room_state_encryption().plain().mount().await;
let room_id = room_id!("!a98sd12bjh:example.org");
let room = mock.sync_joined_room(&client, room_id).await;
let timeline = room.timeline().await.unwrap();
let (items, mut timeline_stream) =
timeline.subscribe_filter_map(|item| item.as_event().cloned()).await;
assert!(items.is_empty());
assert!(timeline_stream.next().now_or_never().is_none());
// Store a file in a temporary directory.
let (_tmp_dir, file_path) = create_temporary_file("test.bin");
// Queue sending of an attachment (no captions).
let config = AttachmentConfig::new();
timeline.send_attachment(&file_path, mime::TEXT_PLAIN, config).use_send_queue().await.unwrap();
let item_id = {
assert_let_timeout!(Some(VectorDiff::PushBack { value: item }) = timeline_stream.next());
assert_let!(TimelineItemContent::Message(msg) = item.content());
assert_eq!(get_filename_and_caption(msg.msgtype()), ("test.bin", None));
// The item starts with no reactions.
assert!(item.reactions().is_empty());
item.identifier()
};
// Add a reaction to the file media event.
timeline.toggle_reaction(&item_id, "🤪").await.unwrap();
assert_let_timeout!(Some(VectorDiff::Set { index: 0, value: item }) = timeline_stream.next());
assert_let!(TimelineItemContent::Message(msg) = item.content());
assert_eq!(get_filename_and_caption(msg.msgtype()), ("test.bin", None));
// There's a reaction for the current user for the given emoji.
let reactions = item.reactions();
let own_user_id = client.user_id().unwrap();
reactions.get("🤪").unwrap().get(own_user_id).unwrap();
// That's all, folks!
assert!(timeline_stream.next().now_or_never().is_none());
}

View File

@@ -1270,7 +1270,17 @@ impl QueueStorage {
// If the target event has been already sent, abort immediately.
if !requests.iter().any(|item| item.transaction_id == transaction_id) {
return Ok(None);
// We didn't find it as a queued request; try to find it as a dependent queued
// request.
let dependent_requests = store.load_dependent_queued_requests(&self.room_id).await?;
if !dependent_requests
.into_iter()
.filter_map(|item| item.is_own_event().then_some(item.own_transaction_id))
.any(|child_txn| *child_txn == *transaction_id)
{
// We didn't find it as either a request or a dependent request, abort.
return Ok(None);
}
}
// Record the dependent request.