refactor(sdk): Add trait ProfileProvider for use in TimelineInner

This commit is contained in:
Jonas Platte
2023-01-10 19:13:36 +01:00
parent fa1b3c36a5
commit 809e9bce6c
4 changed files with 73 additions and 66 deletions

View File

@@ -29,12 +29,13 @@ use super::{
},
find_event_by_txn_id, TimelineItem, TimelineKey,
};
use crate::events::SyncTimelineEventWithoutContent;
use crate::{events::SyncTimelineEventWithoutContent, room};
#[derive(Debug, Default)]
pub(super) struct TimelineInner {
#[derive(Debug)]
pub(super) struct TimelineInner<P: ProfileProvider = room::Common> {
items: MutableVec<Arc<TimelineItem>>,
metadata: Mutex<TimelineInnerMetadata>,
profile_provider: P,
}
/// Non-signalling parts of `TimelineInner`.
@@ -46,7 +47,11 @@ pub(super) struct TimelineInnerMetadata {
pub(super) fully_read_event_in_timeline: bool,
}
impl TimelineInner {
impl<P: ProfileProvider> TimelineInner<P> {
pub(super) fn new(profile_provider: P) -> Self {
Self { items: Default::default(), metadata: Default::default(), profile_provider }
}
pub(super) fn items(&self) -> MutableVecLockRef<'_, Arc<TimelineItem>> {
self.items.lock_ref()
}
@@ -55,11 +60,7 @@ impl TimelineInner {
self.items.signal_vec_cloned()
}
pub(super) fn add_initial_events(
&mut self,
events: Vec<SyncTimelineEvent>,
own_user_id: &UserId,
) {
pub(super) fn add_initial_events(&mut self, events: Vec<SyncTimelineEvent>) {
if events.is_empty() {
return;
}
@@ -72,11 +73,11 @@ impl TimelineInner {
for event in events {
handle_remote_event(
event.event,
own_user_id,
event.encryption_info,
TimelineItemPosition::End,
timeline_items,
timeline_meta,
&self.profile_provider,
);
}
}
@@ -85,16 +86,15 @@ impl TimelineInner {
&self,
raw: Raw<AnySyncTimelineEvent>,
encryption_info: Option<EncryptionInfo>,
own_user_id: &UserId,
) {
let mut timeline_meta = self.metadata.lock().await;
handle_remote_event(
raw,
own_user_id,
encryption_info,
TimelineItemPosition::End,
&mut self.items.lock_mut(),
&mut timeline_meta,
&self.profile_provider,
);
}
@@ -102,10 +102,9 @@ impl TimelineInner {
&self,
txn_id: OwnedTransactionId,
content: AnyMessageLikeEventContent,
own_user_id: &UserId,
) {
let event_meta = TimelineEventMetadata {
sender: own_user_id.to_owned(),
sender: self.profile_provider.own_user_id().to_owned(),
is_own_event: true,
relations: Default::default(),
// FIXME: Should we supply something here for encrypted rooms?
@@ -127,16 +126,15 @@ impl TimelineInner {
pub(super) async fn handle_back_paginated_event(
&self,
event: TimelineEvent,
own_user_id: &UserId,
) -> HandleEventResult {
let mut metadata_lock = self.metadata.lock().await;
handle_remote_event(
event.event.cast(),
own_user_id,
event.encryption_info,
TimelineItemPosition::Start,
&mut self.items.lock_mut(),
&mut metadata_lock,
&self.profile_provider,
)
}
@@ -210,13 +208,12 @@ impl TimelineInner {
}
#[cfg(feature = "e2e-encryption")]
#[instrument(skip(self, olm_machine, own_user_id))]
#[instrument(skip(self, olm_machine))]
pub(super) async fn retry_event_decryption(
&self,
room_id: &RoomId,
olm_machine: &OlmMachine,
session_ids: BTreeSet<&str>,
own_user_id: &UserId,
) {
use super::EncryptedMessage;
@@ -284,26 +281,42 @@ impl TimelineInner {
let mut items_lock = self.items.lock_mut();
handle_remote_event(
event.event.cast(),
own_user_id,
event.encryption_info,
TimelineItemPosition::Update(*idx),
&mut items_lock,
&mut metadata_lock,
&self.profile_provider,
);
}
}
}
impl TimelineInner {
pub(super) fn room(&self) -> &room::Common {
&self.profile_provider
}
}
pub(super) trait ProfileProvider {
fn own_user_id(&self) -> &UserId;
}
impl ProfileProvider for room::Common {
fn own_user_id(&self) -> &UserId {
(**self).own_user_id()
}
}
/// Handle a remote event.
///
/// Returns the number of timeline updates that were made.
fn handle_remote_event(
fn handle_remote_event<P: ProfileProvider>(
raw: Raw<AnySyncTimelineEvent>,
own_user_id: &UserId,
encryption_info: Option<EncryptionInfo>,
position: TimelineItemPosition,
timeline_items: &mut MutableVecLockMut<'_, Arc<TimelineItem>>,
timeline_meta: &mut TimelineInnerMetadata,
profile_provider: &P,
) -> HandleEventResult {
let (event_id, sender, origin_server_ts, txn_id, relations, event_kind) =
match raw.deserialize() {
@@ -331,7 +344,7 @@ fn handle_remote_event(
},
};
let is_own_event = sender == own_user_id;
let is_own_event = sender == profile_provider.own_user_id();
let event_meta = TimelineEventMetadata { sender, is_own_event, relations, encryption_info };
let flow = Flow::Remote { event_id, origin_server_ts, raw_event: raw, txn_id, position };

View File

@@ -31,7 +31,7 @@ use ruma::{
};
use tracing::{error, instrument, warn};
use super::{Joined, Room};
use super::Joined;
use crate::{
event_handler::EventHandlerHandle,
room::{self, MessagesOptions},
@@ -68,8 +68,7 @@ use self::{
/// messages.
#[derive(Debug)]
pub struct Timeline {
inner: Arc<TimelineInner>,
room: room::Common,
inner: Arc<TimelineInner<room::Common>>,
start_token: Mutex<Option<String>>,
_end_token: Mutex<Option<String>>,
event_handler_handles: Vec<EventHandlerHandle>,
@@ -78,7 +77,7 @@ pub struct Timeline {
impl Drop for Timeline {
fn drop(&mut self) {
for handle in self.event_handler_handles.drain(..) {
self.room.client.remove_event_handler(handle);
self.inner.room().client().remove_event_handler(handle);
}
}
}
@@ -93,17 +92,17 @@ impl Timeline {
prev_token: Option<String>,
events: Vec<SyncTimelineEvent>,
) -> Self {
let mut inner = TimelineInner::default();
inner.add_initial_events(events, room.own_user_id());
let mut inner = TimelineInner::new(room.to_owned());
inner.add_initial_events(events);
let inner = Arc::new(inner);
let timeline_event_handle = room.add_event_handler({
let inner = inner.clone();
move |event, encryption_info: Option<EncryptionInfo>, room: Room| {
move |event, encryption_info: Option<EncryptionInfo>| {
let inner = inner.clone();
async move {
inner.handle_live_event(event, encryption_info, room.own_user_id()).await;
inner.handle_live_event(event, encryption_info).await;
}
}
});
@@ -129,16 +128,19 @@ impl Timeline {
Timeline {
inner,
room: room.clone(),
start_token: Mutex::new(prev_token),
_end_token: Mutex::new(None),
event_handler_handles,
}
}
fn room(&self) -> &room::Common {
self.inner.room()
}
/// Enable tracking of the fully-read marker on this `Timeline`.
pub async fn with_fully_read_tracking(mut self) -> Self {
match self.room.account_data_static::<FullyReadEventContent>().await {
match self.room().account_data_static::<FullyReadEventContent>().await {
Ok(Some(fully_read)) => match fully_read.deserialize() {
Ok(fully_read) => {
self.inner.set_fully_read_event(fully_read.content.event_id).await
@@ -154,7 +156,7 @@ impl Timeline {
}
let inner = self.inner.clone();
let fully_read_handle = self.room.add_event_handler(move |event| {
let fully_read_handle = self.room().add_event_handler(move |event| {
let inner = inner.clone();
async move {
inner.handle_fully_read(event).await;
@@ -174,7 +176,7 @@ impl Timeline {
/// fewer events, for example because the supplied number is too big or
/// the beginning of the visible timeline was reached.
/// * `
#[instrument(skip_all, fields(initial_pagination_size, room_id = ?self.room.room_id()))]
#[instrument(skip_all, fields(initial_pagination_size, room_id = ?self.room().room_id()))]
pub async fn paginate_backwards(&self, mut opts: PaginationOptions<'_>) -> Result<()> {
let mut start_lock = self.start_token.lock().await;
if start_lock.is_none()
@@ -186,13 +188,12 @@ impl Timeline {
self.inner.add_loading_indicator();
let own_user_id = self.room.own_user_id();
let mut from = start_lock.clone();
let mut outcome = PaginationOutcome::new();
while let Some(limit) = opts.next_event_limit(outcome) {
let messages = self
.room
.room()
.messages(assign!(MessagesOptions::backward(), {
from,
limit: limit.into(),
@@ -207,7 +208,7 @@ impl Timeline {
outcome.items_updated = 0;
for room_ev in messages.chunk {
let res = self.inner.handle_back_paginated_event(room_ev, own_user_id).await;
let res = self.inner.handle_back_paginated_event(room_ev).await;
outcome.items_added = outcome.items_added.checked_add(res.item_added as u16)?;
outcome.items_updated = outcome.items_updated.checked_add(res.items_updated)?;
}
@@ -272,10 +273,9 @@ impl Timeline {
) {
self.inner
.retry_event_decryption(
self.room.room_id(),
self.room.client.olm_machine().expect("Olm machine wasn't started"),
self.room().room_id(),
self.room().client.olm_machine().expect("Olm machine wasn't started"),
session_ids.into_iter().map(AsRef::as_ref).collect(),
self.room.own_user_id(),
)
.await;
}
@@ -329,20 +329,18 @@ impl Timeline {
///
/// [`MessageLikeUnsigned`]: ruma::events::MessageLikeUnsigned
/// [`SyncMessageLikeEvent`]: ruma::events::SyncMessageLikeEvent
#[instrument(skip(self, content), fields(room_id = ?self.room.room_id()))]
#[instrument(skip(self, content), fields(room_id = ?self.room().room_id()))]
pub async fn send(
&self,
content: AnyMessageLikeEventContent,
txn_id: Option<&TransactionId>,
) -> Result<()> {
let txn_id = txn_id.map_or_else(TransactionId::new, ToOwned::to_owned);
self.inner
.handle_local_event(txn_id.clone(), content.clone(), self.room.own_user_id())
.await;
self.inner.handle_local_event(txn_id.clone(), content.clone()).await;
// If this room isn't actually in joined state, we'll get a server error.
// Not ideal, but works for now.
let room = Joined { inner: self.room.clone() };
let room = Joined { inner: self.room().clone() };
let response = room.send(content, Some(&txn_id)).await?;
self.inner.add_event_id(&txn_id, response.event_id);

View File

@@ -47,8 +47,8 @@ use ruma::{
use serde_json::{json, Value as JsonValue};
use super::{
EncryptedMessage, TimelineInner, TimelineItem, TimelineItemContent, TimelineKey,
VirtualTimelineItem,
inner::ProfileProvider, EncryptedMessage, TimelineInner, TimelineItem, TimelineItemContent,
TimelineKey, VirtualTimelineItem,
};
static ALICE: Lazy<&UserId> = Lazy::new(|| user_id!("@alice:server.name"));
@@ -229,7 +229,6 @@ async fn unable_to_decrypt() {
room_id!("!DovneieKSTkdHKpIXy:morpheus.localhost"),
&olm_machine,
iter::once(SESSION_ID).collect(),
own_user_id,
)
.await;
@@ -540,7 +539,7 @@ async fn initial_events() {
}
struct TestTimeline {
inner: TimelineInner,
inner: TimelineInner<TestProfileProvider>,
}
impl TestTimeline {
@@ -551,7 +550,7 @@ impl TestTimeline {
fn with_initial_events<'a>(
events: impl IntoIterator<Item = (&'a UserId, AnyMessageLikeEventContent)>,
) -> Self {
let mut inner = TimelineInner::default();
let mut inner = TimelineInner::new(TestProfileProvider);
inner.add_initial_events(
events
.into_iter()
@@ -561,7 +560,6 @@ impl TestTimeline {
SyncTimelineEvent { event, encryption_info: None }
})
.collect(),
&ALICE,
);
Self { inner }
@@ -577,12 +575,12 @@ impl TestTimeline {
{
let ev = make_message_event(sender, content);
let raw = Raw::new(&ev).unwrap().cast();
self.inner.handle_live_event(raw, None, &ALICE).await;
self.inner.handle_live_event(raw, None).await;
}
async fn handle_live_custom_event(&self, event: JsonValue) {
let raw = Raw::new(&event).unwrap().cast();
self.inner.handle_live_event(raw, None, &ALICE).await;
self.inner.handle_live_event(raw, None).await;
}
async fn handle_live_redaction(&self, sender: &UserId, redacts: &EventId) {
@@ -595,16 +593,24 @@ impl TestTimeline {
"origin_server_ts": next_server_ts(),
});
let raw = Raw::new(&ev).unwrap().cast();
self.inner.handle_live_event(raw, None, &ALICE).await;
self.inner.handle_live_event(raw, None).await;
}
async fn handle_local_event(&self, content: AnyMessageLikeEventContent) -> OwnedTransactionId {
let txn_id = TransactionId::new();
self.inner.handle_local_event(txn_id.clone(), content, &ALICE).await;
self.inner.handle_local_event(txn_id.clone(), content).await;
txn_id
}
}
struct TestProfileProvider;
impl ProfileProvider for TestProfileProvider {
fn own_user_id(&self) -> &UserId {
&ALICE
}
}
fn make_message_event<C: MessageLikeEventContent>(sender: &UserId, content: C) -> JsonValue {
json!({
"type": content.event_type(),

View File

@@ -61,17 +61,7 @@ async fn retry_decryption(
return;
};
let Some(own_user_id) = client.user_id() else {
error!("The user's own ID isn't available");
return;
};
inner
.retry_event_decryption(
&room_id,
olm_machine,
iter::once(session_id.as_str()).collect(),
own_user_id,
)
.retry_event_decryption(&room_id, olm_machine, iter::once(session_id.as_str()).collect())
.await;
}