base: move processing of the direct room inside the AccountDataProcessor

This commit is contained in:
Benjamin Bouvier
2024-10-01 17:08:32 +02:00
parent 759a9b0e18
commit 4f265ccd22
3 changed files with 103 additions and 120 deletions

View File

@@ -16,7 +16,7 @@
#[cfg(feature = "e2e-encryption")]
use std::ops::Deref;
use std::{
collections::{BTreeMap, BTreeSet, HashMap, HashSet},
collections::{BTreeMap, BTreeSet},
fmt, iter,
sync::Arc,
};
@@ -49,10 +49,9 @@ use ruma::{
RoomPowerLevelsEvent, RoomPowerLevelsEventContent, StrippedRoomPowerLevelsEvent,
},
},
AnyGlobalAccountDataEvent, AnyRoomAccountDataEvent, AnyStrippedStateEvent,
AnySyncEphemeralRoomEvent, AnySyncMessageLikeEvent, AnySyncStateEvent,
AnySyncTimelineEvent, GlobalAccountDataEventType, StateEvent, StateEventType,
SyncStateEvent,
AnyRoomAccountDataEvent, AnyStrippedStateEvent, AnySyncEphemeralRoomEvent,
AnySyncMessageLikeEvent, AnySyncStateEvent, AnySyncTimelineEvent,
GlobalAccountDataEventType, StateEvent, StateEventType, SyncStateEvent,
},
push::{Action, PushConditionRoomCtx, Ruleset},
serde::Raw,
@@ -689,72 +688,6 @@ impl BaseClient {
}
}
/// Processes the direct rooms in a sync response:
///
/// Given a [`StateChanges`] instance, processes any direct room info
/// from the global account data and adds it to the room infos to
/// save.
#[instrument(skip_all)]
pub(crate) async fn process_direct_rooms(
&self,
events: &[AnyGlobalAccountDataEvent],
changes: &mut StateChanges,
) {
for event in events {
let AnyGlobalAccountDataEvent::Direct(direct_event) = event else { continue };
let mut new_dms = HashMap::<&RoomId, HashSet<OwnedUserId>>::new();
for (user_id, rooms) in direct_event.content.iter() {
for room_id in rooms {
new_dms.entry(room_id).or_default().insert(user_id.clone());
}
}
let rooms = self.store.rooms();
let mut old_dms = rooms
.iter()
.filter_map(|r| {
let direct_targets = r.direct_targets();
(!direct_targets.is_empty()).then(|| (r.room_id(), direct_targets))
})
.collect::<HashMap<_, _>>();
// Update the direct targets of rooms if they changed.
for (room_id, new_direct_targets) in new_dms {
if let Some(old_direct_targets) = old_dms.remove(&room_id) {
if old_direct_targets == new_direct_targets {
continue;
}
}
trace!(
?room_id, targets = ?new_direct_targets,
"Marking room as direct room"
);
if let Some(info) = changes.room_infos.get_mut(room_id) {
info.base_info.dm_targets = new_direct_targets;
} else if let Some(room) = self.store.room(room_id) {
let mut info = room.clone_info();
info.base_info.dm_targets = new_direct_targets;
changes.add_room(info);
}
}
// Remove the targets of old direct chats.
for room_id in old_dms.keys() {
trace!(?room_id, "Unmarking room as direct room");
if let Some(info) = changes.room_infos.get_mut(*room_id) {
info.base_info.dm_targets.clear();
} else if let Some(room) = self.store.room(room_id) {
let mut info = room.clone_info();
info.base_info.dm_targets.clear();
changes.add_room(info);
}
}
}
}
#[cfg(feature = "e2e-encryption")]
#[instrument(skip_all)]
pub(crate) async fn preprocess_to_device_events(
@@ -1168,26 +1101,7 @@ impl BaseClient {
new_rooms.invite.insert(room_id, new_info);
}
// We're processing direct state events here separately
// because we want to have the push rules in place before we process
// rooms and their events, but we want to create the rooms before we
// process the `m.direct` account data event.
let global_account_data_events = account_data.apply(&mut changes);
let has_new_direct_room_data = global_account_data_events
.iter()
.any(|event| event.event_type() == GlobalAccountDataEventType::Direct);
if has_new_direct_room_data {
self.process_direct_rooms(&global_account_data_events, &mut changes).await;
} else if let Ok(Some(direct_account_data)) =
self.store.get_account_data_event(GlobalAccountDataEventType::Direct).await
{
debug!("Found direct room data in the Store, applying it");
if let Ok(direct_account_data) = direct_account_data.deserialize() {
self.process_direct_rooms(&[direct_account_data], &mut changes).await;
} else {
warn!("Failed to deserialize direct room account data");
}
}
account_data.apply(&mut changes, &self.store).await;
changes.presence = response
.presence

View File

@@ -12,15 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{collections::BTreeMap, mem};
use std::{
collections::{BTreeMap, HashMap, HashSet},
mem,
};
use ruma::{
events::{AnyGlobalAccountDataEvent, GlobalAccountDataEventType},
serde::Raw,
OwnedUserId, RoomId,
};
use tracing::warn;
use tracing::{debug, instrument, trace, warn};
use crate::StateChanges;
use crate::{store::Store, StateChanges};
#[must_use]
pub(crate) struct AccountDataProcessor {
@@ -56,9 +60,96 @@ impl AccountDataProcessor {
self.raw_by_type.get(&GlobalAccountDataEventType::PushRules)
}
/// Processes the direct rooms in a sync response:
///
/// Given a [`StateChanges`] instance, processes any direct room info
/// from the global account data and adds it to the room infos to
/// save.
#[instrument(skip_all)]
pub(crate) fn process_direct_rooms(
&self,
events: &[AnyGlobalAccountDataEvent],
store: &Store,
changes: &mut StateChanges,
) {
for event in events {
let AnyGlobalAccountDataEvent::Direct(direct_event) = event else { continue };
let mut new_dms = HashMap::<&RoomId, HashSet<OwnedUserId>>::new();
for (user_id, rooms) in direct_event.content.iter() {
for room_id in rooms {
new_dms.entry(room_id).or_default().insert(user_id.clone());
}
}
let rooms = store.rooms();
let mut old_dms = rooms
.iter()
.filter_map(|r| {
let direct_targets = r.direct_targets();
(!direct_targets.is_empty()).then(|| (r.room_id(), direct_targets))
})
.collect::<HashMap<_, _>>();
// Update the direct targets of rooms if they changed.
for (room_id, new_direct_targets) in new_dms {
if let Some(old_direct_targets) = old_dms.remove(&room_id) {
if old_direct_targets == new_direct_targets {
continue;
}
}
trace!(
?room_id, targets = ?new_direct_targets,
"Marking room as direct room"
);
if let Some(info) = changes.room_infos.get_mut(room_id) {
info.base_info.dm_targets = new_direct_targets;
} else if let Some(room) = store.room(room_id) {
let mut info = room.clone_info();
info.base_info.dm_targets = new_direct_targets;
changes.add_room(info);
}
}
// Remove the targets of old direct chats.
for room_id in old_dms.keys() {
trace!(?room_id, "Unmarking room as direct room");
if let Some(info) = changes.room_infos.get_mut(*room_id) {
info.base_info.dm_targets.clear();
} else if let Some(room) = store.room(room_id) {
let mut info = room.clone_info();
info.base_info.dm_targets.clear();
changes.add_room(info);
}
}
}
}
/// Applies the processed data to the state changes.
pub fn apply(mut self, changes: &mut StateChanges) -> Vec<AnyGlobalAccountDataEvent> {
pub async fn apply(mut self, changes: &mut StateChanges, store: &Store) {
// Fill in the content of `changes.account_data`.
mem::swap(&mut changes.account_data, &mut self.raw_by_type);
self.parsed_events
// Process direct rooms.
let has_new_direct_room_data = self
.parsed_events
.iter()
.any(|event| event.event_type() == GlobalAccountDataEventType::Direct);
if has_new_direct_room_data {
self.process_direct_rooms(&self.parsed_events, store, changes);
} else if let Ok(Some(direct_account_data)) =
store.get_account_data_event(GlobalAccountDataEventType::Direct).await
{
debug!("Found direct room data in the Store, applying it");
if let Ok(direct_account_data) = direct_account_data.deserialize() {
self.process_direct_rooms(&[direct_account_data], store, changes);
} else {
warn!("Failed to deserialize direct room account data");
}
}
}
}

View File

@@ -28,10 +28,7 @@ use ruma::api::client::sync::sync_events::v5;
use ruma::events::AnyToDeviceEvent;
use ruma::{
api::client::sync::sync_events::v3::{self, InvitedRoom},
events::{
AnyRoomAccountDataEvent, AnySyncStateEvent, AnySyncTimelineEvent,
GlobalAccountDataEventType,
},
events::{AnyRoomAccountDataEvent, AnySyncStateEvent, AnySyncTimelineEvent},
serde::Raw,
JsOption, OwnedRoomId, RoomId, UInt,
};
@@ -301,26 +298,7 @@ impl BaseClient {
}
}
// We're processing direct state events here separately
// because we want to have the push rules in place before we process
// rooms and their events, but we want to create the rooms before we
// process the `m.direct` account data event.
let global_account_data_events = account_data.apply(&mut changes);
let has_new_direct_room_data = global_account_data_events
.iter()
.any(|event| event.event_type() == GlobalAccountDataEventType::Direct);
if has_new_direct_room_data {
self.process_direct_rooms(&global_account_data_events, &mut changes).await;
} else if let Ok(Some(direct_account_data)) =
self.store.get_account_data_event(GlobalAccountDataEventType::Direct).await
{
debug!("Found direct room data in the Store, applying it");
if let Ok(direct_account_data) = direct_account_data.deserialize() {
self.process_direct_rooms(&[direct_account_data], &mut changes).await;
} else {
warn!("Failed to deserialize direct room account data");
}
}
account_data.apply(&mut changes, &store).await;
// FIXME not yet supported by sliding sync.
// changes.presence = presence