From fc7124fd1ab9424b8afbed7ef8d6debb2a183f47 Mon Sep 17 00:00:00 2001 From: Johannes Marbach Date: Mon, 7 Jul 2025 19:58:36 +0200 Subject: [PATCH] feat(send_queue): communicate media upload progress in RoomSendQueueUpdate::MediaUpload Signed-off-by: Johannes Marbach --- crates/matrix-sdk/src/send_queue/mod.rs | 303 +++++++++++++++++- .../tests/integration/send_queue.rs | 69 +++- 2 files changed, 348 insertions(+), 24 deletions(-) diff --git a/crates/matrix-sdk/src/send_queue/mod.rs b/crates/matrix-sdk/src/send_queue/mod.rs index 1299fa413..6d3aff97c 100644 --- a/crates/matrix-sdk/src/send_queue/mod.rs +++ b/crates/matrix-sdk/src/send_queue/mod.rs @@ -178,7 +178,7 @@ use crate::{ config::RequestConfig, error::RetryKind, room::{edit::EditedContent, WeakRoom}, - Client, Media, Room, TransmissionProgress, + AbstractProgress, Client, Media, Room, TransmissionProgress, }; mod upload; @@ -578,21 +578,10 @@ impl RoomSendQueue { locally_enabled: Arc, global_error_sender: broadcast::Sender, is_dropping: Arc, - _report_media_upload_progress: Arc, + report_media_upload_progress: Arc, ) { trace!("spawned the sending task"); - fn send_update( - global_update_sender: &broadcast::Sender, - update_sender: &broadcast::Sender, - room_id: &RoomId, - update: RoomSendQueueUpdate, - ) { - let _ = update_sender.send(update.clone()); - let _ = - global_update_sender.send(SendQueueUpdate { room_id: room_id.to_owned(), update }); - } - let room_id = room.room_id(); loop { @@ -649,7 +638,23 @@ impl RoomSendQueue { continue; }; - match Self::handle_request(&room, queued_request, cancel_upload_rx, None).await { + // Prepare to watch and communicate the request progress for media uploads. + let media_upload_info = + RoomSendQueue::try_create_media_upload_info(&queued_request, &room, &queue) + .await + .unwrap_or_default(); + let progress = RoomSendQueue::try_create_media_upload_progress_observable( + &report_media_upload_progress, + &media_upload_info, + &related_txn_id, + &global_update_sender, + &update_sender, + room_id, + ); + + match Self::handle_request(&room, queued_request, cancel_upload_rx, progress.clone()) + .await + { Ok(Some(parent_key)) => match queue.mark_as_sent(&txn_id, parent_key.clone()).await { Ok(()) => match parent_key { @@ -669,7 +674,16 @@ impl RoomSendQueue { room_id, RoomSendQueueUpdate::MediaUpload { related_to: related_txn_id.as_ref().unwrap_or(&txn_id).clone(), - file: media_info.file, + file: Some(media_info.file), + index: media_upload_info.index, + progress: estimate_combined_media_upload_progress( + // The file finished uploading + AbstractProgress { + current: media_upload_info.bytes, + total: media_upload_info.bytes, + }, + &media_upload_info, + ), }, ); } @@ -755,6 +769,172 @@ impl RoomSendQueue { info!("exited sending task"); } + /// Try to create metadata required to compute the progress of a media + /// upload. + async fn try_create_media_upload_info( + queued_request: &QueuedRequest, + room: &Room, + queue: &QueueStorage, + ) -> Option { + let QueuedRequestKind::MediaUpload { + cache_key, + thumbnail_source, + related_to, + #[cfg(feature = "unstable-msc4274")] + accumulated, + .. + } = &queued_request.kind + else { + return None; + }; + + // Determine the item's index, if this is a gallery upload. + let index = { + cfg_if::cfg_if! { + if #[cfg(feature = "unstable-msc4274")] { + accumulated.len() + } else { + 0 // Before MSC4274 only a single file (and thumbnail) could be sent per event. + } + } + }; + + // Get the size of the file being uploaded from the event cache. + let bytes = if let Ok(cache) = room.client().event_cache_store().lock().await { + if let Ok(Some(content)) = cache.get_media_content(cache_key).await { + content.len() + } else { + 0 + } + } else { + 0 + }; + + // If this is a file upload, get the size of any previously uploaded thumbnail + // from the in-memory media sizes cache. + let uploaded_thumbnail_bytes = if thumbnail_source.is_some() { + if let Some(sizes) = queue.store.lock().await.thumbnail_size_cache.get(related_to) { + sizes.get(index).copied().flatten().unwrap_or(0) + } else { + 0 + } + } else { + 0 + }; + + // If this is a thumbnail upload, get the size of the pending file upload from + // the dependent requests. + let pending_file_bytes = RoomSendQueue::get_dependent_pending_file_upload_size( + queued_request.transaction_id.clone(), + queue, + room, + ) + .await; + + Some(MediaUploadInfo { + index: index as u64, + bytes, + uploaded_thumbnail_bytes, + pending_file_bytes, + }) + } + + /// Determine the size of a pending file upload, if this is a thumbnail + /// upload or return 0 otherwise. + async fn get_dependent_pending_file_upload_size( + txn_id: OwnedTransactionId, + queue: &QueueStorage, + room: &Room, + ) -> usize { + let guard = queue.store.lock().await; + + let Ok(client) = guard.client() else { + return 0; + }; + + let Ok(dependent_requests) = + client.state_store().load_dependent_queued_requests(room.room_id()).await + else { + return 0; + }; + + let Some((cache_key, parent_is_thumbnail_upload)) = dependent_requests.iter().find_map(|r| { + if r.parent_transaction_id != txn_id { + return None; + } + as_variant!(&r.kind, DependentQueuedRequestKind::UploadFileOrThumbnail { cache_key, parent_is_thumbnail_upload, .. } => (cache_key.clone(), *parent_is_thumbnail_upload)) + }) else { + return 0; + }; + + // If this is not a thumbnail upload, we're uploading a gallery and the + // dependent request is for the next gallery item. + if !parent_is_thumbnail_upload { + return 0; + } + + if let Ok(cache) = room.client().event_cache_store().lock().await { + if let Ok(Some(content)) = cache.get_media_content(&cache_key).await { + content.len() + } else { + 0 + } + } else { + 0 + } + } + + /// Try to create an observable to watch a media's upload progress. + fn try_create_media_upload_progress_observable( + report_media_upload_progress: &Arc, + media_upload_info: &MediaUploadInfo, + related_txn_id: &Option, + global_update_sender: &broadcast::Sender, + update_sender: &broadcast::Sender, + room_id: &RoomId, + ) -> Option> { + if !report_media_upload_progress.load(Ordering::SeqCst) { + return None; + } + + if let Some(related_txn_id) = related_txn_id { + let progress: SharedObservable = Default::default(); + let mut subscriber = progress.subscribe(); + + let media_upload_info = media_upload_info.clone(); + let related_to = related_txn_id.clone(); + let global_update_sender = global_update_sender.clone(); + let update_sender = update_sender.clone(); + let room_id = room_id.to_owned(); + + // Watch and communicate the progress on a detached background task. Once + // the progress observable is dropped, next() will return None and the + // task will end. + spawn(async move { + while let Some(progress) = subscriber.next().await { + send_update( + &global_update_sender, + &update_sender, + &room_id, + RoomSendQueueUpdate::MediaUpload { + related_to: related_to.clone(), + file: None, + index: media_upload_info.index, + progress: estimate_combined_media_upload_progress( + estimate_media_upload_progress(progress, media_upload_info.bytes), + &media_upload_info, + ), + }, + ); + } + }); + + Some(progress) + } else { + None + } + } + /// Handles a single request and returns the [`SentRequestKey`] on success /// (unless the request was cancelled, in which case it'll return /// `None`). @@ -916,6 +1096,68 @@ impl RoomSendQueue { } } +fn send_update( + global_update_sender: &broadcast::Sender, + update_sender: &broadcast::Sender, + room_id: &RoomId, + update: RoomSendQueueUpdate, +) { + let _ = update_sender.send(update.clone()); + let _ = global_update_sender.send(SendQueueUpdate { room_id: room_id.to_owned(), update }); +} + +/// Estimates the upload progress for a single media file (either a thumbnail or +/// a file). +/// +/// This proportionally maps the upload progress given in actual bytes sent +/// (possibly after encryption) into units of the unencrypted file sizes. +/// +/// # Arguments +/// +/// * `progress` - The [`TransmissionProgress`] of uploading the file (possibly +/// after encryption). +/// +/// * `bytes` - The total number of bytes in the file before encryption. +fn estimate_media_upload_progress( + progress: TransmissionProgress, + bytes: usize, +) -> AbstractProgress { + if progress.total == 0 { + return AbstractProgress { current: 0, total: 0 }; + } + + // Did the file finish uploading? + if progress.current == progress.total { + return AbstractProgress { current: bytes, total: bytes }; + } + + // The file is still uploading. Use the rule of 3 to proportionally map the + // progress into units of the original file size. + AbstractProgress { + current: (progress.current as f64 / progress.total as f64 * bytes as f64).round() as usize, + total: bytes, + } +} + +/// Estimate the combined upload progress across a media file and its +/// thumbnail. +/// +/// # Arguments +/// +/// * `progress` - The progress of uploading the current file mapped into units +/// of the original file size before encryption. +/// +/// * `info` - Information about the file(s) being uploaded. +fn estimate_combined_media_upload_progress( + progress: AbstractProgress, + info: &MediaUploadInfo, +) -> AbstractProgress { + AbstractProgress { + current: info.uploaded_thumbnail_bytes + progress.current, + total: info.uploaded_thumbnail_bytes + progress.total + info.pending_file_bytes, + } +} + impl From<&crate::Error> for QueueWedgeError { fn from(value: &crate::Error) -> Self { match value { @@ -993,6 +1235,23 @@ struct BeingSentInfo { cancel_upload: Option>, } +/// Information needed to compute the progress of uploading a media and its +/// associated thumbnail. +#[derive(Clone, Default)] +struct MediaUploadInfo { + /// The index of the uploaded item if this is a gallery upload. Otherwise, + /// zero. + index: u64, + /// The total number of bytes in the file currently being uploaded. + bytes: usize, + /// If the current file is not a thumbnail, the total number of bytes in a + /// previously uploaded thumbnail, if any exists. Otherwise, zero. + uploaded_thumbnail_bytes: usize, + /// If the current file is a thumbnail, the total number of bytes in the + /// related media file still to be uploaded. Otherwise, zero. + pending_file_bytes: usize, +} + impl BeingSentInfo { /// Aborts the upload, if a trigger is available. /// @@ -2225,8 +2484,18 @@ pub enum RoomSendQueueUpdate { /// The media event this uploaded media relates to. related_to: OwnedTransactionId, - /// The final media source for the file that was just uploaded. - file: MediaSource, + /// The final media source for the file if it has finished uploading. + file: Option, + + /// The index of the media within the transaction. A file and its + /// thumbnail share the same index. Will always be 0 for non-gallery + /// media uploads. + index: u64, + + /// The combined upload progress across the file and, if existing, its + /// thumbnail. For gallery uploads, the progress is reported per indexed + /// gallery item. + progress: AbstractProgress, }, } diff --git a/crates/matrix-sdk/tests/integration/send_queue.rs b/crates/matrix-sdk/tests/integration/send_queue.rs index 7f67a9328..5c9430341 100644 --- a/crates/matrix-sdk/tests/integration/send_queue.rs +++ b/crates/matrix-sdk/tests/integration/send_queue.rs @@ -14,7 +14,7 @@ use matrix_sdk::{ RoomSendQueueUpdate, SendHandle, SendQueueUpdate, }, test_utils::mocks::{MatrixMock, MatrixMockServer}, - Client, MemoryStore, + AbstractProgress, Client, MemoryStore, }; use matrix_sdk_test::{ async_test, event_factory::EventFactory, InvitedRoomBuilder, KnockedRoomBuilder, @@ -162,21 +162,64 @@ macro_rules! assert_update { }}; // Check the next stream event is a notification about an uploaded media. - // Returns a tuple of (transaction_id, send_handle). (($global_watch:ident, $watch:ident) => uploaded { related_to = $related_to:expr, mxc = $mxc:expr }) => {{ assert_let!( Ok(Ok(RoomSendQueueUpdate::MediaUpload { related_to, file, + .. })) = timeout(Duration::from_secs(1), $watch.recv()).await ); assert_matches!($global_watch.recv().await, Ok(SendQueueUpdate { update: RoomSendQueueUpdate::MediaUpload { .. }, .. })); assert_eq!(related_to, $related_to); - assert_let!(MediaSource::Plain(mxc) = file); + assert_let!(Some(MediaSource::Plain(mxc)) = file); assert_eq!(mxc, $mxc); }}; + // Check the next stream events communicate upload progress and finally the uploaded media. + (($global_watch:ident, $watch:ident) => uploaded_with_progress { + related_to = $related_to:expr, + mxc = $mxc:expr, + index = $index:expr, + progress_start = $progress_start:expr, + progress_end = $progress_end:expr, + progress_total = $progress_total:expr + }) => {{ + let mut prev_progress: Option = None; + + loop { + assert_let!( + Ok(Ok(RoomSendQueueUpdate::MediaUpload { + related_to, + file, + index, + progress, .. + })) = timeout(Duration::from_secs(1), $watch.recv()).await + ); + assert_matches!($global_watch.recv().await, Ok(SendQueueUpdate { update: RoomSendQueueUpdate::MediaUpload { .. }, .. })); + + assert_eq!(related_to, $related_to); + assert_eq!(index, $index); + + if let Some(progress_start) = $progress_start { + assert!(progress.current >= progress_start); + } + assert!(progress.current <= $progress_end); + assert_eq!(progress.total, $progress_total); + if let Some(prev_progress) = prev_progress { + assert!(progress.current >= prev_progress.current); + } + prev_progress = Some(progress); + + if let Some(MediaSource::Plain(mxc)) = file { + assert_eq!(progress.current, $progress_end); + assert_eq!(mxc, $mxc); + break; + } + } + }}; + // Check the next stream event is a local echo for a reaction with the content $key which // applies to the local echo with transaction id $parent. (($global_watch:ident, $watch:ident) => local reaction { key = $key:expr, parent = $parent_txn_id:expr }) => {{ @@ -1843,6 +1886,7 @@ async fn test_media_uploads() { // Mark the room as joined. let room_id = room_id!("!a:b.c"); let client = mock.client_builder().build().await; + client.send_queue().enable_upload_progress(true); let room = mock.sync_joined_room(&client, room_id).await; let q = room.send_queue(); @@ -1874,6 +1918,9 @@ async fn test_media_uploads() { ..Default::default() }); + let size_data = data.len(); + let size_thumbnail = thumbnail.data.len(); + let transaction_id = TransactionId::new(); let mentions = Mentions::with_user_ids([owned_user_id!("@ivan:sdk.rs")]); let config = AttachmentConfig::new() @@ -2033,14 +2080,22 @@ async fn test_media_uploads() { assert!(watch.is_empty()); drop(block_upload); - assert_update!((global_watch, watch) => uploaded { + assert_update!((global_watch, watch) => uploaded_with_progress { related_to = transaction_id, - mxc = mxc_uri!("mxc://sdk.rs/thumbnail") + mxc = mxc_uri!("mxc://sdk.rs/thumbnail"), + index = 0, + progress_start = None, + progress_end = size_thumbnail, + progress_total = size_data + size_thumbnail }); - assert_update!((global_watch, watch) => uploaded { + assert_update!((global_watch, watch) => uploaded_with_progress { related_to = transaction_id, - mxc = mxc_uri!("mxc://sdk.rs/media") + mxc = mxc_uri!("mxc://sdk.rs/media"), + index = 0, + progress_start = Some(size_thumbnail), + progress_end = size_data + size_thumbnail, + progress_total = size_data + size_thumbnail }); let edit_msg = assert_update!((global_watch, watch) => edit local echo {