mirror of
https://github.com/matrix-org/matrix-rust-sdk.git
synced 2026-05-18 13:40:55 -04:00
event graph: add an initial implementation of the event graph
This is mostly a demonstration of how to plug this with the timeline, and how little it changes as a result. Remove read receipts
This commit is contained in:
@@ -473,6 +473,7 @@ impl RoomListItem {
|
||||
}
|
||||
|
||||
/// Initializes the timeline for this room using the provided parameters.
|
||||
///
|
||||
/// * `event_type_filter` - An optional [`TimelineEventTypeFilter`] to be
|
||||
/// used to filter timeline events besides the default timeline filter. If
|
||||
/// `None` is passed, only the default timeline filter will be used.
|
||||
@@ -480,7 +481,7 @@ impl RoomListItem {
|
||||
&self,
|
||||
event_type_filter: Option<Arc<TimelineEventTypeFilter>>,
|
||||
) -> Result<(), RoomListError> {
|
||||
let mut timeline_builder = self.inner.default_room_timeline_builder();
|
||||
let mut timeline_builder = self.inner.default_room_timeline_builder().await;
|
||||
if let Some(event_type_filter) = event_type_filter {
|
||||
timeline_builder = timeline_builder.event_filter(move |event, room_version_id| {
|
||||
// Always perform the default filter first
|
||||
|
||||
377
crates/matrix-sdk-ui/src/event_graph.rs
Normal file
377
crates/matrix-sdk-ui/src/event_graph.rs
Normal file
@@ -0,0 +1,377 @@
|
||||
// Copyright 2024 The Matrix.org Foundation C.I.C.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
//! The event graph is an abstraction layer, sitting between the Rust SDK and a
|
||||
//! final client, that acts as a global observer of all the rooms, gathering and
|
||||
//! inferring some extra useful information about each room. In particular, this
|
||||
//! doesn't require subscribing to a specific room to get access to this
|
||||
//! information.
|
||||
//!
|
||||
//! It's intended to be fast, robust and easy to maintain.
|
||||
//!
|
||||
//! See the [github issue](https://github.com/matrix-org/matrix-rust-sdk/issues/3058) for more details about the historical reasons that led us to start writing this.
|
||||
//!
|
||||
//! Most of it is still a work-in-progress, as of 2024-01-22.
|
||||
//!
|
||||
//! The desired set of features it may eventually implement is the following:
|
||||
//!
|
||||
//! - [ ] compute proper unread room counts, and use backpagination to get
|
||||
//! missing messages/notifications/mentions, if needs be.
|
||||
//! - [ ] expose that information with a new data structure similar to the
|
||||
//! `RoomInfo`, and that may update a `RoomListService`.
|
||||
//! - [ ] provide read receipts for each message.
|
||||
//! - [ ] backwards and forward pagination, and reconcile results with cached
|
||||
//! timelines.
|
||||
//! - [ ] retry decryption upon receiving new keys (from an encryption sync
|
||||
//! service or from a key backup).
|
||||
//! - [ ] expose the latest event for a given room.
|
||||
//! - [ ] caching of events on-disk.
|
||||
|
||||
#![forbid(missing_docs)]
|
||||
|
||||
use std::{collections::BTreeMap, fmt::Debug, sync::Arc};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use matrix_sdk::{sync::RoomUpdate, Client, Room};
|
||||
use matrix_sdk_base::{
|
||||
deserialized_responses::SyncTimelineEvent,
|
||||
sync::{JoinedRoom, LeftRoom, Timeline},
|
||||
};
|
||||
use ruma::{
|
||||
events::{AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent},
|
||||
serde::Raw,
|
||||
OwnedRoomId, RoomId,
|
||||
};
|
||||
use tokio::{
|
||||
spawn,
|
||||
sync::{
|
||||
broadcast::{error::RecvError, Receiver, Sender},
|
||||
RwLock,
|
||||
},
|
||||
task::JoinHandle,
|
||||
};
|
||||
use tracing::{debug, error, trace};
|
||||
|
||||
/// An error observed in the `EventGraph`.
|
||||
#[derive(thiserror::Error, Debug)]
|
||||
pub enum EventGraphError {
|
||||
/// A room hasn't been found, when trying to create a graph view for that
|
||||
/// room.
|
||||
#[error("Room with id {0} not found")]
|
||||
RoomNotFound(OwnedRoomId),
|
||||
}
|
||||
|
||||
/// A result using the [`EventGraphError`].
|
||||
pub type Result<T> = std::result::Result<T, EventGraphError>;
|
||||
|
||||
/// Hold handles to the tasks spawn by a [`RoomEventGraph`].
|
||||
struct RoomGraphDropHandles {
|
||||
listen_updates_task: JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl Drop for RoomGraphDropHandles {
|
||||
fn drop(&mut self) {
|
||||
self.listen_updates_task.abort();
|
||||
}
|
||||
}
|
||||
|
||||
/// An event graph, providing lots of useful functionality for clients.
|
||||
///
|
||||
/// See also the module-level comment.
|
||||
pub struct EventGraph {
|
||||
/// Reference to the client used to navigate this graph.
|
||||
client: Client,
|
||||
/// Lazily-filled cache of live [`RoomEventGraph`], once per room.
|
||||
by_room: BTreeMap<OwnedRoomId, RoomEventGraph>,
|
||||
/// Backend used for storage.
|
||||
store: Arc<dyn EventGraphStore>,
|
||||
}
|
||||
|
||||
impl Debug for EventGraph {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("EventGraph").finish_non_exhaustive()
|
||||
}
|
||||
}
|
||||
|
||||
impl EventGraph {
|
||||
/// Create a new [`EventGraph`] for the given client.
|
||||
pub fn new(client: Client) -> Self {
|
||||
let store = Arc::new(MemoryStore::new());
|
||||
Self { client, by_room: Default::default(), store }
|
||||
}
|
||||
|
||||
/// Return a room-specific view over the [`EventGraph`].
|
||||
///
|
||||
/// It may not be found, if the room isn't known to the client.
|
||||
pub fn for_room(&mut self, room_id: &RoomId) -> Option<RoomEventGraph> {
|
||||
match self.by_room.get(room_id) {
|
||||
Some(room) => Some(room.clone()),
|
||||
None => {
|
||||
let room = self.client.get_room(room_id)?;
|
||||
let room_event_graph = RoomEventGraph::new(room, self.store.clone());
|
||||
self.by_room.insert(room_id.to_owned(), room_event_graph.clone());
|
||||
Some(room_event_graph)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Add an initial set of events to the event graph, reloaded from a cache.
|
||||
///
|
||||
/// TODO: temporary for API compat, as the event graph should take care of
|
||||
/// its own cache.
|
||||
pub async fn add_initial_events(
|
||||
&mut self,
|
||||
room_id: &RoomId,
|
||||
events: Vec<SyncTimelineEvent>,
|
||||
) -> Result<()> {
|
||||
let room_graph = self
|
||||
.for_room(room_id)
|
||||
.ok_or_else(|| EventGraphError::RoomNotFound(room_id.to_owned()))?;
|
||||
room_graph.inner.append_events(events).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// A store that can be remember information about the event graph.
|
||||
///
|
||||
/// It really acts as a cache, in the sense that clearing the backing data
|
||||
/// should not have any irremediable effect, other than providing a lesser user
|
||||
/// experience.
|
||||
#[async_trait]
|
||||
pub trait EventGraphStore: Send + Sync {
|
||||
/// Returns all the known events for the given room.
|
||||
async fn room_events(&self, room: &RoomId) -> Result<Vec<SyncTimelineEvent>>;
|
||||
|
||||
/// Adds all the events to the given room.
|
||||
async fn add_room_events(&self, room: &RoomId, events: Vec<SyncTimelineEvent>) -> Result<()>;
|
||||
|
||||
/// Clear all the events from the given room.
|
||||
async fn clear_room_events(&self, room: &RoomId) -> Result<()>;
|
||||
}
|
||||
|
||||
struct MemoryStore {
|
||||
/// All the events per room, in sync order.
|
||||
by_room: RwLock<BTreeMap<OwnedRoomId, Vec<SyncTimelineEvent>>>,
|
||||
}
|
||||
|
||||
impl MemoryStore {
|
||||
fn new() -> Self {
|
||||
Self { by_room: Default::default() }
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl EventGraphStore for MemoryStore {
|
||||
async fn room_events(&self, room: &RoomId) -> Result<Vec<SyncTimelineEvent>> {
|
||||
Ok(self.by_room.read().await.get(room).cloned().unwrap_or_default())
|
||||
}
|
||||
|
||||
async fn add_room_events(&self, room: &RoomId, events: Vec<SyncTimelineEvent>) -> Result<()> {
|
||||
self.by_room.write().await.entry(room.to_owned()).or_default().extend(events);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn clear_room_events(&self, room: &RoomId) -> Result<()> {
|
||||
let _ = self.by_room.write().await.remove(room);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// A subset of an event graph, for a room.
|
||||
///
|
||||
/// Cloning is shallow, and thus is cheap to do.
|
||||
#[derive(Clone)]
|
||||
pub struct RoomEventGraph {
|
||||
inner: Arc<RoomEventGraphInner>,
|
||||
|
||||
_drop_handles: Arc<RoomGraphDropHandles>,
|
||||
}
|
||||
|
||||
impl Debug for RoomEventGraph {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("RoomEventGraph").finish_non_exhaustive()
|
||||
}
|
||||
}
|
||||
|
||||
impl RoomEventGraph {
|
||||
/// Create a new [`RoomEventGraph`] using the given room and store.
|
||||
fn new(room: Room, store: Arc<dyn EventGraphStore>) -> Self {
|
||||
let (inner, drop_handles) = RoomEventGraphInner::new(room, store);
|
||||
Self { inner, _drop_handles: drop_handles }
|
||||
}
|
||||
|
||||
/// Subscribe to room updates for this room, after getting the initial list
|
||||
/// of events. XXX: Could/should it use some kind of `Observable`
|
||||
/// instead? Or not something async, like explicit handlers as our event
|
||||
/// handlers?
|
||||
pub async fn subscribe(
|
||||
&self,
|
||||
) -> Result<(Vec<SyncTimelineEvent>, Receiver<RoomEventGraphUpdate>)> {
|
||||
Ok((
|
||||
self.inner.store.room_events(self.inner.room.room_id()).await?,
|
||||
self.inner.sender.subscribe(),
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
struct RoomEventGraphInner {
|
||||
sender: Sender<RoomEventGraphUpdate>,
|
||||
store: Arc<dyn EventGraphStore>,
|
||||
room: Room,
|
||||
}
|
||||
|
||||
impl RoomEventGraphInner {
|
||||
/// Creates a new graph for a room, and subscribes to room updates., so as
|
||||
/// to handle new timeline events.
|
||||
fn new(room: Room, store: Arc<dyn EventGraphStore>) -> (Arc<Self>, Arc<RoomGraphDropHandles>) {
|
||||
let sender = Sender::new(32);
|
||||
|
||||
let room_graph = Arc::new(Self { room, store, sender });
|
||||
|
||||
let listen_updates_task = spawn(Self::listen_task(room_graph.clone()));
|
||||
|
||||
(room_graph, Arc::new(RoomGraphDropHandles { listen_updates_task }))
|
||||
}
|
||||
|
||||
async fn handle_joined_room_update(&self, updates: JoinedRoom) -> Result<()> {
|
||||
self.handle_timeline(updates.timeline, updates.ephemeral.clone(), updates.account_data)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_timeline(
|
||||
&self,
|
||||
timeline: Timeline,
|
||||
ephemeral: Vec<Raw<AnySyncEphemeralRoomEvent>>,
|
||||
account_data: Vec<Raw<AnyRoomAccountDataEvent>>,
|
||||
) -> Result<()> {
|
||||
let room_id = self.room.room_id();
|
||||
|
||||
if timeline.limited {
|
||||
// Ideally we'd try to reconcile existing events against those received in the
|
||||
// timeline, but we're not there yet. In the meanwhile, clear the
|
||||
// items from the room. TODO: implement Smart Matching™.
|
||||
trace!("limited timeline, clearing all previous events");
|
||||
self.store.clear_room_events(room_id).await?;
|
||||
let _ = self.sender.send(RoomEventGraphUpdate::Clear);
|
||||
}
|
||||
|
||||
// Add all the events to the backend.
|
||||
trace!("adding new events");
|
||||
self.store.add_room_events(room_id, timeline.events.clone()).await?;
|
||||
|
||||
// Propagate events to observers.
|
||||
let _ = self.sender.send(RoomEventGraphUpdate::Append {
|
||||
events: timeline.events,
|
||||
prev_batch: timeline.prev_batch,
|
||||
ephemeral,
|
||||
account_data,
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_left_room_update(&self, updates: LeftRoom) -> Result<()> {
|
||||
self.handle_timeline(updates.timeline, Vec::new(), Vec::new()).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn listen_task(this: Arc<Self>) {
|
||||
// TODO for prototyping, i'm spawning a new task to get the room updates.
|
||||
// Ideally we'd have something like the whole sync update, a generalisation of
|
||||
// the room update.
|
||||
trace!("Spawning the listen task");
|
||||
|
||||
let mut update_receiver = this.room.client().subscribe_to_room_updates(this.room.room_id());
|
||||
|
||||
loop {
|
||||
match update_receiver.recv().await {
|
||||
Ok(update) => {
|
||||
trace!("Listen task received an update");
|
||||
|
||||
match update {
|
||||
RoomUpdate::Left { updates, .. } => {
|
||||
if let Err(err) = this.handle_left_room_update(updates).await {
|
||||
error!("handling left room update: {err}");
|
||||
}
|
||||
}
|
||||
RoomUpdate::Joined { updates, .. } => {
|
||||
if let Err(err) = this.handle_joined_room_update(updates).await {
|
||||
error!("handling joined room update: {err}");
|
||||
}
|
||||
}
|
||||
RoomUpdate::Invited { .. } => {
|
||||
// We don't do anything for invited rooms at this
|
||||
// point. TODO should
|
||||
// we?
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Err(RecvError::Closed) => {
|
||||
// The loop terminated successfully.
|
||||
debug!("Listen task closed");
|
||||
break;
|
||||
}
|
||||
|
||||
Err(RecvError::Lagged(_)) => {
|
||||
// Since we've lagged behind updates to this room, we might be out of
|
||||
// sync with the events, leading to potentially lost events. Play it
|
||||
// safe here, and clear the cache. It's fine because we can retrigger
|
||||
// backpagination from the last event at any time, if needs be.
|
||||
debug!("Listen task lagged, clearing room");
|
||||
if let Err(err) = this.store.clear_room_events(this.room.room_id()).await {
|
||||
error!("unable to clear room after room updates lag: {err}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Append a set of events to the room graph and storage, notifying
|
||||
/// observers.
|
||||
async fn append_events(&self, events: Vec<SyncTimelineEvent>) -> Result<()> {
|
||||
self.store.add_room_events(self.room.room_id(), events.clone()).await?;
|
||||
|
||||
let _ = self.sender.send(RoomEventGraphUpdate::Append {
|
||||
events,
|
||||
prev_batch: None,
|
||||
account_data: Default::default(),
|
||||
ephemeral: Default::default(),
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// An update related to events happened in a room.
|
||||
#[derive(Clone)]
|
||||
pub enum RoomEventGraphUpdate {
|
||||
/// The room has been cleared from events.
|
||||
Clear,
|
||||
/// The room has new events.
|
||||
Append {
|
||||
/// All the new events that have been added to the room.
|
||||
events: Vec<SyncTimelineEvent>,
|
||||
/// XXX: this is temporary, until backpagination lives in the event
|
||||
/// graph.
|
||||
prev_batch: Option<String>,
|
||||
/// XXX: this is temporary, until account data lives in the event graph
|
||||
/// — or will it live there?
|
||||
account_data: Vec<Raw<AnyRoomAccountDataEvent>>,
|
||||
/// XXX: this is temporary, until read receipts are handled in the event
|
||||
/// graph
|
||||
ephemeral: Vec<Raw<AnySyncEphemeralRoomEvent>>,
|
||||
},
|
||||
}
|
||||
@@ -17,6 +17,7 @@ use ruma::html::HtmlSanitizerMode;
|
||||
mod events;
|
||||
|
||||
pub mod encryption_sync_service;
|
||||
pub mod event_graph;
|
||||
pub mod notification_client;
|
||||
pub mod room_list_service;
|
||||
pub mod sync_service;
|
||||
|
||||
@@ -174,12 +174,13 @@ impl Room {
|
||||
}
|
||||
|
||||
/// Create a new [`TimelineBuilder`] with the default configuration.
|
||||
pub fn default_room_timeline_builder(&self) -> TimelineBuilder {
|
||||
pub async fn default_room_timeline_builder(&self) -> TimelineBuilder {
|
||||
Timeline::builder(&self.inner.room)
|
||||
.events(
|
||||
self.inner.sliding_sync_room.prev_batch(),
|
||||
self.inner.sliding_sync_room.timeline_queue(),
|
||||
)
|
||||
.await
|
||||
.track_read_marker_and_receipts()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,9 +17,8 @@ use std::{collections::BTreeSet, sync::Arc};
|
||||
use eyeball::SharedObservable;
|
||||
use futures_util::{pin_mut, StreamExt};
|
||||
use imbl::Vector;
|
||||
use matrix_sdk::{
|
||||
deserialized_responses::SyncTimelineEvent, executor::spawn, sync::RoomUpdate, Room,
|
||||
};
|
||||
use matrix_sdk::{deserialized_responses::SyncTimelineEvent, executor::spawn, Room};
|
||||
use matrix_sdk_base::sync::JoinedRoom;
|
||||
use ruma::{
|
||||
events::{receipt::ReceiptType, AnySyncTimelineEvent},
|
||||
RoomVersionId,
|
||||
@@ -34,6 +33,7 @@ use super::{
|
||||
queue::send_queued_messages,
|
||||
BackPaginationStatus, Timeline, TimelineDropHandle,
|
||||
};
|
||||
use crate::event_graph::{EventGraph, RoomEventGraphUpdate};
|
||||
|
||||
/// Builder that allows creating and configuring various parts of a
|
||||
/// [`Timeline`].
|
||||
@@ -42,8 +42,8 @@ use super::{
|
||||
pub struct TimelineBuilder {
|
||||
room: Room,
|
||||
prev_token: Option<String>,
|
||||
events: Vector<SyncTimelineEvent>,
|
||||
settings: TimelineInnerSettings,
|
||||
event_graph: EventGraph,
|
||||
}
|
||||
|
||||
impl TimelineBuilder {
|
||||
@@ -51,19 +51,25 @@ impl TimelineBuilder {
|
||||
Self {
|
||||
room: room.clone(),
|
||||
prev_token: None,
|
||||
events: Vector::new(),
|
||||
settings: TimelineInnerSettings::default(),
|
||||
event_graph: EventGraph::new(room.client()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Add initial events to the timeline.
|
||||
pub(crate) fn events(
|
||||
/// TODO: remove this, the EventGraph should hold the events data in the
|
||||
/// first place, and we'd provide an existing EventGraph to the
|
||||
/// TimelineBuilder.
|
||||
pub(crate) async fn events(
|
||||
mut self,
|
||||
prev_token: Option<String>,
|
||||
events: Vector<SyncTimelineEvent>,
|
||||
) -> Self {
|
||||
self.prev_token = prev_token;
|
||||
self.events = events;
|
||||
self.event_graph
|
||||
.add_initial_events(self.room.room_id(), events.iter().cloned().collect())
|
||||
.await
|
||||
.expect("room exists");
|
||||
self
|
||||
}
|
||||
|
||||
@@ -120,13 +126,19 @@ impl TimelineBuilder {
|
||||
skip(self),
|
||||
fields(
|
||||
room_id = ?self.room.room_id(),
|
||||
events_length = self.events.len(),
|
||||
track_read_receipts = self.settings.track_read_receipts,
|
||||
prev_token = self.prev_token,
|
||||
)
|
||||
)]
|
||||
pub async fn build(self) -> Timeline {
|
||||
let Self { room, prev_token, events, settings } = self;
|
||||
let Self { room, mut event_graph, prev_token, settings } = self;
|
||||
|
||||
let room_event_graph = event_graph.for_room(room.room_id()).expect("room exists");
|
||||
let (events, mut event_subscriber) = room_event_graph
|
||||
.subscribe()
|
||||
.await
|
||||
.expect("make this function fallible, or allow this for the time being?");
|
||||
|
||||
let has_events = !events.is_empty();
|
||||
let track_read_marker_and_receipts = settings.track_read_receipts;
|
||||
|
||||
@@ -148,7 +160,6 @@ impl TimelineBuilder {
|
||||
let client = room.client();
|
||||
|
||||
let sync_response_notify = Arc::new(Notify::new());
|
||||
let mut room_update_rx = room.subscribe_to_updates();
|
||||
let room_update_join_handle = spawn({
|
||||
let sync_response_notify = sync_response_notify.clone();
|
||||
let inner = inner.clone();
|
||||
@@ -158,8 +169,12 @@ impl TimelineBuilder {
|
||||
span.follows_from(Span::current());
|
||||
|
||||
async move {
|
||||
trace!("Spawned the event subscriber task");
|
||||
|
||||
loop {
|
||||
let update = match room_update_rx.recv().await {
|
||||
trace!("Waiting for an event");
|
||||
|
||||
let update = match event_subscriber.recv().await {
|
||||
Ok(up) => up,
|
||||
Err(broadcast::error::RecvError::Closed) => break,
|
||||
Err(broadcast::error::RecvError::Lagged(_)) => {
|
||||
@@ -169,21 +184,41 @@ impl TimelineBuilder {
|
||||
}
|
||||
};
|
||||
|
||||
trace!("Handling a room update");
|
||||
|
||||
match update {
|
||||
RoomUpdate::Left { updates, .. } => {
|
||||
inner.handle_sync_timeline(updates.timeline).await;
|
||||
RoomEventGraphUpdate::Clear => {
|
||||
trace!("Clearing the timeline.");
|
||||
inner.clear().await;
|
||||
}
|
||||
RoomUpdate::Joined { updates, .. } => {
|
||||
inner.handle_joined_room_update(updates).await;
|
||||
}
|
||||
RoomUpdate::Invited { .. } => {
|
||||
warn!("Room is in invited state, can't build or update its timeline");
|
||||
|
||||
RoomEventGraphUpdate::Append {
|
||||
events,
|
||||
prev_batch,
|
||||
account_data,
|
||||
ephemeral,
|
||||
} => {
|
||||
trace!("Received new events");
|
||||
|
||||
// XXX this timeline and the joined room updates are synthetic, until
|
||||
// we get rid of `handle_joined_room_update` by adding all functionality
|
||||
// back in the event graph, and replacing it with a simple
|
||||
// `handle_add_events`.
|
||||
let timeline = matrix_sdk_base::sync::Timeline {
|
||||
limited: false,
|
||||
prev_batch,
|
||||
events,
|
||||
};
|
||||
let update = JoinedRoom {
|
||||
unread_notifications: Default::default(),
|
||||
timeline,
|
||||
state: Default::default(),
|
||||
account_data,
|
||||
ephemeral,
|
||||
};
|
||||
inner.handle_joined_room_update(update).await;
|
||||
|
||||
sync_response_notify.notify_waiters();
|
||||
}
|
||||
}
|
||||
|
||||
sync_response_notify.notify_waiters();
|
||||
}
|
||||
}
|
||||
.instrument(span)
|
||||
@@ -268,6 +303,7 @@ impl TimelineBuilder {
|
||||
room_update_join_handle,
|
||||
ignore_user_list_update_join_handle,
|
||||
room_key_from_backups_join_handle,
|
||||
_event_graph: room_event_graph,
|
||||
}),
|
||||
};
|
||||
|
||||
|
||||
@@ -29,7 +29,6 @@ use matrix_sdk::{
|
||||
sync::JoinedRoom,
|
||||
Error, Result, Room,
|
||||
};
|
||||
use matrix_sdk_base::sync::Timeline;
|
||||
#[cfg(test)]
|
||||
use ruma::events::receipt::ReceiptEventContent;
|
||||
#[cfg(all(test, feature = "e2e-encryption"))]
|
||||
@@ -407,7 +406,7 @@ impl<P: RoomDataProvider> TimelineInner<P> {
|
||||
|
||||
pub(super) async fn add_initial_events(
|
||||
&mut self,
|
||||
events: Vector<SyncTimelineEvent>,
|
||||
events: Vec<SyncTimelineEvent>,
|
||||
back_pagination_token: Option<String>,
|
||||
) {
|
||||
if events.is_empty() {
|
||||
@@ -434,11 +433,6 @@ impl<P: RoomDataProvider> TimelineInner<P> {
|
||||
state.handle_joined_room_update(update, &self.room_data_provider, &self.settings).await;
|
||||
}
|
||||
|
||||
pub(super) async fn handle_sync_timeline(&self, timeline: Timeline) {
|
||||
let mut state = self.state.write().await;
|
||||
state.handle_sync_timeline(timeline, &self.room_data_provider, &self.settings).await;
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(super) async fn handle_live_event(&self, event: SyncTimelineEvent) {
|
||||
let mut state = self.state.write().await;
|
||||
|
||||
@@ -21,7 +21,6 @@ use std::{
|
||||
};
|
||||
|
||||
use eyeball_im::{ObservableVector, ObservableVectorTransaction, ObservableVectorTransactionEntry};
|
||||
use imbl::Vector;
|
||||
use indexmap::IndexMap;
|
||||
use matrix_sdk::{deserialized_responses::SyncTimelineEvent, sync::Timeline};
|
||||
use matrix_sdk_base::{deserialized_responses::TimelineEvent, sync::JoinedRoom};
|
||||
@@ -82,7 +81,7 @@ impl TimelineInnerState {
|
||||
#[tracing::instrument(skip_all)]
|
||||
pub(super) async fn add_initial_events<P: RoomDataProvider>(
|
||||
&mut self,
|
||||
events: Vector<SyncTimelineEvent>,
|
||||
events: Vec<SyncTimelineEvent>,
|
||||
mut back_pagination_token: Option<String>,
|
||||
room_data_provider: &P,
|
||||
settings: &TimelineInnerSettings,
|
||||
@@ -111,17 +110,6 @@ impl TimelineInnerState {
|
||||
txn.commit();
|
||||
}
|
||||
|
||||
pub(super) async fn handle_sync_timeline<P: RoomDataProvider>(
|
||||
&mut self,
|
||||
timeline: Timeline,
|
||||
room_data_provider: &P,
|
||||
settings: &TimelineInnerSettings,
|
||||
) {
|
||||
let mut txn = self.transaction();
|
||||
txn.handle_sync_timeline(timeline, room_data_provider, settings).await;
|
||||
txn.commit();
|
||||
}
|
||||
|
||||
#[instrument(skip_all)]
|
||||
pub(super) async fn handle_joined_room_update<P: RoomDataProvider>(
|
||||
&mut self,
|
||||
|
||||
@@ -59,6 +59,7 @@ use tokio::sync::{mpsc::Sender, Mutex, Notify};
|
||||
use tracing::{debug, error, info, instrument, trace, warn};
|
||||
|
||||
use self::futures::SendAttachment;
|
||||
use crate::event_graph::RoomEventGraph;
|
||||
|
||||
mod builder;
|
||||
mod error;
|
||||
@@ -796,6 +797,7 @@ struct TimelineDropHandle {
|
||||
room_update_join_handle: JoinHandle<()>,
|
||||
ignore_user_list_update_join_handle: JoinHandle<()>,
|
||||
room_key_from_backups_join_handle: JoinHandle<()>,
|
||||
_event_graph: RoomEventGraph,
|
||||
}
|
||||
|
||||
impl Drop for TimelineDropHandle {
|
||||
|
||||
@@ -33,7 +33,13 @@ pub trait SlidingSyncRoomExt {
|
||||
#[async_trait]
|
||||
impl SlidingSyncRoomExt for SlidingSyncRoom {
|
||||
async fn timeline(&self) -> Option<Timeline> {
|
||||
Some(sliding_sync_timeline_builder(self)?.track_read_marker_and_receipts().build().await)
|
||||
Some(
|
||||
sliding_sync_timeline_builder(self)
|
||||
.await?
|
||||
.track_read_marker_and_receipts()
|
||||
.build()
|
||||
.await,
|
||||
)
|
||||
}
|
||||
|
||||
/// Get a timeline item representing the latest event in this room.
|
||||
@@ -46,10 +52,12 @@ impl SlidingSyncRoomExt for SlidingSyncRoom {
|
||||
}
|
||||
}
|
||||
|
||||
fn sliding_sync_timeline_builder(room: &SlidingSyncRoom) -> Option<TimelineBuilder> {
|
||||
async fn sliding_sync_timeline_builder(room: &SlidingSyncRoom) -> Option<TimelineBuilder> {
|
||||
let room_id = room.room_id();
|
||||
match room.client().get_room(room_id) {
|
||||
Some(r) => Some(Timeline::builder(&r).events(room.prev_batch(), room.timeline_queue())),
|
||||
Some(r) => {
|
||||
Some(Timeline::builder(&r).events(room.prev_batch(), room.timeline_queue()).await)
|
||||
}
|
||||
None => {
|
||||
error!(?room_id, "Room not found in client. Can't provide a timeline for it");
|
||||
None
|
||||
|
||||
@@ -15,7 +15,6 @@
|
||||
use assert_matches::assert_matches;
|
||||
use assert_matches2::assert_let;
|
||||
use eyeball_im::VectorDiff;
|
||||
use imbl::vector;
|
||||
use matrix_sdk_base::deserialized_responses::SyncTimelineEvent;
|
||||
use matrix_sdk_test::{async_test, sync_timeline_event, ALICE, BOB, CAROL};
|
||||
use ruma::{
|
||||
@@ -47,7 +46,7 @@ async fn initial_events() {
|
||||
timeline
|
||||
.inner
|
||||
.add_initial_events(
|
||||
vector![
|
||||
vec![
|
||||
SyncTimelineEvent::new(
|
||||
timeline
|
||||
.event_builder
|
||||
@@ -239,7 +238,7 @@ async fn dedup_initial() {
|
||||
timeline
|
||||
.inner
|
||||
.add_initial_events(
|
||||
vector![
|
||||
vec![
|
||||
// two events
|
||||
event_a.clone(),
|
||||
event_b.clone(),
|
||||
@@ -247,7 +246,7 @@ async fn dedup_initial() {
|
||||
event_a,
|
||||
event_b,
|
||||
// … and a new event also came in
|
||||
event_c
|
||||
event_c,
|
||||
],
|
||||
None,
|
||||
)
|
||||
|
||||
@@ -18,7 +18,6 @@ use assert_matches::assert_matches;
|
||||
use assert_matches2::assert_let;
|
||||
use eyeball_im::VectorDiff;
|
||||
use futures_core::Stream;
|
||||
use imbl::vector;
|
||||
use matrix_sdk_base::deserialized_responses::SyncTimelineEvent;
|
||||
use matrix_sdk_test::{async_test, ALICE, BOB};
|
||||
use ruma::{
|
||||
@@ -248,7 +247,7 @@ async fn initial_reaction_timestamp_is_stored() {
|
||||
timeline
|
||||
.inner
|
||||
.add_initial_events(
|
||||
vector![
|
||||
vec![
|
||||
SyncTimelineEvent::new(timeline.event_builder.make_sync_reaction(
|
||||
*ALICE,
|
||||
&Annotation::new(message_event_id.clone(), REACTION_KEY.to_owned()),
|
||||
@@ -258,7 +257,7 @@ async fn initial_reaction_timestamp_is_stored() {
|
||||
*ALICE,
|
||||
&message_event_id,
|
||||
RoomMessageEventContent::text_plain("A"),
|
||||
))
|
||||
)),
|
||||
],
|
||||
None,
|
||||
)
|
||||
|
||||
@@ -40,7 +40,7 @@ fn filter_notice(ev: &AnySyncTimelineEvent, _room_version: &RoomVersionId) -> bo
|
||||
}
|
||||
|
||||
#[async_test]
|
||||
async fn read_receipts_updates_on_live_events() {
|
||||
async fn test_read_receipts_updates_on_live_events() {
|
||||
let timeline = TestTimeline::new()
|
||||
.with_settings(TimelineInnerSettings { track_read_receipts: true, ..Default::default() });
|
||||
let mut stream = timeline.subscribe().await;
|
||||
@@ -138,7 +138,7 @@ async fn read_receipts_updates_on_back_paginated_events() {
|
||||
}
|
||||
|
||||
#[async_test]
|
||||
async fn read_receipts_updates_on_filtered_events() {
|
||||
async fn test_read_receipts_updates_on_filtered_events() {
|
||||
let timeline = TestTimeline::new().with_settings(TimelineInnerSettings {
|
||||
track_read_receipts: true,
|
||||
event_filter: Arc::new(filter_notice),
|
||||
|
||||
@@ -15,7 +15,6 @@
|
||||
use assert_matches::assert_matches;
|
||||
use assert_matches2::assert_let;
|
||||
use eyeball_im::VectorDiff;
|
||||
use imbl::vector;
|
||||
use matrix_sdk_base::deserialized_responses::SyncTimelineEvent;
|
||||
use matrix_sdk_test::{async_test, sync_timeline_event, ALICE, BOB};
|
||||
use ruma::{
|
||||
@@ -148,10 +147,10 @@ async fn reaction_redaction_timeline_filter() {
|
||||
timeline
|
||||
.inner
|
||||
.add_initial_events(
|
||||
vector![SyncTimelineEvent::new(
|
||||
vec![SyncTimelineEvent::new(
|
||||
timeline
|
||||
.event_builder
|
||||
.make_sync_redacted_message_event(*ALICE, RedactedReactionEventContent::new())
|
||||
.make_sync_redacted_message_event(*ALICE, RedactedReactionEventContent::new()),
|
||||
)],
|
||||
None,
|
||||
)
|
||||
|
||||
@@ -94,7 +94,7 @@ async fn day_divider() {
|
||||
}
|
||||
|
||||
#[async_test]
|
||||
async fn update_read_marker() {
|
||||
async fn test_update_read_marker() {
|
||||
let timeline = TestTimeline::new();
|
||||
let mut stream = timeline.subscribe().await;
|
||||
|
||||
|
||||
@@ -2370,7 +2370,7 @@ async fn test_room_timeline() -> Result<(), Error> {
|
||||
};
|
||||
|
||||
let room = room_list.room(room_id).await?;
|
||||
room.init_timeline_with_builder(room.default_room_timeline_builder()).await?;
|
||||
room.init_timeline_with_builder(room.default_room_timeline_builder().await).await?;
|
||||
let timeline = room.timeline().unwrap();
|
||||
|
||||
let (previous_timeline_items, mut timeline_items_stream) = timeline.subscribe().await;
|
||||
@@ -2452,7 +2452,7 @@ async fn test_room_latest_event() -> Result<(), Error> {
|
||||
};
|
||||
|
||||
let room = room_list.room(room_id).await?;
|
||||
room.init_timeline_with_builder(room.default_room_timeline_builder()).await?;
|
||||
room.init_timeline_with_builder(room.default_room_timeline_builder().await).await?;
|
||||
|
||||
// The latest event does not exist.
|
||||
assert!(room.latest_event().await.is_none());
|
||||
|
||||
@@ -281,7 +281,7 @@ async fn read_receipts_updates() {
|
||||
}
|
||||
|
||||
#[async_test]
|
||||
async fn read_receipts_updates_on_filtered_events() {
|
||||
async fn test_read_receipts_updates_on_filtered_events() {
|
||||
let room_id = room_id!("!a98sd12bjh:example.org");
|
||||
let (client, server) = logged_in_client().await;
|
||||
let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
|
||||
|
||||
@@ -134,7 +134,7 @@ async fn login_and_sync(server_name: String) -> anyhow::Result<()> {
|
||||
let room_id = { rooms.lock().unwrap()[id].as_room_id().map(ToOwned::to_owned) };
|
||||
if let Some(room_id) = &room_id {
|
||||
let room = room_list_service.room(room_id).await?;
|
||||
room.init_timeline_with_builder(room.default_room_timeline_builder())
|
||||
room.init_timeline_with_builder(room.default_room_timeline_builder().await)
|
||||
.await?;
|
||||
let timeline = room.timeline().unwrap();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user