timeline: move the handle_local_echo and handling of RoomSendQueueUpdate to TimelineInner

This will make testing easier.
This commit is contained in:
Benjamin Bouvier
2024-07-10 14:13:24 +02:00
parent afa2e8063d
commit f96f55ccd9
2 changed files with 83 additions and 98 deletions

View File

@@ -18,7 +18,6 @@ use futures_util::{pin_mut, StreamExt};
use matrix_sdk::{
event_cache::{EventsOrigin, RoomEventCacheUpdate},
executor::spawn,
send_queue::{LocalEcho, RoomSendQueueUpdate},
Room,
};
use ruma::{events::AnySyncTimelineEvent, RoomVersionId};
@@ -32,10 +31,7 @@ use super::{
Error, Timeline, TimelineDropHandle, TimelineFocus,
};
use crate::{
timeline::{
event_handler::TimelineEventKind, event_item::RemoteEventOrigin, inner::TimelineEnd,
EventSendState,
},
timeline::{event_item::RemoteEventOrigin, inner::TimelineEnd},
unable_to_decrypt_hook::UtdHookManager,
};
@@ -273,7 +269,7 @@ impl TimelineBuilder {
Some(spawn({
// Handles existing local echoes first.
for echo in local_echoes {
handle_local_echo(echo, &timeline).await;
timeline.handle_local_echo(echo).await;
}
let span = info_span!(parent: Span::none(), "local_echo_handler", room_id = ?room.room_id());
@@ -285,59 +281,7 @@ impl TimelineBuilder {
loop {
match listener.recv().await {
Ok(update) => match update {
RoomSendQueueUpdate::NewLocalEvent(echo) => {
handle_local_echo(echo, &timeline).await;
}
RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
if !timeline.discard_local_echo(&transaction_id).await {
warn!("couldn't find the local echo to discard");
}
}
RoomSendQueueUpdate::ReplacedLocalEvent {
transaction_id,
new_content,
} => {
let content = match new_content.deserialize() {
Ok(d) => d,
Err(err) => {
warn!(
"error deserializing local echo (upon edit): {err}"
);
return;
}
};
if !timeline.replace_local_echo(&transaction_id, content).await
{
warn!("couldn't find the local echo to replace");
}
}
RoomSendQueueUpdate::SendError {
transaction_id,
error,
is_recoverable,
} => {
timeline
.update_event_send_state(
&transaction_id,
EventSendState::SendingFailed { error, is_recoverable },
)
.await;
}
RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
timeline
.update_event_send_state(
&transaction_id,
EventSendState::Sent { event_id },
)
.await;
}
},
Ok(update) => timeline.handle_room_send_queue_update(update).await,
Err(RecvError::Lagged(num_missed)) => {
warn!("missed {num_missed} local echoes, ignoring those missed");
@@ -428,41 +372,3 @@ impl TimelineBuilder {
Ok(timeline)
}
}
#[derive(Debug, Error)]
#[error("local echo failed to send in a previous session")]
struct MissingLocalEchoFailError;
async fn handle_local_echo(echo: LocalEcho, timeline: &TimelineInner) {
let content = match echo.serialized_event.deserialize() {
Ok(d) => d,
Err(err) => {
warn!("error deserializing local echo: {err}");
return;
}
};
timeline
.handle_local_event(
echo.transaction_id.clone(),
TimelineEventKind::Message { content, relations: Default::default() },
Some(echo.send_handle),
)
.await;
if echo.is_wedged {
timeline
.update_event_send_state(
&echo.transaction_id,
EventSendState::SendingFailed {
// Put a dummy error in this case, since we're not persisting the errors that
// occurred in previous sessions.
error: Arc::new(matrix_sdk::Error::UnknownError(Box::new(
MissingLocalEchoFailError,
))),
is_recoverable: false,
},
)
.await;
}
}

View File

@@ -27,7 +27,7 @@ use matrix_sdk::crypto::OlmMachine;
use matrix_sdk::{
deserialized_responses::SyncTimelineEvent,
event_cache::{paginator::Paginator, RoomEventCache},
send_queue::SendHandle,
send_queue::{LocalEcho, RoomSendQueueUpdate, SendHandle},
Result, Room,
};
#[cfg(test)]
@@ -1132,6 +1132,81 @@ impl<P: RoomDataProvider> TimelineInner<P> {
) -> Option<OwnedEventId> {
self.state.read().await.latest_user_read_receipt_timeline_event_id(user_id)
}
/// Handle a room send update that's a new local echo.
pub(crate) async fn handle_local_echo(&self, echo: LocalEcho) {
let content = match echo.serialized_event.deserialize() {
Ok(d) => d,
Err(err) => {
warn!("error deserializing local echo: {err}");
return;
}
};
self.handle_local_event(
echo.transaction_id.clone(),
TimelineEventKind::Message { content, relations: Default::default() },
Some(echo.send_handle),
)
.await;
if echo.is_wedged {
self.update_event_send_state(
&echo.transaction_id,
EventSendState::SendingFailed {
// Put a dummy error in this case, since we're not persisting the errors that
// occurred in previous sessions.
error: Arc::new(matrix_sdk::Error::UnknownError(Box::new(
MissingLocalEchoFailError,
))),
is_recoverable: false,
},
)
.await;
}
}
/// Handle a single room send queue update.
pub(crate) async fn handle_room_send_queue_update(&self, update: RoomSendQueueUpdate) {
match update {
RoomSendQueueUpdate::NewLocalEvent(echo) => {
self.handle_local_echo(echo).await;
}
RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
if !self.discard_local_echo(&transaction_id).await {
warn!("couldn't find the local echo to discard");
}
}
RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
let content = match new_content.deserialize() {
Ok(d) => d,
Err(err) => {
warn!("error deserializing local echo (upon edit): {err}");
return;
}
};
if !self.replace_local_echo(&transaction_id, content).await {
warn!("couldn't find the local echo to replace");
}
}
RoomSendQueueUpdate::SendError { transaction_id, error, is_recoverable } => {
self.update_event_send_state(
&transaction_id,
EventSendState::SendingFailed { error, is_recoverable },
)
.await;
}
RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
self.update_event_send_state(&transaction_id, EventSendState::Sent { event_id })
.await;
}
}
}
}
impl TimelineInner {
@@ -1283,6 +1358,10 @@ impl TimelineInner {
}
}
#[derive(Debug, Error)]
#[error("local echo failed to send in a previous session")]
struct MissingLocalEchoFailError;
#[derive(Debug, Default)]
pub(super) struct HandleManyEventsResult {
/// The number of items that were added to the timeline.