feat(send_queue): communicate media upload progress in RoomSendQueueUpdate::MediaUpload

Signed-off-by: Johannes Marbach <n0-0ne+github@mailbox.org>
This commit is contained in:
Johannes Marbach
2025-07-07 19:58:36 +02:00
committed by Benjamin Bouvier
parent 30c0420f83
commit fc7124fd1a
2 changed files with 348 additions and 24 deletions

View File

@@ -178,7 +178,7 @@ use crate::{
config::RequestConfig,
error::RetryKind,
room::{edit::EditedContent, WeakRoom},
Client, Media, Room, TransmissionProgress,
AbstractProgress, Client, Media, Room, TransmissionProgress,
};
mod upload;
@@ -578,21 +578,10 @@ impl RoomSendQueue {
locally_enabled: Arc<AtomicBool>,
global_error_sender: broadcast::Sender<SendQueueRoomError>,
is_dropping: Arc<AtomicBool>,
_report_media_upload_progress: Arc<AtomicBool>,
report_media_upload_progress: Arc<AtomicBool>,
) {
trace!("spawned the sending task");
fn send_update(
global_update_sender: &broadcast::Sender<SendQueueUpdate>,
update_sender: &broadcast::Sender<RoomSendQueueUpdate>,
room_id: &RoomId,
update: RoomSendQueueUpdate,
) {
let _ = update_sender.send(update.clone());
let _ =
global_update_sender.send(SendQueueUpdate { room_id: room_id.to_owned(), update });
}
let room_id = room.room_id();
loop {
@@ -649,7 +638,23 @@ impl RoomSendQueue {
continue;
};
match Self::handle_request(&room, queued_request, cancel_upload_rx, None).await {
// Prepare to watch and communicate the request progress for media uploads.
let media_upload_info =
RoomSendQueue::try_create_media_upload_info(&queued_request, &room, &queue)
.await
.unwrap_or_default();
let progress = RoomSendQueue::try_create_media_upload_progress_observable(
&report_media_upload_progress,
&media_upload_info,
&related_txn_id,
&global_update_sender,
&update_sender,
room_id,
);
match Self::handle_request(&room, queued_request, cancel_upload_rx, progress.clone())
.await
{
Ok(Some(parent_key)) => match queue.mark_as_sent(&txn_id, parent_key.clone()).await
{
Ok(()) => match parent_key {
@@ -669,7 +674,16 @@ impl RoomSendQueue {
room_id,
RoomSendQueueUpdate::MediaUpload {
related_to: related_txn_id.as_ref().unwrap_or(&txn_id).clone(),
file: media_info.file,
file: Some(media_info.file),
index: media_upload_info.index,
progress: estimate_combined_media_upload_progress(
// The file finished uploading
AbstractProgress {
current: media_upload_info.bytes,
total: media_upload_info.bytes,
},
&media_upload_info,
),
},
);
}
@@ -755,6 +769,172 @@ impl RoomSendQueue {
info!("exited sending task");
}
/// Try to create metadata required to compute the progress of a media
/// upload.
async fn try_create_media_upload_info(
queued_request: &QueuedRequest,
room: &Room,
queue: &QueueStorage,
) -> Option<MediaUploadInfo> {
let QueuedRequestKind::MediaUpload {
cache_key,
thumbnail_source,
related_to,
#[cfg(feature = "unstable-msc4274")]
accumulated,
..
} = &queued_request.kind
else {
return None;
};
// Determine the item's index, if this is a gallery upload.
let index = {
cfg_if::cfg_if! {
if #[cfg(feature = "unstable-msc4274")] {
accumulated.len()
} else {
0 // Before MSC4274 only a single file (and thumbnail) could be sent per event.
}
}
};
// Get the size of the file being uploaded from the event cache.
let bytes = if let Ok(cache) = room.client().event_cache_store().lock().await {
if let Ok(Some(content)) = cache.get_media_content(cache_key).await {
content.len()
} else {
0
}
} else {
0
};
// If this is a file upload, get the size of any previously uploaded thumbnail
// from the in-memory media sizes cache.
let uploaded_thumbnail_bytes = if thumbnail_source.is_some() {
if let Some(sizes) = queue.store.lock().await.thumbnail_size_cache.get(related_to) {
sizes.get(index).copied().flatten().unwrap_or(0)
} else {
0
}
} else {
0
};
// If this is a thumbnail upload, get the size of the pending file upload from
// the dependent requests.
let pending_file_bytes = RoomSendQueue::get_dependent_pending_file_upload_size(
queued_request.transaction_id.clone(),
queue,
room,
)
.await;
Some(MediaUploadInfo {
index: index as u64,
bytes,
uploaded_thumbnail_bytes,
pending_file_bytes,
})
}
/// Determine the size of a pending file upload, if this is a thumbnail
/// upload or return 0 otherwise.
async fn get_dependent_pending_file_upload_size(
txn_id: OwnedTransactionId,
queue: &QueueStorage,
room: &Room,
) -> usize {
let guard = queue.store.lock().await;
let Ok(client) = guard.client() else {
return 0;
};
let Ok(dependent_requests) =
client.state_store().load_dependent_queued_requests(room.room_id()).await
else {
return 0;
};
let Some((cache_key, parent_is_thumbnail_upload)) = dependent_requests.iter().find_map(|r| {
if r.parent_transaction_id != txn_id {
return None;
}
as_variant!(&r.kind, DependentQueuedRequestKind::UploadFileOrThumbnail { cache_key, parent_is_thumbnail_upload, .. } => (cache_key.clone(), *parent_is_thumbnail_upload))
}) else {
return 0;
};
// If this is not a thumbnail upload, we're uploading a gallery and the
// dependent request is for the next gallery item.
if !parent_is_thumbnail_upload {
return 0;
}
if let Ok(cache) = room.client().event_cache_store().lock().await {
if let Ok(Some(content)) = cache.get_media_content(&cache_key).await {
content.len()
} else {
0
}
} else {
0
}
}
/// Try to create an observable to watch a media's upload progress.
fn try_create_media_upload_progress_observable(
report_media_upload_progress: &Arc<AtomicBool>,
media_upload_info: &MediaUploadInfo,
related_txn_id: &Option<OwnedTransactionId>,
global_update_sender: &broadcast::Sender<SendQueueUpdate>,
update_sender: &broadcast::Sender<RoomSendQueueUpdate>,
room_id: &RoomId,
) -> Option<SharedObservable<TransmissionProgress>> {
if !report_media_upload_progress.load(Ordering::SeqCst) {
return None;
}
if let Some(related_txn_id) = related_txn_id {
let progress: SharedObservable<TransmissionProgress> = Default::default();
let mut subscriber = progress.subscribe();
let media_upload_info = media_upload_info.clone();
let related_to = related_txn_id.clone();
let global_update_sender = global_update_sender.clone();
let update_sender = update_sender.clone();
let room_id = room_id.to_owned();
// Watch and communicate the progress on a detached background task. Once
// the progress observable is dropped, next() will return None and the
// task will end.
spawn(async move {
while let Some(progress) = subscriber.next().await {
send_update(
&global_update_sender,
&update_sender,
&room_id,
RoomSendQueueUpdate::MediaUpload {
related_to: related_to.clone(),
file: None,
index: media_upload_info.index,
progress: estimate_combined_media_upload_progress(
estimate_media_upload_progress(progress, media_upload_info.bytes),
&media_upload_info,
),
},
);
}
});
Some(progress)
} else {
None
}
}
/// Handles a single request and returns the [`SentRequestKey`] on success
/// (unless the request was cancelled, in which case it'll return
/// `None`).
@@ -916,6 +1096,68 @@ impl RoomSendQueue {
}
}
fn send_update(
global_update_sender: &broadcast::Sender<SendQueueUpdate>,
update_sender: &broadcast::Sender<RoomSendQueueUpdate>,
room_id: &RoomId,
update: RoomSendQueueUpdate,
) {
let _ = update_sender.send(update.clone());
let _ = global_update_sender.send(SendQueueUpdate { room_id: room_id.to_owned(), update });
}
/// Estimates the upload progress for a single media file (either a thumbnail or
/// a file).
///
/// This proportionally maps the upload progress given in actual bytes sent
/// (possibly after encryption) into units of the unencrypted file sizes.
///
/// # Arguments
///
/// * `progress` - The [`TransmissionProgress`] of uploading the file (possibly
/// after encryption).
///
/// * `bytes` - The total number of bytes in the file before encryption.
fn estimate_media_upload_progress(
progress: TransmissionProgress,
bytes: usize,
) -> AbstractProgress {
if progress.total == 0 {
return AbstractProgress { current: 0, total: 0 };
}
// Did the file finish uploading?
if progress.current == progress.total {
return AbstractProgress { current: bytes, total: bytes };
}
// The file is still uploading. Use the rule of 3 to proportionally map the
// progress into units of the original file size.
AbstractProgress {
current: (progress.current as f64 / progress.total as f64 * bytes as f64).round() as usize,
total: bytes,
}
}
/// Estimate the combined upload progress across a media file and its
/// thumbnail.
///
/// # Arguments
///
/// * `progress` - The progress of uploading the current file mapped into units
/// of the original file size before encryption.
///
/// * `info` - Information about the file(s) being uploaded.
fn estimate_combined_media_upload_progress(
progress: AbstractProgress,
info: &MediaUploadInfo,
) -> AbstractProgress {
AbstractProgress {
current: info.uploaded_thumbnail_bytes + progress.current,
total: info.uploaded_thumbnail_bytes + progress.total + info.pending_file_bytes,
}
}
impl From<&crate::Error> for QueueWedgeError {
fn from(value: &crate::Error) -> Self {
match value {
@@ -993,6 +1235,23 @@ struct BeingSentInfo {
cancel_upload: Option<oneshot::Sender<()>>,
}
/// Information needed to compute the progress of uploading a media and its
/// associated thumbnail.
#[derive(Clone, Default)]
struct MediaUploadInfo {
/// The index of the uploaded item if this is a gallery upload. Otherwise,
/// zero.
index: u64,
/// The total number of bytes in the file currently being uploaded.
bytes: usize,
/// If the current file is not a thumbnail, the total number of bytes in a
/// previously uploaded thumbnail, if any exists. Otherwise, zero.
uploaded_thumbnail_bytes: usize,
/// If the current file is a thumbnail, the total number of bytes in the
/// related media file still to be uploaded. Otherwise, zero.
pending_file_bytes: usize,
}
impl BeingSentInfo {
/// Aborts the upload, if a trigger is available.
///
@@ -2225,8 +2484,18 @@ pub enum RoomSendQueueUpdate {
/// The media event this uploaded media relates to.
related_to: OwnedTransactionId,
/// The final media source for the file that was just uploaded.
file: MediaSource,
/// The final media source for the file if it has finished uploading.
file: Option<MediaSource>,
/// The index of the media within the transaction. A file and its
/// thumbnail share the same index. Will always be 0 for non-gallery
/// media uploads.
index: u64,
/// The combined upload progress across the file and, if existing, its
/// thumbnail. For gallery uploads, the progress is reported per indexed
/// gallery item.
progress: AbstractProgress,
},
}

View File

@@ -14,7 +14,7 @@ use matrix_sdk::{
RoomSendQueueUpdate, SendHandle, SendQueueUpdate,
},
test_utils::mocks::{MatrixMock, MatrixMockServer},
Client, MemoryStore,
AbstractProgress, Client, MemoryStore,
};
use matrix_sdk_test::{
async_test, event_factory::EventFactory, InvitedRoomBuilder, KnockedRoomBuilder,
@@ -162,21 +162,64 @@ macro_rules! assert_update {
}};
// Check the next stream event is a notification about an uploaded media.
// Returns a tuple of (transaction_id, send_handle).
(($global_watch:ident, $watch:ident) => uploaded { related_to = $related_to:expr, mxc = $mxc:expr }) => {{
assert_let!(
Ok(Ok(RoomSendQueueUpdate::MediaUpload {
related_to,
file,
..
})) = timeout(Duration::from_secs(1), $watch.recv()).await
);
assert_matches!($global_watch.recv().await, Ok(SendQueueUpdate { update: RoomSendQueueUpdate::MediaUpload { .. }, .. }));
assert_eq!(related_to, $related_to);
assert_let!(MediaSource::Plain(mxc) = file);
assert_let!(Some(MediaSource::Plain(mxc)) = file);
assert_eq!(mxc, $mxc);
}};
// Check the next stream events communicate upload progress and finally the uploaded media.
(($global_watch:ident, $watch:ident) => uploaded_with_progress {
related_to = $related_to:expr,
mxc = $mxc:expr,
index = $index:expr,
progress_start = $progress_start:expr,
progress_end = $progress_end:expr,
progress_total = $progress_total:expr
}) => {{
let mut prev_progress: Option<AbstractProgress> = None;
loop {
assert_let!(
Ok(Ok(RoomSendQueueUpdate::MediaUpload {
related_to,
file,
index,
progress, ..
})) = timeout(Duration::from_secs(1), $watch.recv()).await
);
assert_matches!($global_watch.recv().await, Ok(SendQueueUpdate { update: RoomSendQueueUpdate::MediaUpload { .. }, .. }));
assert_eq!(related_to, $related_to);
assert_eq!(index, $index);
if let Some(progress_start) = $progress_start {
assert!(progress.current >= progress_start);
}
assert!(progress.current <= $progress_end);
assert_eq!(progress.total, $progress_total);
if let Some(prev_progress) = prev_progress {
assert!(progress.current >= prev_progress.current);
}
prev_progress = Some(progress);
if let Some(MediaSource::Plain(mxc)) = file {
assert_eq!(progress.current, $progress_end);
assert_eq!(mxc, $mxc);
break;
}
}
}};
// Check the next stream event is a local echo for a reaction with the content $key which
// applies to the local echo with transaction id $parent.
(($global_watch:ident, $watch:ident) => local reaction { key = $key:expr, parent = $parent_txn_id:expr }) => {{
@@ -1843,6 +1886,7 @@ async fn test_media_uploads() {
// Mark the room as joined.
let room_id = room_id!("!a:b.c");
let client = mock.client_builder().build().await;
client.send_queue().enable_upload_progress(true);
let room = mock.sync_joined_room(&client, room_id).await;
let q = room.send_queue();
@@ -1874,6 +1918,9 @@ async fn test_media_uploads() {
..Default::default()
});
let size_data = data.len();
let size_thumbnail = thumbnail.data.len();
let transaction_id = TransactionId::new();
let mentions = Mentions::with_user_ids([owned_user_id!("@ivan:sdk.rs")]);
let config = AttachmentConfig::new()
@@ -2033,14 +2080,22 @@ async fn test_media_uploads() {
assert!(watch.is_empty());
drop(block_upload);
assert_update!((global_watch, watch) => uploaded {
assert_update!((global_watch, watch) => uploaded_with_progress {
related_to = transaction_id,
mxc = mxc_uri!("mxc://sdk.rs/thumbnail")
mxc = mxc_uri!("mxc://sdk.rs/thumbnail"),
index = 0,
progress_start = None,
progress_end = size_thumbnail,
progress_total = size_data + size_thumbnail
});
assert_update!((global_watch, watch) => uploaded {
assert_update!((global_watch, watch) => uploaded_with_progress {
related_to = transaction_id,
mxc = mxc_uri!("mxc://sdk.rs/media")
mxc = mxc_uri!("mxc://sdk.rs/media"),
index = 0,
progress_start = Some(size_thumbnail),
progress_end = size_data + size_thumbnail,
progress_total = size_data + size_thumbnail
});
let edit_msg = assert_update!((global_watch, watch) => edit local echo {