refactor: Make SendMediaUploadRequest a builder, allowing to create a media upload request with a more flexible API. Use SendMediaUploadRequest in Media::upload

This commit is contained in:
Jorge Martín
2025-07-22 13:08:24 +02:00
parent 2e32c252e8
commit d665ab238e
10 changed files with 151 additions and 96 deletions

View File

@@ -39,8 +39,8 @@ use matrix_sdk::{
},
sliding_sync::Version as SdkSlidingSyncVersion,
store::RoomLoadSettings as SdkRoomLoadSettings,
AuthApi, AuthSession, Client as MatrixClient, SessionChange, SessionTokens,
STATE_STORE_DATABASE_NAME,
AuthApi, AuthSession, Client as MatrixClient, SendMediaUploadRequest, SessionChange,
SessionTokens, STATE_STORE_DATABASE_NAME,
};
use matrix_sdk_common::{stream::StreamExt, SendOutsideWasm, SyncOutsideWasm};
use matrix_sdk_ui::{
@@ -1030,10 +1030,12 @@ impl Client {
progress_watcher: Option<Box<dyn ProgressWatcher>>,
) -> Result<String, ClientError> {
let mime_type: mime::Mime = mime_type.parse().context("Parsing mime type")?;
let request = self.inner.media().upload(&mime_type, data, filename, None);
let send_media_request = SendMediaUploadRequest::new((*self.inner).clone(), data)
.with_content_type(mime_type.essence_str().to_owned())
.with_filename(filename);
if let Some(progress_watcher) = progress_watcher {
let mut subscriber = request.subscribe_to_send_progress();
let mut subscriber = send_media_request.subscribe_to_send_progress();
get_runtime_handle().spawn(async move {
while let Some(progress) = subscriber.next().await {
progress_watcher.transmission_progress(progress.into());
@@ -1041,7 +1043,7 @@ impl Client {
});
}
let response = request.await?;
let response = self.inner.media().upload(send_media_request).await?;
Ok(String::from(response.content_uri))
}

View File

@@ -55,7 +55,7 @@ use ruma::{
use serde::Deserialize;
use tracing::error;
use crate::{config::RequestConfig, Client, Error, Result};
use crate::{config::RequestConfig, Client, Error, Result, SendMediaUploadRequest};
/// A high-level API to manage the client owner's account.
///
@@ -264,7 +264,10 @@ impl Account {
///
/// [`Media::upload()`]: crate::Media::upload
pub async fn upload_avatar(&self, content_type: &Mime, data: Vec<u8>) -> Result<OwnedMxcUri> {
let upload_response = self.client.media().upload(content_type, data, None, None).await?;
let send_media_request = SendMediaUploadRequest::new(self.client.clone(), data)
.with_content_type(content_type.essence_str().to_owned());
let upload_response = self.client.media().upload(send_media_request).await?;
self.set_avatar_url(Some(&upload_response.content_uri)).await?;
Ok(upload_response.content_uri)
}

View File

@@ -156,17 +156,50 @@ where
}
}
/// `IntoFuture` used to send media upload requests. It wraps another
/// [`SendRequest`], checking its size will be accepted by the homeserver before
/// uploading.
/// `IntoFuture` used to send media upload requests. It works as a builder which
/// can create a [`SendRequest`] given several configuration options, checking
/// its size will be accepted by the homeserver before uploading.
#[allow(missing_debug_implementations)]
pub struct SendMediaUploadRequest {
send_request: SendRequest<media::create_content::v3::Request>,
client: Client,
request: media::create_content::v3::Request,
/// The [`RequestConfig`] to use for uploading the media file.
pub request_config: Option<RequestConfig>,
progress_observable: SharedObservable<TransmissionProgress>,
}
impl SendMediaUploadRequest {
pub fn new(request: SendRequest<media::create_content::v3::Request>) -> Self {
Self { send_request: request }
/// Creates a new instance.
pub fn new(client: Client, file: Vec<u8>) -> Self {
Self {
client,
request: media::create_content::v3::Request::new(file),
request_config: Default::default(),
progress_observable: SharedObservable::new(TransmissionProgress::default()),
}
}
/// Returns the data to upload.
pub fn data(&self) -> &Vec<u8> {
&self.request.file
}
/// Sets the content type of the media for the media upload request.
pub fn with_content_type(mut self, content_type: impl Into<String>) -> Self {
self.request.content_type = Some(content_type.into());
self
}
/// Sets the filename for the media upload request.
pub fn with_filename(mut self, filename: Option<impl Into<String>>) -> Self {
self.request.filename = filename.map(Into::into);
self
}
/// Applies the provided [`RequestConfig`] to the future [`SendRequest`].
pub fn with_request_config(mut self, request_config: Option<RequestConfig>) -> Self {
self.request_config = request_config;
self
}
/// Replace the default `SharedObservable` used for tracking upload
@@ -179,14 +212,24 @@ impl SendMediaUploadRequest {
mut self,
send_progress: SharedObservable<TransmissionProgress>,
) -> Self {
self.send_request = self.send_request.with_send_progress_observable(send_progress);
self.progress_observable = send_progress;
self
}
/// Get a subscriber to observe the progress of sending the request
/// body.
pub fn subscribe_to_send_progress(&self) -> Subscriber<TransmissionProgress> {
self.send_request.send_progress.subscribe()
self.progress_observable.subscribe()
}
/// Creates the [`SendRequest`] using the provided parameters.
pub fn build_send_request(self) -> SendRequest<media::create_content::v3::Request> {
SendRequest {
client: self.client,
request: self.request,
config: self.request_config,
send_progress: self.progress_observable,
}
}
}
@@ -195,9 +238,9 @@ impl IntoFuture for SendMediaUploadRequest {
boxed_into_future!();
fn into_future(self) -> Self::IntoFuture {
let request_length = self.send_request.request.file.len();
let client = self.send_request.client.clone();
let send_request = self.send_request;
let send_request = self.build_send_request();
let request_length = send_request.request.file.len();
let client = send_request.client.clone();
Box::pin(async move {
let max_upload_size = client.load_or_fetch_max_upload_size().await?;

View File

@@ -111,7 +111,10 @@ mod builder;
pub(crate) mod caches;
pub(crate) mod futures;
pub use self::builder::{sanitize_server_name, ClientBuildError, ClientBuilder};
pub use self::{
builder::{sanitize_server_name, ClientBuildError, ClientBuilder},
futures::SendMediaUploadRequest,
};
#[cfg(not(target_family = "wasm"))]
type NotificationHandlerFut = Pin<Box<dyn Future<Output = ()> + Send>>;
@@ -2909,7 +2912,6 @@ pub(crate) mod tests {
use assert_matches::assert_matches;
use assert_matches2::assert_let;
use eyeball::SharedObservable;
use futures_util::{pin_mut, FutureExt};
use js_int::{uint, UInt};
use matrix_sdk_base::{
@@ -2950,10 +2952,9 @@ pub(crate) mod tests {
use crate::{
client::{futures::SendMediaUploadRequest, WeakClient},
config::RequestConfig,
futures::SendRequest,
media::MediaError,
test_utils::{client::MockClientBuilder, mocks::MatrixMockServer},
Error, TransmissionProgress,
Error,
};
#[async_test]
@@ -3781,15 +3782,7 @@ pub(crate) mod tests {
assert_eq!(*client.inner.server_max_upload_size.lock().await.get().unwrap(), uint!(1));
let data = vec![1, 2];
let upload_request =
ruma::api::client::media::create_content::v3::Request::new(data.clone());
let request = SendRequest {
client: client.clone(),
request: upload_request,
config: None,
send_progress: SharedObservable::new(TransmissionProgress::default()),
};
let media_request = SendMediaUploadRequest::new(request);
let media_request = SendMediaUploadRequest::new(client, data.clone());
let error = media_request.await.err();
assert_let!(Some(Error::Media(MediaError::MediaTooLargeToUpload { max, current })) = error);

View File

@@ -23,7 +23,9 @@ use eyeball::{SharedObservable, Subscriber};
use matrix_sdk_common::boxed_into_future;
use ruma::events::room::{EncryptedFile, EncryptedFileInit};
use crate::{config::RequestConfig, Client, Media, Result, TransmissionProgress};
use crate::{
config::RequestConfig, Client, Media, Result, SendMediaUploadRequest, TransmissionProgress,
};
/// Future returned by [`Client::upload_encrypted_file`].
#[allow(missing_debug_implementations)]
@@ -89,11 +91,12 @@ where
let request_config =
request_config.map(|config| config.timeout(Media::reasonable_upload_timeout(&buf)));
let response = client
.media()
.upload(&mime::APPLICATION_OCTET_STREAM, buf, None, request_config)
.with_send_progress_observable(send_progress)
.await?;
let send_media_request = SendMediaUploadRequest::new(self.client.clone(), buf)
.with_content_type(mime::APPLICATION_OCTET_STREAM.essence_str())
.with_request_config(request_config)
.with_send_progress_observable(send_progress);
let response = client.media().upload(send_media_request).await?;
let file: EncryptedFile = {
let keys = encryptor.finish();

View File

@@ -94,6 +94,9 @@ pub use sliding_sync::{
uniffi::setup_scaffolding!();
pub mod live_location_share;
pub use client::SendMediaUploadRequest;
#[cfg(any(test, feature = "testing"))]
pub mod test_utils;

View File

@@ -41,8 +41,8 @@ use tempfile::{Builder as TempFileBuilder, NamedTempFile, TempDir};
use tokio::{fs::File as TokioFile, io::AsyncWriteExt};
use crate::{
attachment::Thumbnail, client::futures::SendMediaUploadRequest, config::RequestConfig, Client,
Error, Result, TransmissionProgress,
attachment::Thumbnail, client::futures::SendMediaUploadRequest, Client, Error, Result,
TransmissionProgress,
};
/// A conservative upload speed of 1Mbps
@@ -165,24 +165,14 @@ impl Media {
Self { client }
}
/// Upload some media to the server.
///
/// # Arguments
///
/// * `content_type` - The type of the media, this will be used as the
/// content-type header.
///
/// * `data` - Vector of bytes to be uploaded to the server.
///
/// * `request_config` - Optional request configuration for the HTTP client,
/// overriding the default. If not provided, a reasonable timeout value is
/// inferred.
/// Upload some media to the server using the provided
/// [`SendMediaUploadRequest`].
///
/// # Examples
///
/// ```no_run
/// # use std::fs;
/// # use matrix_sdk::{Client, ruma::room_id};
/// # use matrix_sdk::{Client, ruma::room_id, SendMediaUploadRequest};
/// # use url::Url;
/// # use mime;
/// # async {
@@ -190,32 +180,25 @@ impl Media {
/// # let mut client = Client::new(homeserver).await?;
/// let image = fs::read("/home/example/my-cat.jpg")?;
///
/// let response = client
/// .media()
/// .upload(&mime::IMAGE_JPEG, image, Some("my-cat.jpg".to_string()), None)
/// .await?;
/// let send_media_request = SendMediaUploadRequest::new(client.clone(), image)
/// .with_content_type(mime::IMAGE_JPEG.essence_str().to_string())
/// .with_filename(Some("my-cat.jpg"));
///
/// let response = client.media().upload(send_media_request).await?;
///
/// println!("Cat URI: {}", response.content_uri);
/// # anyhow::Ok(()) };
/// ```
pub fn upload(
pub async fn upload(
&self,
content_type: &Mime,
data: Vec<u8>,
filename: Option<String>,
request_config: Option<RequestConfig>,
) -> SendMediaUploadRequest {
let request_config = request_config.unwrap_or_else(|| {
self.client.request_config().timeout(Self::reasonable_upload_timeout(&data))
send_media_upload_request: SendMediaUploadRequest,
) -> Result<media::create_content::v3::Response, Error> {
let request_config = send_media_upload_request.request_config.unwrap_or_else(|| {
self.client
.request_config()
.timeout(Self::reasonable_upload_timeout(send_media_upload_request.data()))
});
let request = assign!(media::create_content::v3::Request::new(data), {
filename,
content_type: Some(content_type.essence_str().to_owned()),
});
let request = self.client.send(request).with_request_config(request_config);
SendMediaUploadRequest::new(request)
send_media_upload_request.with_request_config(Some(request_config)).await
}
/// Returns a reasonable upload timeout for an upload, based on the size of
@@ -732,11 +715,12 @@ impl Media {
let upload_thumbnail =
self.upload_thumbnail(thumbnail, filename.clone(), send_progress.clone());
let upload_attachment = async move {
self.upload(content_type, data, filename, None)
.with_send_progress_observable(send_progress)
.await
};
let send_media_request = SendMediaUploadRequest::new(self.client.clone(), data)
.with_content_type(content_type.essence_str().to_owned())
.with_filename(filename)
.with_send_progress_observable(send_progress);
let upload_attachment = async move { self.upload(send_media_request).await };
let (thumbnail, response) = try_join(upload_thumbnail, upload_attachment).await?;
@@ -758,10 +742,13 @@ impl Media {
let (data, content_type, thumbnail_info) = thumbnail.into_parts();
let filename = filename.map(|name| format!("thumbnail-{name}"));
let response = self
.upload(&content_type, data, filename, None)
.with_send_progress_observable(send_progress)
.await?;
let send_media_request = SendMediaUploadRequest::new(self.client.clone(), data)
.with_content_type(content_type.essence_str().to_owned())
.with_filename(filename)
.with_send_progress_observable(send_progress);
let response = self.upload(send_media_request).await?;
let url = response.content_uri;
Ok(Some((MediaSource::Plain(url), thumbnail_info)))

View File

@@ -154,7 +154,8 @@ use crate::{
},
sync::RoomUpdate,
utils::{IntoRawMessageLikeEventContent, IntoRawStateEventContent},
BaseRoom, Client, Error, HttpResult, Result, RoomState, TransmissionProgress,
BaseRoom, Client, Error, HttpResult, Result, RoomState, SendMediaUploadRequest,
TransmissionProgress,
};
#[cfg(feature = "e2e-encryption")]
use crate::{crypto::types::events::CryptoContextInfo, encryption::backups::BackupState};
@@ -2258,7 +2259,13 @@ impl Room {
let (media_source, thumbnail) = self
.client
.media()
.upload_plain_media_and_thumbnail(content_type, data.clone(), thumbnail, send_progress)
.upload_plain_media_and_thumbnail(
content_type,
data.clone(),
Some(filename.clone()),
thumbnail,
send_progress,
)
.await?;
if store_in_cache {
@@ -2515,7 +2522,10 @@ impl Room {
) -> Result<send_state_event::v3::Response> {
self.ensure_room_joined()?;
let upload_response = self.client.media().upload(mime, data, None, None).await?;
let send_media_request = SendMediaUploadRequest::new(self.client.clone(), data)
.with_content_type(mime.essence_str().to_owned());
let upload_response = self.client.media().upload(send_media_request).await?;
let mut info = info.unwrap_or_default();
info.blurhash = upload_response.blurhash;
info.mimetype = Some(mime.to_string());

View File

@@ -142,7 +142,7 @@ use as_variant::as_variant;
use matrix_sdk_base::store::FinishGalleryItemInfo;
use matrix_sdk_base::{
event_cache::store::EventCacheStoreError,
media::MediaRequestParameters,
media::{MediaEventContent, MediaRequestParameters},
store::{
ChildTransactionId, DependentQueuedRequest, DependentQueuedRequestKind, DynStateStore,
FinishUploadThumbnailInfo, QueueWedgeError, QueuedRequest, QueuedRequestKind,
@@ -169,7 +169,7 @@ use ruma::{
};
use tokio::sync::{broadcast, oneshot, Mutex, Notify, OwnedMutexGuard};
use tracing::{debug, error, info, instrument, trace, warn};
use matrix_sdk_base::media::MediaEventContent;
#[cfg(feature = "e2e-encryption")]
use crate::crypto::{OlmError, SessionRecipientCollectionError};
use crate::{
@@ -177,7 +177,7 @@ use crate::{
config::RequestConfig,
error::RetryKind,
room::{edit::EditedContent, WeakRoom},
Client, Media, Room,
Client, Media, Room, SendMediaUploadRequest,
};
mod upload;
@@ -789,11 +789,13 @@ impl RoomSendQueue {
trace!("upload will be in clear text (room without encryption)");
let request_config = RequestConfig::short_retry()
.timeout(Media::reasonable_upload_timeout(&data));
let res = room
.client()
.media()
.upload(&mime, data, filename, Some(request_config))
.await?;
let send_media_request = SendMediaUploadRequest::new(room.client(), data)
.with_content_type(mime.essence_str())
.with_request_config(Some(request_config))
.with_filename(filename);
let res = room.client().media().upload(send_media_request).await?;
MediaSource::Plain(res.content_uri)
};
@@ -801,8 +803,13 @@ impl RoomSendQueue {
let media_source = {
let request_config = RequestConfig::short_retry()
.timeout(Media::reasonable_upload_timeout(&data));
let res =
room.client().media().upload(&mime, data, Some(request_config)).await?;
let send_media_request = SendMediaUploadRequest::new(room.client(), data)
.with_content_type(mime.essence_str())
.with_request_config(Some(request_config))
.with_filename(filename);
let res = room.client().media().upload(send_media_request).await?;
MediaSource::Plain(res.content_uri)
};

View File

@@ -2583,7 +2583,8 @@ impl<'a> MockEndpoint<'a, UploadEndpoint> {
/// # Examples
///
/// ```no_run
/// # tokio_test::block_on(async {
/// # use matrix_sdk::SendMediaUploadRequest;
/// tokio_test::block_on(async {
/// use matrix_sdk::{
/// ruma::{event_id, mxc_uri, room_id},
/// test_utils::mocks::MatrixMockServer,
@@ -2595,7 +2596,10 @@ impl<'a> MockEndpoint<'a, UploadEndpoint> {
/// let (receiver, upload_mock) = server.mock_upload().ok_with_capture(mxid);
/// let client = server.client_builder().build().await;
///
/// client.media().upload(&mime::TEXT_PLAIN, vec![1, 2, 3, 4, 5], None).await?;
/// let send_media_request = SendMediaUploadRequest::new(client.clone(), vec![1, 2, 3, 4, 5])
/// .with_content_type(mime::TEXT_PLAIN.essence_str());
///
/// client.media().upload(send_media_request).await?;
///
/// let uploaded = receiver.await?;
///