feat(timeline): use the send queue for media uploads behind a feature toggle

This commit is contained in:
Benjamin Bouvier
2024-11-06 14:23:27 +01:00
parent 9178e4ce33
commit 4bbe620d0f
2 changed files with 81 additions and 14 deletions

View File

@@ -106,12 +106,17 @@ impl Timeline {
mime_type: Option<String>,
attachment_config: AttachmentConfig,
progress_watcher: Option<Box<dyn ProgressWatcher>>,
use_send_queue: bool,
) -> Result<(), RoomError> {
let mime_str = mime_type.as_ref().ok_or(RoomError::InvalidAttachmentMimeType)?;
let mime_type =
mime_str.parse::<Mime>().map_err(|_| RoomError::InvalidAttachmentMimeType)?;
let request = self.inner.send_attachment(filename, mime_type, attachment_config);
let mut request = self.inner.send_attachment(filename, mime_type, attachment_config);
if use_send_queue {
request = request.use_send_queue();
}
if let Some(progress_watcher) = progress_watcher {
let mut subscriber = request.subscribe_to_send_progress();
@@ -273,6 +278,7 @@ impl Timeline {
}
}
#[allow(clippy::too_many_arguments)]
pub fn send_image(
self: Arc<Self>,
url: String,
@@ -281,6 +287,7 @@ impl Timeline {
caption: Option<String>,
formatted_caption: Option<FormattedBody>,
progress_watcher: Option<Box<dyn ProgressWatcher>>,
use_send_queue: bool,
) -> Arc<SendAttachmentJoinHandle> {
SendAttachmentJoinHandle::new(RUNTIME.spawn(async move {
let base_image_info = BaseImageInfo::try_from(&image_info)
@@ -292,11 +299,18 @@ impl Timeline {
.caption(caption)
.formatted_caption(formatted_caption.map(Into::into));
self.send_attachment(url, image_info.mimetype, attachment_config, progress_watcher)
.await
self.send_attachment(
url,
image_info.mimetype,
attachment_config,
progress_watcher,
use_send_queue,
)
.await
}))
}
#[allow(clippy::too_many_arguments)]
pub fn send_video(
self: Arc<Self>,
url: String,
@@ -305,6 +319,7 @@ impl Timeline {
caption: Option<String>,
formatted_caption: Option<FormattedBody>,
progress_watcher: Option<Box<dyn ProgressWatcher>>,
use_send_queue: bool,
) -> Arc<SendAttachmentJoinHandle> {
SendAttachmentJoinHandle::new(RUNTIME.spawn(async move {
let base_video_info: BaseVideoInfo = BaseVideoInfo::try_from(&video_info)
@@ -316,8 +331,14 @@ impl Timeline {
.caption(caption)
.formatted_caption(formatted_caption.map(Into::into));
self.send_attachment(url, video_info.mimetype, attachment_config, progress_watcher)
.await
self.send_attachment(
url,
video_info.mimetype,
attachment_config,
progress_watcher,
use_send_queue,
)
.await
}))
}
@@ -328,6 +349,7 @@ impl Timeline {
caption: Option<String>,
formatted_caption: Option<FormattedBody>,
progress_watcher: Option<Box<dyn ProgressWatcher>>,
use_send_queue: bool,
) -> Arc<SendAttachmentJoinHandle> {
SendAttachmentJoinHandle::new(RUNTIME.spawn(async move {
let base_audio_info: BaseAudioInfo = BaseAudioInfo::try_from(&audio_info)
@@ -339,8 +361,14 @@ impl Timeline {
.caption(caption)
.formatted_caption(formatted_caption.map(Into::into));
self.send_attachment(url, audio_info.mimetype, attachment_config, progress_watcher)
.await
self.send_attachment(
url,
audio_info.mimetype,
attachment_config,
progress_watcher,
use_send_queue,
)
.await
}))
}
@@ -353,6 +381,7 @@ impl Timeline {
caption: Option<String>,
formatted_caption: Option<FormattedBody>,
progress_watcher: Option<Box<dyn ProgressWatcher>>,
use_send_queue: bool,
) -> Arc<SendAttachmentJoinHandle> {
SendAttachmentJoinHandle::new(RUNTIME.spawn(async move {
let base_audio_info: BaseAudioInfo = BaseAudioInfo::try_from(&audio_info)
@@ -365,8 +394,14 @@ impl Timeline {
.caption(caption)
.formatted_caption(formatted_caption.map(Into::into));
self.send_attachment(url, audio_info.mimetype, attachment_config, progress_watcher)
.await
self.send_attachment(
url,
audio_info.mimetype,
attachment_config,
progress_watcher,
use_send_queue,
)
.await
}))
}
@@ -375,6 +410,7 @@ impl Timeline {
url: String,
file_info: FileInfo,
progress_watcher: Option<Box<dyn ProgressWatcher>>,
use_send_queue: bool,
) -> Arc<SendAttachmentJoinHandle> {
SendAttachmentJoinHandle::new(RUNTIME.spawn(async move {
let base_file_info: BaseFileInfo =
@@ -383,7 +419,14 @@ impl Timeline {
let attachment_config = AttachmentConfig::new().info(attachment_info);
self.send_attachment(url, file_info.mimetype, attachment_config, progress_watcher).await
self.send_attachment(
url,
file_info.mimetype,
attachment_config,
progress_watcher,
use_send_queue,
)
.await
}))
}

View File

@@ -15,6 +15,7 @@ pub struct SendAttachment<'a> {
config: AttachmentConfig,
tracing_span: Span,
pub(crate) send_progress: SharedObservable<TransmissionProgress>,
use_send_queue: bool,
}
impl<'a> SendAttachment<'a> {
@@ -31,9 +32,22 @@ impl<'a> SendAttachment<'a> {
config,
tracing_span: Span::current(),
send_progress: Default::default(),
use_send_queue: false,
}
}
/// (Experimental) Uses the send queue to upload this media.
///
/// This uses the send queue to upload the medias, and as such it provides
/// local echoes for the uploaded media too, not blocking the sending
/// request.
///
/// This will be the default in future versions, when the feature work will
/// be done there.
pub fn use_send_queue(self) -> Self {
Self { use_send_queue: true, ..self }
}
/// Get a subscriber to observe the progress of sending the request
/// body.
#[cfg(not(target_arch = "wasm32"))]
@@ -47,7 +61,8 @@ impl<'a> IntoFuture for SendAttachment<'a> {
boxed_into_future!(extra_bounds: 'a);
fn into_future(self) -> Self::IntoFuture {
let Self { timeline, path, mime_type, config, tracing_span, send_progress: _ } = self;
let Self { timeline, path, mime_type, config, tracing_span, use_send_queue, send_progress } =
self;
let fut = async move {
let filename = path
@@ -57,9 +72,18 @@ impl<'a> IntoFuture for SendAttachment<'a> {
.ok_or(Error::InvalidAttachmentFileName)?;
let data = fs::read(&path).map_err(|_| Error::InvalidAttachmentData)?;
let send_queue = timeline.room().send_queue();
let fut = send_queue.send_attachment(filename, mime_type, data, config);
fut.await.map_err(|_| Error::FailedSendingAttachment)?;
if use_send_queue {
let send_queue = timeline.room().send_queue();
let fut = send_queue.send_attachment(filename, mime_type, data, config);
fut.await.map_err(|_| Error::FailedSendingAttachment)?;
} else {
let fut = timeline
.room()
.send_attachment(filename, &mime_type, data, config)
.with_send_progress_observable(send_progress)
.store_in_cache();
fut.await.map_err(|_| Error::FailedSendingAttachment)?;
}
Ok(())
};