refactor(base): Create the state_events::dispatch_and_get_new_users response processor.

This patch extracts the `BaseClient::handle_state` method as a new
response processor named `state_events::dispatch_and_get_new_users`.
It appears that we can do something more elegant with the returned new
users. See the next patch.
This commit is contained in:
Ivan Enderlin
2025-04-09 11:05:29 +02:00
parent afecd6d508
commit b0c1eda682
3 changed files with 83 additions and 81 deletions

View File

@@ -610,54 +610,6 @@ impl BaseClient {
Ok(())
}
/// Process the events provided during a sync.
///
/// events must be exactly the same list of events that are in raw_events,
/// but deserialised. We demand them here to avoid deserialising
/// multiple times.
#[instrument(skip_all, fields(room_id = ?room_info.room_id))]
pub(crate) async fn handle_state(
&self,
context: &mut Context,
raw_events: &[Raw<AnySyncStateEvent>],
events: &[AnySyncStateEvent],
room_info: &mut RoomInfo,
ambiguity_cache: &mut AmbiguityCache,
) -> StoreResult<BTreeSet<OwnedUserId>> {
let mut state_events = BTreeMap::new();
let mut user_ids = BTreeSet::new();
assert_eq!(raw_events.len(), events.len());
for (raw_event, event) in iter::zip(raw_events, events) {
room_info.handle_state_event(event);
if let AnySyncStateEvent::RoomMember(member) = &event {
ambiguity_cache
.handle_event(&context.state_changes, &room_info.room_id, member)
.await?;
match member.membership() {
MembershipState::Join | MembershipState::Invite => {
user_ids.insert(member.state_key().to_owned());
}
_ => (),
}
processors::profiles::upsert_or_delete(context, &room_info.room_id, member);
}
state_events
.entry(event.event_type())
.or_insert_with(BTreeMap::new)
.insert(event.state_key().to_owned(), raw_event.clone());
}
context.state_changes.state.insert((*room_info.room_id).to_owned(), state_events);
Ok(user_ids)
}
/// User has knocked on a room.
///
/// Update the internal and cached state accordingly. Return the final Room.
@@ -848,15 +800,13 @@ impl BaseClient {
let (raw_state_events, state_events) =
processors::state_events::collect_sync(&mut context, &new_info.state.events);
let mut new_user_ids = self
.handle_state(
&mut context,
&raw_state_events,
&state_events,
&mut room_info,
&mut ambiguity_cache,
)
.await?;
let mut new_user_ids = processors::state_events::dispatch_and_get_new_users(
&mut context,
(&raw_state_events, &state_events),
&mut room_info,
&mut ambiguity_cache,
)
.await?;
updated_members_in_room.insert(room_id.to_owned(), new_user_ids.clone());
@@ -962,15 +912,13 @@ impl BaseClient {
let (raw_state_events, state_events) =
processors::state_events::collect_sync(&mut context, &new_info.state.events);
let mut user_ids = self
.handle_state(
&mut context,
&raw_state_events,
&state_events,
&mut room_info,
&mut ambiguity_cache,
)
.await?;
let mut user_ids = processors::state_events::dispatch_and_get_new_users(
&mut context,
(&raw_state_events, &state_events),
&mut room_info,
&mut ambiguity_cache,
)
.await?;
let timeline = self
.handle_timeline(

View File

@@ -12,14 +12,24 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use std::{
collections::{BTreeMap, BTreeSet},
iter,
};
use ruma::{
events::{AnyStrippedStateEvent, AnySyncStateEvent},
events::{room::member::MembershipState, AnyStrippedStateEvent, AnySyncStateEvent},
serde::Raw,
OwnedUserId,
};
use serde::Deserialize;
use tracing::warn;
use tracing::{instrument, warn};
use super::Context;
use super::{profiles, Context};
use crate::{
store::{ambiguity_map::AmbiguityCache, Result as StoreResult},
RoomInfo,
};
pub fn collect_sync(
_context: &mut Context,
@@ -50,3 +60,52 @@ where
})
.unzip()
}
/// Dispatch the state events and return the new users for this room.
///
/// `raw_events` and `events` must be generated from [`collect_sync`]. Events
/// must be exactly the same list of events that are in raw_events, but
/// deserialised. We demand them here to avoid deserialising multiple times.
#[instrument(skip_all, fields(room_id = ?room_info.room_id))]
pub async fn dispatch_and_get_new_users(
context: &mut Context,
(raw_events, events): (&[Raw<AnySyncStateEvent>], &[AnySyncStateEvent]),
room_info: &mut RoomInfo,
ambiguity_cache: &mut AmbiguityCache,
) -> StoreResult<BTreeSet<OwnedUserId>> {
let mut user_ids = BTreeSet::new();
if raw_events.is_empty() {
return Ok(user_ids);
}
let mut state_events = BTreeMap::new();
for (raw_event, event) in iter::zip(raw_events, events) {
room_info.handle_state_event(event);
if let AnySyncStateEvent::RoomMember(member) = event {
ambiguity_cache
.handle_event(&context.state_changes, &room_info.room_id, member)
.await?;
match member.membership() {
MembershipState::Join | MembershipState::Invite => {
user_ids.insert(member.state_key().to_owned());
}
_ => (),
}
profiles::upsert_or_delete(context, &room_info.room_id, member);
}
state_events
.entry(event.event_type())
.or_insert_with(BTreeMap::new)
.insert(event.state_key().to_owned(), raw_event.clone());
}
context.state_changes.state.insert(room_info.room_id.clone(), state_events);
Ok(user_ids)
}

View File

@@ -393,18 +393,13 @@ impl BaseClient {
room_info.mark_state_partially_synced();
room_info.handle_encryption_state(requested_required_states);
let mut new_user_ids = if !state_events.is_empty() {
self.handle_state(
context,
&raw_state_events,
&state_events,
&mut room_info,
ambiguity_cache,
)
.await?
} else {
Default::default()
};
let mut new_user_ids = processors::state_events::dispatch_and_get_new_users(
context,
(&raw_state_events, &state_events),
&mut room_info,
ambiguity_cache,
)
.await?;
let push_rules = self.get_push_rules(global_account_data_processor).await?;