chore(base): Move dispatch_and_get_new_users inside the sync module.

This patch moves the `state_events::dispatch_and_get_new_users`
processor inside the `state_events::sync` module. Why? Because a similar
processor for `state_events::stripped` is about to be created.
This commit is contained in:
Ivan Enderlin
2025-04-11 15:12:09 +02:00
parent 0306683cbf
commit e21ae2ae53
3 changed files with 87 additions and 84 deletions

View File

@@ -616,7 +616,7 @@ impl BaseClient {
let (raw_state_events, state_events) =
processors::state_events::sync::collect(&mut context, &new_info.state.events);
let mut new_user_ids = processors::state_events::dispatch_and_get_new_users(
let mut new_user_ids = processors::state_events::sync::dispatch_and_get_new_users(
&mut context,
(&raw_state_events, &state_events),
&mut room_info,
@@ -651,13 +651,14 @@ impl BaseClient {
&new_info.timeline.events,
);
let mut other_new_user_ids = processors::state_events::dispatch_and_get_new_users(
&mut context,
(&raw_state_events_from_timeline, &state_events_from_timeline),
&mut room_info,
&mut ambiguity_cache,
)
.await?;
let mut other_new_user_ids =
processors::state_events::sync::dispatch_and_get_new_users(
&mut context,
(&raw_state_events_from_timeline, &state_events_from_timeline),
&mut room_info,
&mut ambiguity_cache,
)
.await?;
new_user_ids.append(&mut other_new_user_ids);
updated_members_in_room.insert(room_id.to_owned(), new_user_ids.clone());
@@ -745,7 +746,7 @@ impl BaseClient {
let (raw_state_events, state_events) =
processors::state_events::sync::collect(&mut context, &new_info.state.events);
let mut new_user_ids = processors::state_events::dispatch_and_get_new_users(
let mut new_user_ids = processors::state_events::sync::dispatch_and_get_new_users(
&mut context,
(&raw_state_events, &state_events),
&mut room_info,
@@ -759,13 +760,14 @@ impl BaseClient {
&new_info.timeline.events,
);
let mut other_new_user_ids = processors::state_events::dispatch_and_get_new_users(
&mut context,
(&raw_state_events_from_timeline, &state_events_from_timeline),
&mut room_info,
&mut ambiguity_cache,
)
.await?;
let mut other_new_user_ids =
processors::state_events::sync::dispatch_and_get_new_users(
&mut context,
(&raw_state_events_from_timeline, &state_events_from_timeline),
&mut room_info,
&mut ambiguity_cache,
)
.await?;
new_user_ids.append(&mut other_new_user_ids);
let timeline = processors::timeline::build(

View File

@@ -12,30 +12,30 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{
collections::{BTreeMap, BTreeSet},
iter,
};
use ruma::{
events::{room::member::MembershipState, AnySyncStateEvent},
serde::Raw,
OwnedUserId,
};
use ruma::{events::AnySyncStateEvent, serde::Raw};
use serde::Deserialize;
use tracing::{instrument, warn};
use tracing::warn;
use super::{profiles, Context};
use crate::{
store::{ambiguity_map::AmbiguityCache, Result as StoreResult},
RoomInfo,
};
use super::Context;
/// Collect [`AnySyncStateEvent`].
pub mod sync {
use ruma::events::AnySyncTimelineEvent;
use std::{
collections::{BTreeMap, BTreeSet},
iter,
};
use super::{AnySyncStateEvent, Context, Raw};
use ruma::{
events::{room::member::MembershipState, AnySyncTimelineEvent},
OwnedUserId,
};
use tracing::instrument;
use super::{super::profiles, AnySyncStateEvent, Context, Raw};
use crate::{
store::{ambiguity_map::AmbiguityCache, Result as StoreResult},
RoomInfo,
};
/// Collect [`AnySyncStateEvent`] to [`AnySyncStateEvent`].
pub fn collect(
@@ -61,6 +61,56 @@ pub mod sync {
}
}))
}
/// Dispatch the state events and return the new users for this room.
///
/// `raw_events` and `events` must be generated from [`collect_sync`].
/// Events must be exactly the same list of events that are in
/// raw_events, but deserialised. We demand them here to avoid
/// deserialising multiple times.
#[instrument(skip_all, fields(room_id = ?room_info.room_id))]
pub async fn dispatch_and_get_new_users(
context: &mut Context,
(raw_events, events): (&[Raw<AnySyncStateEvent>], &[AnySyncStateEvent]),
room_info: &mut RoomInfo,
ambiguity_cache: &mut AmbiguityCache,
) -> StoreResult<BTreeSet<OwnedUserId>> {
let mut user_ids = BTreeSet::new();
if raw_events.is_empty() {
return Ok(user_ids);
}
let mut state_events = BTreeMap::new();
for (raw_event, event) in iter::zip(raw_events, events) {
room_info.handle_state_event(event);
if let AnySyncStateEvent::RoomMember(member) = event {
ambiguity_cache
.handle_event(&context.state_changes, &room_info.room_id, member)
.await?;
match member.membership() {
MembershipState::Join | MembershipState::Invite => {
user_ids.insert(member.state_key().to_owned());
}
_ => (),
}
profiles::upsert_or_delete(context, &room_info.room_id, member);
}
state_events
.entry(event.event_type())
.or_insert_with(BTreeMap::new)
.insert(event.state_key().to_owned(), raw_event.clone());
}
context.state_changes.state.insert(room_info.room_id.clone(), state_events);
Ok(user_ids)
}
}
/// Collect [`AnyStrippedStateEvent`].
@@ -94,52 +144,3 @@ where
})
.unzip()
}
/// Dispatch the state events and return the new users for this room.
///
/// `raw_events` and `events` must be generated from [`collect_sync`]. Events
/// must be exactly the same list of events that are in raw_events, but
/// deserialised. We demand them here to avoid deserialising multiple times.
#[instrument(skip_all, fields(room_id = ?room_info.room_id))]
pub async fn dispatch_and_get_new_users(
context: &mut Context,
(raw_events, events): (&[Raw<AnySyncStateEvent>], &[AnySyncStateEvent]),
room_info: &mut RoomInfo,
ambiguity_cache: &mut AmbiguityCache,
) -> StoreResult<BTreeSet<OwnedUserId>> {
let mut user_ids = BTreeSet::new();
if raw_events.is_empty() {
return Ok(user_ids);
}
let mut state_events = BTreeMap::new();
for (raw_event, event) in iter::zip(raw_events, events) {
room_info.handle_state_event(event);
if let AnySyncStateEvent::RoomMember(member) = event {
ambiguity_cache
.handle_event(&context.state_changes, &room_info.room_id, member)
.await?;
match member.membership() {
MembershipState::Join | MembershipState::Invite => {
user_ids.insert(member.state_key().to_owned());
}
_ => (),
}
profiles::upsert_or_delete(context, &room_info.room_id, member);
}
state_events
.entry(event.event_type())
.or_insert_with(BTreeMap::new)
.insert(event.state_key().to_owned(), raw_event.clone());
}
context.state_changes.state.insert(room_info.room_id.clone(), state_events);
Ok(user_ids)
}

View File

@@ -394,7 +394,7 @@ impl BaseClient {
room_info.handle_encryption_state(requested_required_states);
#[cfg_attr(not(feature = "e2e-encryption"), allow(unused))]
let new_user_ids = processors::state_events::dispatch_and_get_new_users(
let new_user_ids = processors::state_events::sync::dispatch_and_get_new_users(
context,
(&raw_state_events, &state_events),
&mut room_info,