chore(sdk): Extract EventCache::thread_subscriber_task into tasks.rs.

This commit is contained in:
Ivan Enderlin
2026-02-27 14:16:34 +01:00
parent 2d13a682a2
commit 16e09dfefa
2 changed files with 285 additions and 263 deletions

View File

@@ -40,27 +40,22 @@ use matrix_sdk_base::{
ThreadingSupport,
cross_process_lock::CrossProcessLockError,
event_cache::store::{EventCacheStoreError, EventCacheStoreLock, EventCacheStoreLockState},
linked_chunk::{OwnedLinkedChunkId, lazy_loader::LazyLoaderError},
serde_helpers::extract_thread_root_from_content,
linked_chunk::lazy_loader::LazyLoaderError,
sync::RoomUpdates,
task_monitor::BackgroundTaskHandle,
timer,
};
use ruma::{OwnedEventId, OwnedRoomId, OwnedTransactionId, RoomId};
use tokio::{
select,
sync::{
Mutex, RwLock,
broadcast::{Receiver, Sender, channel, error::RecvError},
mpsc,
},
use ruma::{OwnedRoomId, RoomId};
use tokio::sync::{
Mutex, RwLock,
broadcast::{Receiver, Sender, channel, error::RecvError},
mpsc,
};
use tracing::{Instrument as _, Span, debug, error, info, info_span, instrument, trace, warn};
use crate::{
Client,
client::{ClientInner, WeakClient},
send_queue::{LocalEchoContent, RoomSendQueueUpdate, SendQueueUpdate},
};
mod caches;
@@ -68,6 +63,7 @@ mod deduplicator;
mod persistence;
#[cfg(feature = "e2e-encryption")]
mod redecryptor;
mod tasks;
use caches::room::{RoomEventCacheLinkedChunkUpdate, RoomEventCacheStateLock};
pub use caches::{
@@ -206,7 +202,7 @@ impl EventCache {
.task_monitor
.spawn_background_task(
"event_cache::thread_subscriber",
Self::thread_subscriber_task(
tasks::thread_subscriber_task(
weak_client.clone(),
linked_chunk_update_sender.clone(),
thread_subscriber_sender,
@@ -512,257 +508,6 @@ impl EventCache {
self.inner.generic_update_sender.subscribe()
}
/// React to a given linked chunk update by subscribing the user to a
/// thread, if needs be (when the user got mentioned in a thread reply, for
/// a thread they were not subscribed to).
///
/// Returns a boolean indicating whether the task should keep on running or
/// not.
#[instrument(skip(client, thread_subscriber_sender))]
async fn handle_thread_subscriber_linked_chunk_update(
client: &WeakClient,
thread_subscriber_sender: &Sender<()>,
up: RoomEventCacheLinkedChunkUpdate,
) -> bool {
let Some(client) = client.get() else {
// Client shutting down.
debug!("Client is shutting down, exiting thread subscriber task");
return false;
};
let OwnedLinkedChunkId::Thread(room_id, thread_root) = &up.linked_chunk_id else {
trace!("received an update for a non-thread linked chunk, ignoring");
return true;
};
let Some(room) = client.get_room(room_id) else {
warn!(%room_id, "unknown room");
return true;
};
let thread_root = thread_root.clone();
let mut new_events = up.events().peekable();
if new_events.peek().is_none() {
// No new events, nothing to do.
return true;
}
// This `PushContext` is going to be used to compute whether an in-thread event
// would trigger a mention.
//
// Of course, we're not interested in an in-thread event causing a mention,
// because it's part of a thread we've subscribed to. So the
// `PushContext` must not include the check for thread subscriptions (otherwise
// it would be impossible to subscribe to new threads).
let with_thread_subscriptions = false;
let Some(push_context) = room
.push_context_internal(with_thread_subscriptions)
.await
.inspect_err(|err| {
warn!("Failed to get push context for threads: {err}");
})
.ok()
.flatten()
else {
warn!("Missing push context for thread subscriptions.");
return true;
};
let mut subscribe_up_to = None;
// Find if there's an event that would trigger a mention for the current
// user, iterating from the end of the new events towards the oldest, so we can
// find the most recent event to subscribe to.
for ev in new_events.rev() {
if push_context
.for_event(ev.raw())
.await
.into_iter()
.any(|action| action.should_notify())
{
let Some(event_id) = ev.event_id() else {
// Shouldn't happen.
continue;
};
subscribe_up_to = Some(event_id);
break;
}
}
// And if we've found such a mention, subscribe to the thread up to this
// event.
if let Some(event_id) = subscribe_up_to {
trace!(thread = %thread_root, up_to = %event_id, "found a new thread to subscribe to");
if let Err(err) = room.subscribe_thread_if_needed(&thread_root, Some(event_id)).await {
warn!(%err, "Failed to subscribe to thread");
} else {
let _ = thread_subscriber_sender.send(());
}
}
true
}
/// React to a given send queue update by subscribing the user to a
/// thread, if needs be (when the user sent an event in a thread they were
/// not subscribed to).
///
/// Returns a boolean indicating whether the task should keep on running or
/// not.
#[instrument(skip(client, thread_subscriber_sender))]
async fn handle_thread_subscriber_send_queue_update(
client: &WeakClient,
thread_subscriber_sender: &Sender<()>,
events_being_sent: &mut HashMap<OwnedTransactionId, OwnedEventId>,
up: SendQueueUpdate,
) -> bool {
let Some(client) = client.get() else {
// Client shutting down.
debug!("Client is shutting down, exiting thread subscriber task");
return false;
};
let room_id = up.room_id;
let Some(room) = client.get_room(&room_id) else {
warn!(%room_id, "unknown room");
return true;
};
let (thread_root, subscribe_up_to) = match up.update {
RoomSendQueueUpdate::NewLocalEvent(local_echo) => {
match local_echo.content {
LocalEchoContent::Event { serialized_event, .. } => {
if let Some(thread_root) =
extract_thread_root_from_content(serialized_event.into_raw().0)
{
events_being_sent.insert(local_echo.transaction_id, thread_root);
}
}
LocalEchoContent::React { .. } => {
// Nothing to do, reactions don't count as a thread
// subscription.
}
}
return true;
}
RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
events_being_sent.remove(&transaction_id);
return true;
}
RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
if let Some(thread_root) =
extract_thread_root_from_content(new_content.into_raw().0)
{
events_being_sent.insert(transaction_id, thread_root);
} else {
// It could be that the event isn't part of a thread anymore; handle that by
// removing the pending transaction id.
events_being_sent.remove(&transaction_id);
}
return true;
}
RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
if let Some(thread_root) = events_being_sent.remove(&transaction_id) {
(thread_root, event_id)
} else {
// We don't know about the event that has been sent, so ignore it.
trace!(%transaction_id, "received a sent event that we didn't know about, ignoring");
return true;
}
}
RoomSendQueueUpdate::SendError { .. }
| RoomSendQueueUpdate::RetryEvent { .. }
| RoomSendQueueUpdate::MediaUpload { .. } => {
// Nothing to do for these bad boys.
return true;
}
};
// And if we've found such a mention, subscribe to the thread up to this event.
trace!(thread = %thread_root, up_to = %subscribe_up_to, "found a new thread to subscribe to");
if let Err(err) = room.subscribe_thread_if_needed(&thread_root, Some(subscribe_up_to)).await
{
warn!(%err, "Failed to subscribe to thread");
} else {
let _ = thread_subscriber_sender.send(());
}
true
}
#[instrument(skip_all)]
async fn thread_subscriber_task(
client: WeakClient,
linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
thread_subscriber_sender: Sender<()>,
) {
let mut send_q_rx = if let Some(client) = client.get() {
if !client.enabled_thread_subscriptions() {
trace!("Thread subscriptions are not enabled, not spawning thread subscriber task");
return;
}
client.send_queue().subscribe()
} else {
trace!("Client is shutting down, not spawning thread subscriber task");
return;
};
let mut linked_chunk_rx = linked_chunk_update_sender.subscribe();
// A mapping of local echoes (events being sent), to their thread root, if
// they're in an in-thread reply.
//
// Entirely managed by `handle_thread_subscriber_send_queue_update`.
let mut events_being_sent = HashMap::new();
loop {
select! {
res = send_q_rx.recv() => {
match res {
Ok(up) => {
if !Self::handle_thread_subscriber_send_queue_update(&client, &thread_subscriber_sender, &mut events_being_sent, up).await {
break;
}
}
Err(RecvError::Closed) => {
debug!("Linked chunk update channel has been closed, exiting thread subscriber task");
break;
}
Err(RecvError::Lagged(num_skipped)) => {
warn!(num_skipped, "Lagged behind linked chunk updates");
}
}
}
res = linked_chunk_rx.recv() => {
match res {
Ok(up) => {
if !Self::handle_thread_subscriber_linked_chunk_update(&client, &thread_subscriber_sender, up).await {
break;
}
}
Err(RecvError::Closed) => {
debug!("Linked chunk update channel has been closed, exiting thread subscriber task");
break;
}
Err(RecvError::Lagged(num_skipped)) => {
warn!(num_skipped, "Lagged behind linked chunk updates");
}
}
}
}
}
}
/// Takes a [`TimelineEvent`] and passes it to the [`RoomIndex`] of the
/// given room which will add/remove/edit an event in the index based on
/// the event type.

View File

@@ -0,0 +1,277 @@
// Copyright 2026 The Matrix.org Foundation C.I.C.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::HashMap;
use matrix_sdk_base::{
linked_chunk::OwnedLinkedChunkId, serde_helpers::extract_thread_root_from_content,
};
use ruma::{OwnedEventId, OwnedTransactionId};
use tokio::{
select,
sync::broadcast::{Sender, error::RecvError},
};
use tracing::{debug, instrument, trace, warn};
use super::RoomEventCacheLinkedChunkUpdate;
use crate::{
client::WeakClient,
send_queue::{LocalEchoContent, RoomSendQueueUpdate, SendQueueUpdate},
};
/// Handle [`SendQueueUpdate`] and [`RoomEventCacheLinkedChunkUpdate`] to update
/// the threads, for a thread the user was not subscribed to.
#[instrument(skip_all)]
pub(super) async fn thread_subscriber_task(
client: WeakClient,
linked_chunk_update_sender: Sender<RoomEventCacheLinkedChunkUpdate>,
thread_subscriber_sender: Sender<()>,
) {
let mut send_q_rx = if let Some(client) = client.get() {
if !client.enabled_thread_subscriptions() {
trace!("Thread subscriptions are not enabled, not spawning thread subscriber task");
return;
}
client.send_queue().subscribe()
} else {
trace!("Client is shutting down, not spawning thread subscriber task");
return;
};
let mut linked_chunk_rx = linked_chunk_update_sender.subscribe();
// A mapping of local echoes (events being sent), to their thread root, if
// they're in an in-thread reply.
//
// Entirely managed by `handle_thread_subscriber_send_queue_update`.
let mut events_being_sent = HashMap::new();
loop {
select! {
res = send_q_rx.recv() => {
match res {
Ok(up) => {
if !handle_thread_subscriber_send_queue_update(&client, &thread_subscriber_sender, &mut events_being_sent, up).await {
break;
}
}
Err(RecvError::Closed) => {
debug!("Linked chunk update channel has been closed, exiting thread subscriber task");
break;
}
Err(RecvError::Lagged(num_skipped)) => {
warn!(num_skipped, "Lagged behind linked chunk updates");
}
}
}
res = linked_chunk_rx.recv() => {
match res {
Ok(up) => {
if !handle_thread_subscriber_linked_chunk_update(&client, &thread_subscriber_sender, up).await {
break;
}
}
Err(RecvError::Closed) => {
debug!("Linked chunk update channel has been closed, exiting thread subscriber task");
break;
}
Err(RecvError::Lagged(num_skipped)) => {
warn!(num_skipped, "Lagged behind linked chunk updates");
}
}
}
}
}
}
/// React to a given send queue update by subscribing the user to a
/// thread, if needs be (when the user sent an event in a thread they were
/// not subscribed to).
///
/// Returns a boolean indicating whether the task should keep on running or
/// not.
#[instrument(skip(client, thread_subscriber_sender))]
async fn handle_thread_subscriber_send_queue_update(
client: &WeakClient,
thread_subscriber_sender: &Sender<()>,
events_being_sent: &mut HashMap<OwnedTransactionId, OwnedEventId>,
up: SendQueueUpdate,
) -> bool {
let Some(client) = client.get() else {
// Client shutting down.
debug!("Client is shutting down, exiting thread subscriber task");
return false;
};
let room_id = up.room_id;
let Some(room) = client.get_room(&room_id) else {
warn!(%room_id, "unknown room");
return true;
};
let (thread_root, subscribe_up_to) = match up.update {
RoomSendQueueUpdate::NewLocalEvent(local_echo) => {
match local_echo.content {
LocalEchoContent::Event { serialized_event, .. } => {
if let Some(thread_root) =
extract_thread_root_from_content(serialized_event.into_raw().0)
{
events_being_sent.insert(local_echo.transaction_id, thread_root);
}
}
LocalEchoContent::React { .. } => {
// Nothing to do, reactions don't count as a thread
// subscription.
}
}
return true;
}
RoomSendQueueUpdate::CancelledLocalEvent { transaction_id } => {
events_being_sent.remove(&transaction_id);
return true;
}
RoomSendQueueUpdate::ReplacedLocalEvent { transaction_id, new_content } => {
if let Some(thread_root) = extract_thread_root_from_content(new_content.into_raw().0) {
events_being_sent.insert(transaction_id, thread_root);
} else {
// It could be that the event isn't part of a thread anymore; handle that by
// removing the pending transaction id.
events_being_sent.remove(&transaction_id);
}
return true;
}
RoomSendQueueUpdate::SentEvent { transaction_id, event_id } => {
if let Some(thread_root) = events_being_sent.remove(&transaction_id) {
(thread_root, event_id)
} else {
// We don't know about the event that has been sent, so ignore it.
trace!(%transaction_id, "received a sent event that we didn't know about, ignoring");
return true;
}
}
RoomSendQueueUpdate::SendError { .. }
| RoomSendQueueUpdate::RetryEvent { .. }
| RoomSendQueueUpdate::MediaUpload { .. } => {
// Nothing to do for these bad boys.
return true;
}
};
// And if we've found such a mention, subscribe to the thread up to this event.
trace!(thread = %thread_root, up_to = %subscribe_up_to, "found a new thread to subscribe to");
if let Err(err) = room.subscribe_thread_if_needed(&thread_root, Some(subscribe_up_to)).await {
warn!(%err, "Failed to subscribe to thread");
} else {
let _ = thread_subscriber_sender.send(());
}
true
}
/// React to a given linked chunk update by subscribing the user to a
/// thread, if needs be (when the user got mentioned in a thread reply, for
/// a thread they were not subscribed to).
///
/// Returns a boolean indicating whether the task should keep on running or
/// not.
#[instrument(skip(client, thread_subscriber_sender))]
async fn handle_thread_subscriber_linked_chunk_update(
client: &WeakClient,
thread_subscriber_sender: &Sender<()>,
up: RoomEventCacheLinkedChunkUpdate,
) -> bool {
let Some(client) = client.get() else {
// Client shutting down.
debug!("Client is shutting down, exiting thread subscriber task");
return false;
};
let OwnedLinkedChunkId::Thread(room_id, thread_root) = &up.linked_chunk_id else {
trace!("received an update for a non-thread linked chunk, ignoring");
return true;
};
let Some(room) = client.get_room(room_id) else {
warn!(%room_id, "unknown room");
return true;
};
let thread_root = thread_root.clone();
let mut new_events = up.events().peekable();
if new_events.peek().is_none() {
// No new events, nothing to do.
return true;
}
// This `PushContext` is going to be used to compute whether an in-thread event
// would trigger a mention.
//
// Of course, we're not interested in an in-thread event causing a mention,
// because it's part of a thread we've subscribed to. So the
// `PushContext` must not include the check for thread subscriptions (otherwise
// it would be impossible to subscribe to new threads).
let with_thread_subscriptions = false;
let Some(push_context) = room
.push_context_internal(with_thread_subscriptions)
.await
.inspect_err(|err| {
warn!("Failed to get push context for threads: {err}");
})
.ok()
.flatten()
else {
warn!("Missing push context for thread subscriptions.");
return true;
};
let mut subscribe_up_to = None;
// Find if there's an event that would trigger a mention for the current
// user, iterating from the end of the new events towards the oldest, so we can
// find the most recent event to subscribe to.
for ev in new_events.rev() {
if push_context.for_event(ev.raw()).await.into_iter().any(|action| action.should_notify()) {
let Some(event_id) = ev.event_id() else {
// Shouldn't happen.
continue;
};
subscribe_up_to = Some(event_id);
break;
}
}
// And if we've found such a mention, subscribe to the thread up to this
// event.
if let Some(event_id) = subscribe_up_to {
trace!(thread = %thread_root, up_to = %event_id, "found a new thread to subscribe to");
if let Err(err) = room.subscribe_thread_if_needed(&thread_root, Some(event_id)).await {
warn!(%err, "Failed to subscribe to thread");
} else {
let _ = thread_subscriber_sender.send(());
}
}
true
}