feat(media)!: optionally cache a media after upload

Changelog: Uploaded medias can now be cached in multiple
attachment-related methods like `Room::send_attachment`.
This commit is contained in:
Benjamin Bouvier
2024-10-17 16:08:52 +02:00
parent 3887c10444
commit d3bfdb9563
5 changed files with 156 additions and 30 deletions

View File

@@ -104,12 +104,18 @@ impl Timeline {
mime_type: Option<String>,
attachment_config: AttachmentConfig,
progress_watcher: Option<Box<dyn ProgressWatcher>>,
store_in_cache: bool,
) -> Result<(), RoomError> {
let mime_str = mime_type.as_ref().ok_or(RoomError::InvalidAttachmentMimeType)?;
let mime_type =
mime_str.parse::<Mime>().map_err(|_| RoomError::InvalidAttachmentMimeType)?;
let request = self.inner.send_attachment(filename, mime_type, attachment_config);
let mut request = self.inner.send_attachment(filename, mime_type, attachment_config);
if store_in_cache {
request.store_in_cache();
}
if let Some(progress_watcher) = progress_watcher {
let mut subscriber = request.subscribe_to_send_progress();
RUNTIME.spawn(async move {
@@ -270,6 +276,7 @@ impl Timeline {
}
}
#[allow(clippy::too_many_arguments)]
pub fn send_image(
self: Arc<Self>,
url: String,
@@ -278,6 +285,7 @@ impl Timeline {
caption: Option<String>,
formatted_caption: Option<FormattedBody>,
progress_watcher: Option<Box<dyn ProgressWatcher>>,
store_in_cache: bool,
) -> Arc<SendAttachmentJoinHandle> {
SendAttachmentJoinHandle::new(RUNTIME.spawn(async move {
let base_image_info = BaseImageInfo::try_from(&image_info)
@@ -289,11 +297,18 @@ impl Timeline {
.caption(caption)
.formatted_caption(formatted_caption.map(Into::into));
self.send_attachment(url, image_info.mimetype, attachment_config, progress_watcher)
.await
self.send_attachment(
url,
image_info.mimetype,
attachment_config,
progress_watcher,
store_in_cache,
)
.await
}))
}
#[allow(clippy::too_many_arguments)]
pub fn send_video(
self: Arc<Self>,
url: String,
@@ -302,6 +317,7 @@ impl Timeline {
caption: Option<String>,
formatted_caption: Option<FormattedBody>,
progress_watcher: Option<Box<dyn ProgressWatcher>>,
store_in_cache: bool,
) -> Arc<SendAttachmentJoinHandle> {
SendAttachmentJoinHandle::new(RUNTIME.spawn(async move {
let base_video_info: BaseVideoInfo = BaseVideoInfo::try_from(&video_info)
@@ -313,8 +329,14 @@ impl Timeline {
.caption(caption)
.formatted_caption(formatted_caption.map(Into::into));
self.send_attachment(url, video_info.mimetype, attachment_config, progress_watcher)
.await
self.send_attachment(
url,
video_info.mimetype,
attachment_config,
progress_watcher,
store_in_cache,
)
.await
}))
}
@@ -325,6 +347,7 @@ impl Timeline {
caption: Option<String>,
formatted_caption: Option<FormattedBody>,
progress_watcher: Option<Box<dyn ProgressWatcher>>,
store_in_cache: bool,
) -> Arc<SendAttachmentJoinHandle> {
SendAttachmentJoinHandle::new(RUNTIME.spawn(async move {
let base_audio_info: BaseAudioInfo = BaseAudioInfo::try_from(&audio_info)
@@ -336,11 +359,18 @@ impl Timeline {
.caption(caption)
.formatted_caption(formatted_caption.map(Into::into));
self.send_attachment(url, audio_info.mimetype, attachment_config, progress_watcher)
.await
self.send_attachment(
url,
audio_info.mimetype,
attachment_config,
progress_watcher,
store_in_cache,
)
.await
}))
}
#[allow(clippy::too_many_arguments)]
pub fn send_voice_message(
self: Arc<Self>,
url: String,
@@ -349,6 +379,7 @@ impl Timeline {
caption: Option<String>,
formatted_caption: Option<FormattedBody>,
progress_watcher: Option<Box<dyn ProgressWatcher>>,
store_in_cache: bool,
) -> Arc<SendAttachmentJoinHandle> {
SendAttachmentJoinHandle::new(RUNTIME.spawn(async move {
let base_audio_info: BaseAudioInfo = BaseAudioInfo::try_from(&audio_info)
@@ -361,8 +392,14 @@ impl Timeline {
.caption(caption)
.formatted_caption(formatted_caption.map(Into::into));
self.send_attachment(url, audio_info.mimetype, attachment_config, progress_watcher)
.await
self.send_attachment(
url,
audio_info.mimetype,
attachment_config,
progress_watcher,
store_in_cache,
)
.await
}))
}
@@ -371,6 +408,7 @@ impl Timeline {
url: String,
file_info: FileInfo,
progress_watcher: Option<Box<dyn ProgressWatcher>>,
store_in_cache: bool,
) -> Arc<SendAttachmentJoinHandle> {
SendAttachmentJoinHandle::new(RUNTIME.spawn(async move {
let base_file_info: BaseFileInfo =
@@ -379,7 +417,14 @@ impl Timeline {
let attachment_config = AttachmentConfig::new().info(attachment_info);
self.send_attachment(url, file_info.mimetype, attachment_config, progress_watcher).await
self.send_attachment(
url,
file_info.mimetype,
attachment_config,
progress_watcher,
store_in_cache,
)
.await
}))
}

View File

@@ -15,6 +15,7 @@ pub struct SendAttachment<'a> {
config: AttachmentConfig,
tracing_span: Span,
pub(crate) send_progress: SharedObservable<TransmissionProgress>,
store_in_cache: bool,
}
impl<'a> SendAttachment<'a> {
@@ -31,6 +32,7 @@ impl<'a> SendAttachment<'a> {
config,
tracing_span: Span::current(),
send_progress: Default::default(),
store_in_cache: false,
}
}
@@ -40,6 +42,14 @@ impl<'a> SendAttachment<'a> {
pub fn subscribe_to_send_progress(&self) -> Subscriber<TransmissionProgress> {
self.send_progress.subscribe()
}
/// Whether the sent attachment should be stored in the cache or not.
///
/// If set to true, then retrieving the data for the attachment will result
/// in a cache hit immediately after upload.
pub fn store_in_cache(&mut self) {
self.store_in_cache = true;
}
}
impl<'a> IntoFuture for SendAttachment<'a> {
@@ -47,7 +57,9 @@ impl<'a> IntoFuture for SendAttachment<'a> {
boxed_into_future!(extra_bounds: 'a);
fn into_future(self) -> Self::IntoFuture {
let Self { timeline, path, mime_type, config, tracing_span, send_progress } = self;
let Self { timeline, path, mime_type, config, tracing_span, send_progress, store_in_cache } =
self;
let fut = async move {
let filename = path
.file_name()
@@ -56,12 +68,16 @@ impl<'a> IntoFuture for SendAttachment<'a> {
.ok_or(Error::InvalidAttachmentFileName)?;
let data = fs::read(&path).map_err(|_| Error::InvalidAttachmentData)?;
timeline
let mut fut = timeline
.room()
.send_attachment(filename, &mime_type, data, config)
.with_send_progress_observable(send_progress)
.await
.map_err(|_| Error::FailedSendingAttachment)?;
.with_send_progress_observable(send_progress);
if store_in_cache {
fut = fut.store_in_cache();
}
fut.await.map_err(|_| Error::FailedSendingAttachment)?;
Ok(())
};

View File

@@ -246,6 +246,7 @@ pub struct SendAttachment<'a> {
config: AttachmentConfig,
tracing_span: Span,
send_progress: SharedObservable<TransmissionProgress>,
store_in_cache: bool,
}
impl<'a> SendAttachment<'a> {
@@ -264,6 +265,7 @@ impl<'a> SendAttachment<'a> {
config,
tracing_span: Span::current(),
send_progress: Default::default(),
store_in_cache: false,
}
}
@@ -277,6 +279,15 @@ impl<'a> SendAttachment<'a> {
self.send_progress = send_progress;
self
}
/// Whether the sent attachment should be stored in the cache or not.
///
/// If set to true, then retrieving the data for the attachment will result
/// in a cache hit immediately after upload.
pub fn store_in_cache(mut self) -> Self {
self.store_in_cache = true;
self
}
}
impl<'a> IntoFuture for SendAttachment<'a> {
@@ -284,10 +295,26 @@ impl<'a> IntoFuture for SendAttachment<'a> {
boxed_into_future!(extra_bounds: 'a);
fn into_future(self) -> Self::IntoFuture {
let Self { room, filename, content_type, data, config, tracing_span, send_progress } = self;
let Self {
room,
filename,
content_type,
data,
config,
tracing_span,
send_progress,
store_in_cache,
} = self;
let fut = async move {
room.prepare_and_send_attachment(filename, content_type, data, config, send_progress)
.await
room.prepare_and_send_attachment(
filename,
content_type,
data,
config,
send_progress,
store_in_cache,
)
.await
};
Box::pin(fut.instrument(tracing_span))

View File

@@ -1909,6 +1909,12 @@ impl Room {
/// media.
///
/// * `config` - Metadata and configuration for the attachment.
///
/// * `send_progress` - An observable to transmit forward progress about the
/// upload.
///
/// * `store_in_cache` - A boolean defining whether the uploaded media will
/// be stored in the cache immediately after a successful upload.
pub(super) async fn prepare_and_send_attachment<'a>(
&'a self,
filename: &'a str,
@@ -1916,19 +1922,22 @@ impl Room {
data: Vec<u8>,
mut config: AttachmentConfig,
send_progress: SharedObservable<TransmissionProgress>,
store_in_cache: bool,
) -> Result<send_message_event::v3::Response> {
self.ensure_room_joined()?;
let txn_id = config.txn_id.take();
let mentions = config.mentions.take();
let thumbnail = config.thumbnail.take();
#[cfg(feature = "e2e-encryption")]
let (media_source, thumbnail_source, thumbnail_info) = if self.is_encrypted().await? {
self.client
.upload_encrypted_media_and_thumbnail(
content_type,
data,
config.thumbnail.take(),
data.clone(),
thumbnail,
send_progress,
)
.await?
@@ -1937,8 +1946,8 @@ impl Room {
.media()
.upload_plain_media_and_thumbnail(
content_type,
data,
config.thumbnail.take(),
data.clone(),
thumbnail,
send_progress,
)
.await?
@@ -1948,14 +1957,19 @@ impl Room {
let (media_source, thumbnail_source, thumbnail_info) = self
.client
.media()
.upload_plain_media_and_thumbnail(
content_type,
data,
config.thumbnail.take(),
send_progress,
)
.upload_plain_media_and_thumbnail(content_type, data.clone(), thumbnail, send_progress)
.await?;
if store_in_cache {
let cache_store = self.client.event_cache_store();
let request = MediaRequest { source: media_source.clone(), format: MediaFormat::File };
// This shouldn't prevent the whole process from finishing properly.
if let Err(err) = cache_store.add_media_content(&request, data).await {
warn!("unable to cache the media after uploading it: {err}");
}
}
let msg_type = self.make_attachment_message(
content_type,
media_source,

View File

@@ -6,10 +6,15 @@ use matrix_sdk::{
Thumbnail,
},
config::SyncSettings,
media::{MediaFormat, MediaRequest},
test_utils::logged_in_client_with_server,
};
use matrix_sdk_test::{async_test, mocks::mock_encryption_state, test_json, DEFAULT_TEST_ROOM_ID};
use ruma::{event_id, events::Mentions, owned_user_id, uint};
use ruma::{
event_id,
events::{room::MediaSource, Mentions},
owned_mxc_uri, owned_user_id, uint,
};
use serde_json::json;
use wiremock::{
matchers::{body_partial_json, header, method, path, path_regex},
@@ -60,10 +65,29 @@ async fn test_room_attachment_send() {
b"Hello world".to_vec(),
AttachmentConfig::new(),
)
.store_in_cache()
.await
.unwrap();
assert_eq!(event_id!("$h29iv0s8:example.com"), response.event_id)
assert_eq!(event_id!("$h29iv0s8:example.com"), response.event_id);
// The media is immediately cached in the cache store, so we don't need to set
// up another mock endpoint for getting the media.
let reloaded = client
.media()
.get_media_content(
&MediaRequest {
source: MediaSource::Plain(owned_mxc_uri!(
"mxc://example.com/AQwafuaFswefuhsfAFAgsw"
)),
format: MediaFormat::File,
},
true,
)
.await
.unwrap();
assert_eq!(reloaded, b"Hello world");
}
#[async_test]