mirror of
https://github.com/matrix-org/matrix-rust-sdk.git
synced 2026-04-23 16:47:54 -04:00
feat (live location): rewrite live location shares subscription logic
This commit is contained in:
@@ -12,22 +12,153 @@
|
||||
// See the License for that specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
use crate::ruma::LocationContent;
|
||||
use std::{fmt::Debug, sync::Arc};
|
||||
|
||||
use eyeball_im::VectorDiff;
|
||||
use futures_util::StreamExt as _;
|
||||
use matrix_sdk::live_location_share::{
|
||||
LiveLocationShare as SdkLiveLocationShare, LiveLocationShares as SdkLiveLocationShares,
|
||||
};
|
||||
use matrix_sdk_common::{SendOutsideWasm, SyncOutsideWasm};
|
||||
|
||||
use crate::{ruma::LocationContent, runtime::get_runtime_handle, task_handle::TaskHandle};
|
||||
|
||||
/// Details of the last known location beacon.
|
||||
#[derive(uniffi::Record)]
|
||||
pub struct LastLocation {
|
||||
/// The most recent location content of the user.
|
||||
/// The most recent location content shared for this asset.
|
||||
pub location: LocationContent,
|
||||
/// A timestamp in milliseconds since Unix Epoch on that day in local
|
||||
/// time.
|
||||
/// The timestamp of when the location was updated.
|
||||
pub ts: u64,
|
||||
}
|
||||
/// Details of a users live location share.
|
||||
|
||||
/// Details of a user's live location share.
|
||||
#[derive(uniffi::Record)]
|
||||
pub struct LiveLocationShare {
|
||||
/// The user's last known location.
|
||||
pub last_location: LastLocation,
|
||||
/// The live status of the live location share.
|
||||
pub(crate) is_live: bool,
|
||||
/// The asset's last known location.
|
||||
pub last_location: Option<LastLocation>,
|
||||
/// The user ID of the person sharing their live location.
|
||||
pub user_id: String,
|
||||
/// The time when location sharing started.
|
||||
pub start_ts: u64,
|
||||
/// The duration that the location sharing will be live.
|
||||
/// Meaning that the location will stop being shared at ts + timeout.
|
||||
pub timeout: u64,
|
||||
}
|
||||
|
||||
/// An update to the list of active live location shares.
|
||||
///
|
||||
/// Corresponds to a [`VectorDiff`] on the underlying [`ObservableVector`].
|
||||
///
|
||||
/// [`ObservableVector`]: eyeball_im::ObservableVector
|
||||
#[derive(uniffi::Enum)]
|
||||
pub enum LiveLocationShareUpdate {
|
||||
Append { values: Vec<LiveLocationShare> },
|
||||
Clear,
|
||||
PushFront { value: LiveLocationShare },
|
||||
PushBack { value: LiveLocationShare },
|
||||
PopFront,
|
||||
PopBack,
|
||||
Insert { index: u32, value: LiveLocationShare },
|
||||
Set { index: u32, value: LiveLocationShare },
|
||||
Remove { index: u32 },
|
||||
Truncate { length: u32 },
|
||||
Reset { values: Vec<LiveLocationShare> },
|
||||
}
|
||||
|
||||
/// Listener for live location share updates.
|
||||
#[matrix_sdk_ffi_macros::export(callback_interface)]
|
||||
pub trait LiveLocationShareListener: SendOutsideWasm + SyncOutsideWasm + Debug {
|
||||
/// Called with a batch of [`LiveLocationShareUpdate`]s whenever the list
|
||||
/// of active shares changes.
|
||||
fn on_update(&self, updates: Vec<LiveLocationShareUpdate>);
|
||||
}
|
||||
|
||||
/// Tracks active live location shares in a room.
|
||||
///
|
||||
/// Holds the SDK [`SdkLiveLocationShares`] which keeps the beacon and
|
||||
/// beacon_info event handlers registered for as long as this object is alive.
|
||||
/// Call [`LiveLocationShares::subscribe`] to start receiving updates.
|
||||
#[derive(uniffi::Object)]
|
||||
pub struct LiveLocationShares {
|
||||
inner: SdkLiveLocationShares,
|
||||
}
|
||||
|
||||
impl LiveLocationShares {
|
||||
pub fn new(inner: SdkLiveLocationShares) -> Self {
|
||||
Self { inner }
|
||||
}
|
||||
}
|
||||
|
||||
#[matrix_sdk_ffi_macros::export]
|
||||
impl LiveLocationShares {
|
||||
/// Subscribe to changes in the list of active live location shares.
|
||||
///
|
||||
/// Immediately calls `listener` with a `Reset` update containing the
|
||||
/// current snapshot (if non-empty), then calls it again for every
|
||||
/// subsequent change that arrives from sync.
|
||||
///
|
||||
/// Returns a [`TaskHandle`] that, when dropped, stops the listener.
|
||||
/// The event handlers remain registered for as long as this
|
||||
/// [`LiveLocationShares`] object is alive.
|
||||
pub fn subscribe(&self, listener: Box<dyn LiveLocationShareListener>) -> Arc<TaskHandle> {
|
||||
let (initial_values, mut stream) = self.inner.subscribe();
|
||||
|
||||
if !initial_values.is_empty() {
|
||||
listener.on_update(vec![LiveLocationShareUpdate::Reset {
|
||||
values: initial_values.into_iter().map(Into::into).collect(),
|
||||
}]);
|
||||
}
|
||||
|
||||
Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move {
|
||||
while let Some(diffs) = stream.next().await {
|
||||
listener.on_update(diffs.into_iter().map(Into::into).collect());
|
||||
}
|
||||
})))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<SdkLiveLocationShare> for LiveLocationShare {
|
||||
fn from(share: SdkLiveLocationShare) -> Self {
|
||||
let start_ts = share.beacon_info.ts.0.into();
|
||||
let timeout = share.beacon_info.timeout.as_millis() as u64;
|
||||
let asset = share.beacon_info.asset.type_.into();
|
||||
let last_location = share.last_location.map(|l| LastLocation {
|
||||
location: LocationContent {
|
||||
body: "".to_owned(),
|
||||
geo_uri: l.location.uri.to_string(),
|
||||
description: None,
|
||||
zoom_level: None,
|
||||
asset,
|
||||
},
|
||||
ts: l.ts.0.into(),
|
||||
});
|
||||
LiveLocationShare { user_id: share.user_id.to_string(), last_location, start_ts, timeout }
|
||||
}
|
||||
}
|
||||
|
||||
impl From<VectorDiff<SdkLiveLocationShare>> for LiveLocationShareUpdate {
|
||||
fn from(diff: VectorDiff<SdkLiveLocationShare>) -> Self {
|
||||
match diff {
|
||||
VectorDiff::Append { values } => {
|
||||
Self::Append { values: values.into_iter().map(Into::into).collect() }
|
||||
}
|
||||
VectorDiff::Clear => Self::Clear,
|
||||
VectorDiff::PushFront { value } => Self::PushFront { value: value.into() },
|
||||
VectorDiff::PushBack { value } => Self::PushBack { value: value.into() },
|
||||
VectorDiff::PopFront => Self::PopFront,
|
||||
VectorDiff::PopBack => Self::PopBack,
|
||||
VectorDiff::Insert { index, value } => {
|
||||
Self::Insert { index: index as u32, value: value.into() }
|
||||
}
|
||||
VectorDiff::Set { index, value } => {
|
||||
Self::Set { index: index as u32, value: value.into() }
|
||||
}
|
||||
VectorDiff::Remove { index } => Self::Remove { index: index as u32 },
|
||||
VectorDiff::Truncate { length } => Self::Truncate { length: length as u32 },
|
||||
VectorDiff::Reset { values } => {
|
||||
Self::Reset { values: values.into_iter().map(Into::into).collect() }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -46,7 +46,7 @@ use ruma::{
|
||||
},
|
||||
},
|
||||
};
|
||||
use tracing::{error, warn};
|
||||
use tracing::error;
|
||||
|
||||
use self::{power_levels::RoomPowerLevels, room_info::RoomInfo};
|
||||
use crate::{
|
||||
@@ -56,12 +56,10 @@ use crate::{
|
||||
error::{ClientError, MediaInfoError, NotYetImplemented, QueueWedgeError, RoomError},
|
||||
event::TimelineEvent,
|
||||
identity_status_change::IdentityStatusChange,
|
||||
live_location_share::{LastLocation, LiveLocationShare},
|
||||
live_location_share::LiveLocationShares,
|
||||
room_member::{RoomMember, RoomMemberWithSenderInfo},
|
||||
room_preview::RoomPreview,
|
||||
ruma::{
|
||||
AudioInfo, FileInfo, ImageInfo, LocationContent, MediaSource, ThumbnailInfo, VideoInfo,
|
||||
},
|
||||
ruma::{AudioInfo, FileInfo, ImageInfo, MediaSource, ThumbnailInfo, VideoInfo},
|
||||
runtime::get_runtime_handle,
|
||||
timeline::{
|
||||
AbstractProgress, LatestEventValue, ReceiptType, SendHandle, Timeline, UploadSource,
|
||||
@@ -1138,46 +1136,16 @@ impl Room {
|
||||
}))))
|
||||
}
|
||||
|
||||
/// Subscribes to live location shares in this room, using a `listener` to
|
||||
/// be notified of the changes.
|
||||
/// Returns the active live location shares for this room.
|
||||
///
|
||||
/// The current live location shares will be emitted immediately when
|
||||
/// subscribing, along with a [`TaskHandle`] to cancel the subscription.
|
||||
pub fn subscribe_to_live_location_shares(
|
||||
self: Arc<Self>,
|
||||
listener: Box<dyn LiveLocationShareListener>,
|
||||
) -> Arc<TaskHandle> {
|
||||
let room = self.inner.clone();
|
||||
|
||||
Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move {
|
||||
let subscription = room.observe_live_location_shares();
|
||||
let stream = subscription.subscribe();
|
||||
let mut pinned_stream = pin!(stream);
|
||||
|
||||
while let Some(event) = pinned_stream.next().await {
|
||||
let last_location = LocationContent {
|
||||
body: "".to_owned(),
|
||||
geo_uri: event.last_location.location.uri.clone().to_string(),
|
||||
description: None,
|
||||
zoom_level: None,
|
||||
asset: event.beacon_info.as_ref().map(|b| b.asset.type_.clone()).into(),
|
||||
};
|
||||
|
||||
let Some(beacon_info) = event.beacon_info else {
|
||||
warn!("Live location share is missing the associated beacon_info state, skipping event.");
|
||||
continue;
|
||||
};
|
||||
|
||||
listener.call(vec![LiveLocationShare {
|
||||
last_location: LastLocation {
|
||||
location: last_location,
|
||||
ts: event.last_location.ts.0.into(),
|
||||
},
|
||||
is_live: beacon_info.is_live(),
|
||||
user_id: event.user_id.to_string(),
|
||||
}])
|
||||
}
|
||||
})))
|
||||
/// The returned [`LiveLocationShares`] object tracks which users are
|
||||
/// currently sharing their live location. It keeps the underlying event
|
||||
/// handlers registered — and therefore the share list up-to-date — for as
|
||||
/// long as it is alive. Call [`LiveLocationShares::subscribe`] on it to
|
||||
/// receive an initial snapshot and a stream of incremental updates.
|
||||
pub async fn live_location_shares(&self) -> Arc<LiveLocationShares> {
|
||||
let inner = self.inner.live_location_shares().await;
|
||||
Arc::new(LiveLocationShares::new(inner))
|
||||
}
|
||||
|
||||
/// Forget this room.
|
||||
@@ -1300,12 +1268,6 @@ impl Room {
|
||||
}
|
||||
}
|
||||
|
||||
/// A listener for receiving new live location shares in a room.
|
||||
#[matrix_sdk_ffi_macros::export(callback_interface)]
|
||||
pub trait LiveLocationShareListener: SyncOutsideWasm + SendOutsideWasm {
|
||||
fn call(&self, live_location_shares: Vec<LiveLocationShare>);
|
||||
}
|
||||
|
||||
/// A listener for receiving call decline events in a room.
|
||||
#[matrix_sdk_ffi_macros::export(callback_interface)]
|
||||
pub trait CallDeclineListener: SyncOutsideWasm + SendOutsideWasm {
|
||||
|
||||
@@ -16,71 +16,237 @@
|
||||
//!
|
||||
//! Live location sharing allows users to share their real-time location with
|
||||
//! others in a room via [MSC3489](https://github.com/matrix-org/matrix-spec-proposals/pull/3489).
|
||||
use async_stream::stream;
|
||||
use futures_util::Stream;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use eyeball_im::{ObservableVector, VectorSubscriberBatchedStream};
|
||||
use imbl::Vector;
|
||||
use matrix_sdk_base::{deserialized_responses::SyncOrStrippedState, event_cache::Event};
|
||||
use matrix_sdk_common::locks::Mutex;
|
||||
use ruma::{
|
||||
MilliSecondsSinceUnixEpoch, OwnedUserId, RoomId,
|
||||
MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedUserId,
|
||||
events::{
|
||||
beacon::OriginalSyncBeaconEvent, beacon_info::BeaconInfoEventContent,
|
||||
AnySyncMessageLikeEvent, AnySyncTimelineEvent, SyncStateEvent,
|
||||
beacon::OriginalSyncBeaconEvent,
|
||||
beacon_info::{BeaconInfoEventContent, OriginalSyncBeaconInfoEvent},
|
||||
location::LocationContent,
|
||||
relation::RelationType,
|
||||
},
|
||||
};
|
||||
|
||||
use crate::{Client, Room, event_handler::ObservableEventHandler};
|
||||
|
||||
/// An observable live location.
|
||||
#[derive(Debug)]
|
||||
pub struct ObservableLiveLocation {
|
||||
observable_room_events: ObservableEventHandler<(OriginalSyncBeaconEvent, Room)>,
|
||||
}
|
||||
|
||||
impl ObservableLiveLocation {
|
||||
/// Create a new `ObservableLiveLocation` for a particular room.
|
||||
pub fn new(client: &Client, room_id: &RoomId) -> Self {
|
||||
Self { observable_room_events: client.observe_room_events(room_id) }
|
||||
}
|
||||
|
||||
/// Get a stream of [`LiveLocationShare`].
|
||||
pub fn subscribe(&self) -> impl Stream<Item = LiveLocationShare> + use<> {
|
||||
let stream = self.observable_room_events.subscribe();
|
||||
|
||||
stream! {
|
||||
for await (event, room) in stream {
|
||||
if event.sender != room.own_user_id() {
|
||||
yield LiveLocationShare {
|
||||
last_location: LastLocation {
|
||||
location: event.content.location,
|
||||
ts: event.origin_server_ts,
|
||||
},
|
||||
beacon_info: room
|
||||
.get_user_beacon_info(&event.sender)
|
||||
.await
|
||||
.ok()
|
||||
.map(|info| info.content),
|
||||
user_id: event.sender,
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
use super::Room;
|
||||
use crate::event_handler::EventHandlerDropGuard;
|
||||
|
||||
/// Details of the last known location beacon.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct LastLocation {
|
||||
/// The most recent location content of the user.
|
||||
/// The most recent location content of the asset.
|
||||
pub location: LocationContent,
|
||||
/// The timestamp of when the location was updated.
|
||||
pub ts: MilliSecondsSinceUnixEpoch,
|
||||
}
|
||||
|
||||
/// Details of a users live location share.
|
||||
/// Details of a user's live location share.
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct LiveLocationShare {
|
||||
/// The user's last known location.
|
||||
pub last_location: LastLocation,
|
||||
/// Information about the associated beacon event.
|
||||
pub beacon_info: Option<BeaconInfoEventContent>,
|
||||
/// The user ID of the person sharing their live location.
|
||||
pub user_id: OwnedUserId,
|
||||
/// The asset's last known location, if any beacon has been received.
|
||||
pub last_location: Option<LastLocation>,
|
||||
/// The event ID of the beacon_info state event for this share.
|
||||
pub beacon_id: OwnedEventId,
|
||||
/// Information about the associated beacon event.
|
||||
pub beacon_info: BeaconInfoEventContent,
|
||||
}
|
||||
|
||||
/// Tracks active live location shares in a room using an [`ObservableVector`].
|
||||
///
|
||||
/// Registers event handlers for beacon (location update) and beacon info
|
||||
/// (share started/stopped) events and reflects changes into a vector that
|
||||
/// callers can subscribe to via [`LiveLocationShares::subscribe`].
|
||||
///
|
||||
/// Event handlers are automatically unregistered when this struct is dropped.
|
||||
#[derive(Debug)]
|
||||
pub struct LiveLocationShares {
|
||||
shares: Arc<Mutex<ObservableVector<LiveLocationShare>>>,
|
||||
_beacon_guard: EventHandlerDropGuard,
|
||||
_beacon_info_guard: EventHandlerDropGuard,
|
||||
}
|
||||
|
||||
impl LiveLocationShares {
|
||||
/// Create a new [`LiveLocationShares`] for the given room.
|
||||
///
|
||||
/// Loads the current active shares from the event cache as initial state,
|
||||
/// then begins listening for beacon events to keep the vector up-to-date.
|
||||
pub(super) async fn new(room: Room) -> Self {
|
||||
let mut shares = ObservableVector::new();
|
||||
let initial_shares =
|
||||
Self::get_initial_live_location_shares(&room).await.unwrap_or_default();
|
||||
shares.append(initial_shares);
|
||||
let shares = Arc::new(Mutex::new(shares));
|
||||
|
||||
let beacon_handle = room.add_event_handler({
|
||||
let shares = shares.clone();
|
||||
async move |event: OriginalSyncBeaconEvent| {
|
||||
Self::handle_beacon_event(&shares, event);
|
||||
}
|
||||
});
|
||||
let beacon_guard = room.client.event_handler_drop_guard(beacon_handle);
|
||||
let beacon_info_handle = room.add_event_handler({
|
||||
let shares = shares.clone();
|
||||
async move |event: OriginalSyncBeaconInfoEvent, room: Room| {
|
||||
Self::handle_beacon_info_event(&shares, &room, event).await;
|
||||
}
|
||||
});
|
||||
let beacon_info_guard = room.client.event_handler_drop_guard(beacon_info_handle);
|
||||
Self { shares, _beacon_guard: beacon_guard, _beacon_info_guard: beacon_info_guard }
|
||||
}
|
||||
|
||||
/// Subscribe to changes and updates in the live location shares.
|
||||
///
|
||||
/// Returns a snapshot of the current items alongside a batched stream of
|
||||
/// [`eyeball_im::VectorDiff`]s that describe subsequent changes.
|
||||
pub fn subscribe(
|
||||
&self,
|
||||
) -> (Vector<LiveLocationShare>, VectorSubscriberBatchedStream<LiveLocationShare>) {
|
||||
self.shares.lock().subscribe().into_values_and_batched_stream()
|
||||
}
|
||||
|
||||
/// Get all currently active live location shares in a room.
|
||||
async fn get_initial_live_location_shares(
|
||||
room: &Room,
|
||||
) -> crate::Result<Vector<LiveLocationShare>> {
|
||||
// Beacon infos are stored in the state store, not the event cache.
|
||||
let beacon_infos = room.get_state_events_static::<BeaconInfoEventContent>().await?;
|
||||
// Event cache is only needed for finding last location (optional).
|
||||
let event_cache = room.event_cache().await.ok();
|
||||
let mut shares = Vector::new();
|
||||
for raw_beacon_info in beacon_infos {
|
||||
let Ok(event) = raw_beacon_info.deserialize() else { continue };
|
||||
let Some((user_id, beacon_info, event_id)) = Self::extract_live_beacon_info(event)
|
||||
else {
|
||||
continue;
|
||||
};
|
||||
let last_location = match &event_cache {
|
||||
Some((cache, _drop_handles)) => Self::find_last_location(cache, &event_id).await,
|
||||
None => None,
|
||||
};
|
||||
shares.push_back(LiveLocationShare {
|
||||
user_id,
|
||||
beacon_info,
|
||||
beacon_id: event_id,
|
||||
last_location,
|
||||
});
|
||||
}
|
||||
Ok(shares)
|
||||
}
|
||||
|
||||
/// Extracts a live beacon info from a state event.
|
||||
///
|
||||
/// Returns `(user_id, content, event_id)`, or `None` if the event is
|
||||
/// redacted/stripped or not currently live.
|
||||
fn extract_live_beacon_info(
|
||||
event: SyncOrStrippedState<BeaconInfoEventContent>,
|
||||
) -> Option<(OwnedUserId, BeaconInfoEventContent, OwnedEventId)> {
|
||||
let SyncOrStrippedState::Sync(SyncStateEvent::Original(ev)) = event else {
|
||||
return None;
|
||||
};
|
||||
if !ev.content.is_live() {
|
||||
return None;
|
||||
}
|
||||
Some((ev.state_key, ev.content, ev.event_id))
|
||||
}
|
||||
|
||||
/// Finds the most recent beacon event referencing the given beacon_info
|
||||
/// event.
|
||||
///
|
||||
/// Beacon events use an `m.reference` relation to point to their
|
||||
/// originating `beacon_info` state event. The event cache's relation
|
||||
/// index lets us look them up directly by ID without scanning all
|
||||
/// cached events.
|
||||
async fn find_last_location(
|
||||
cache: &crate::event_cache::RoomEventCache,
|
||||
beacon_info_event_id: &OwnedEventId,
|
||||
) -> Option<LastLocation> {
|
||||
cache
|
||||
.find_event_relations(beacon_info_event_id, Some(vec![RelationType::Reference]))
|
||||
.await
|
||||
.ok()?
|
||||
.into_iter()
|
||||
.rev()
|
||||
.find_map(|e| Self::event_to_last_location(&e))
|
||||
}
|
||||
|
||||
/// Converts an [`Event`] to a [`LastLocation`] if it is a beacon event.
|
||||
fn event_to_last_location(event: &Event) -> Option<LastLocation> {
|
||||
if let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::Beacon(
|
||||
beacon_event,
|
||||
))) = event.kind.raw().deserialize()
|
||||
{
|
||||
beacon_event.as_original().map(|beacon| LastLocation {
|
||||
location: beacon.content.location.clone(),
|
||||
ts: beacon.origin_server_ts,
|
||||
})
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
/// Handles a single beacon event (location update).
|
||||
///
|
||||
/// Matches the beacon to its share via `relates_to.event_id`, which
|
||||
/// references the originating `beacon_info` state event.
|
||||
fn handle_beacon_event(
|
||||
shares: &Mutex<ObservableVector<LiveLocationShare>>,
|
||||
event: OriginalSyncBeaconEvent,
|
||||
) {
|
||||
let beacon_info_event_id = &event.content.relates_to.event_id;
|
||||
let mut shares = shares.lock();
|
||||
if let Some(idx) = shares.iter().position(|s| s.beacon_id == *beacon_info_event_id) {
|
||||
// Check if beacon info is still live, if not, remove the share and ignore the
|
||||
// beacon event.
|
||||
let mut share = shares[idx].clone();
|
||||
if !share.beacon_info.is_live() {
|
||||
shares.remove(idx);
|
||||
return;
|
||||
}
|
||||
let last_location =
|
||||
LastLocation { location: event.content.location, ts: event.origin_server_ts };
|
||||
share.last_location = Some(last_location);
|
||||
shares.set(idx, share);
|
||||
}
|
||||
}
|
||||
|
||||
/// Handles a single beacon_info state event (share started or stopped).
|
||||
///
|
||||
/// When a new beacon info is received for an already tracked user, the
|
||||
/// share is removed from the vector. If the new beacon info is live, we add
|
||||
/// it at the end of the vector, looking up the event cache to find any
|
||||
/// beacon event that may have arrived before the beacon_info.
|
||||
async fn handle_beacon_info_event(
|
||||
shares: &Mutex<ObservableVector<LiveLocationShare>>,
|
||||
room: &Room,
|
||||
event: OriginalSyncBeaconInfoEvent,
|
||||
) {
|
||||
{
|
||||
let mut shares = shares.lock();
|
||||
if let Some(idx) = shares.iter().position(|s| s.user_id == *event.state_key) {
|
||||
shares.remove(idx);
|
||||
}
|
||||
}
|
||||
if event.content.is_live() {
|
||||
let last_location = if let Ok((cache, _drop_handles)) = room.event_cache().await {
|
||||
Self::find_last_location(&cache, &event.event_id).await
|
||||
} else {
|
||||
None
|
||||
};
|
||||
let share = LiveLocationShare {
|
||||
user_id: event.state_key,
|
||||
beacon_id: event.event_id,
|
||||
beacon_info: event.content,
|
||||
last_location,
|
||||
};
|
||||
shares.lock().push_back(share);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -181,7 +181,7 @@ use crate::{
|
||||
error::{BeaconError, WrongRoomState},
|
||||
event_cache::{self, EventCacheDropHandles, RoomEventCache},
|
||||
event_handler::{EventHandler, EventHandlerDropGuard, EventHandlerHandle, SyncEvent},
|
||||
live_location_share::ObservableLiveLocation,
|
||||
live_location_share::LiveLocationShares,
|
||||
media::{MediaFormat, MediaRequestParameters},
|
||||
notification_settings::{IsEncrypted, IsOneToOne, RoomNotificationMode},
|
||||
room::{
|
||||
@@ -717,6 +717,17 @@ impl Room {
|
||||
IdentityStatusChanges::create_stream(self.clone()).await
|
||||
}
|
||||
|
||||
/// Subscribes to active live location shares in this room.
|
||||
///
|
||||
/// Returns a [`LiveLocationShares`] that holds the current state and
|
||||
/// exposes a stream of incremental [`eyeball_im::VectorDiff`] updates via
|
||||
/// [`LiveLocationShares::subscribe`].
|
||||
///
|
||||
/// Event handlers are active for as long as the returned struct is alive.
|
||||
pub async fn live_location_shares(&self) -> LiveLocationShares {
|
||||
LiveLocationShares::new(self.clone()).await
|
||||
}
|
||||
|
||||
/// Returns a wrapping `TimelineEvent` for the input `AnyTimelineEvent`,
|
||||
/// decrypted if needs be.
|
||||
///
|
||||
@@ -4075,17 +4086,6 @@ impl Room {
|
||||
}
|
||||
}
|
||||
|
||||
/// Observe live location sharing events for this room.
|
||||
///
|
||||
/// The returned observable will receive the newest event for each sync
|
||||
/// response that contains an `m.beacon` event.
|
||||
///
|
||||
/// Returns a stream of [`ObservableLiveLocation`] events from other users
|
||||
/// in the room, excluding the live location events of the room's own user.
|
||||
pub fn observe_live_location_shares(&self) -> ObservableLiveLocation {
|
||||
ObservableLiveLocation::new(&self.client, self.room_id())
|
||||
}
|
||||
|
||||
/// Subscribe to knock requests in this `Room`.
|
||||
///
|
||||
/// The current requests to join the room will be emitted immediately
|
||||
|
||||
@@ -1,42 +1,31 @@
|
||||
use std::time::{Duration, UNIX_EPOCH};
|
||||
use std::time::Duration;
|
||||
|
||||
use futures_util::{FutureExt, StreamExt as _, pin_mut};
|
||||
use js_int::uint;
|
||||
use matrix_sdk::{
|
||||
config::{SyncSettings, SyncToken},
|
||||
live_location_share::LiveLocationShare,
|
||||
test_utils::mocks::MatrixMockServer,
|
||||
assert_let_timeout, live_location_share::LiveLocationShare, test_utils::mocks::MatrixMockServer,
|
||||
};
|
||||
use matrix_sdk_test::{
|
||||
DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, SyncResponseBuilder, async_test,
|
||||
event_factory::EventFactory, mocks::mock_encryption_state, test_json,
|
||||
DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, async_test, event_factory::EventFactory,
|
||||
};
|
||||
use ruma::{
|
||||
EventId, MilliSecondsSinceUnixEpoch, event_id,
|
||||
events::{
|
||||
beacon::BeaconEventContent, beacon_info::BeaconInfoEventContent, location::AssetType,
|
||||
},
|
||||
owned_event_id, room_id,
|
||||
time::SystemTime,
|
||||
EventId, MilliSecondsSinceUnixEpoch, event_id, events::location::AssetType, owned_event_id,
|
||||
user_id,
|
||||
};
|
||||
use serde_json::json;
|
||||
use wiremock::{
|
||||
Mock, ResponseTemplate,
|
||||
matchers::{body_partial_json, header, method, path_regex},
|
||||
};
|
||||
|
||||
use crate::{logged_in_client_with_server, mock_sync};
|
||||
#[async_test]
|
||||
async fn test_send_location_beacon() {
|
||||
let (client, server) = logged_in_client_with_server().await;
|
||||
let server = MatrixMockServer::new().await;
|
||||
let client = server.client_builder().build().await;
|
||||
|
||||
server.mock_room_state_encryption().plain().mount().await;
|
||||
|
||||
// Validate request body and response, partial body matching due to
|
||||
// auto-generated `org.matrix.msc3488.ts`.
|
||||
Mock::given(method("PUT"))
|
||||
.and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/org.matrix.msc3672.beacon/.*"))
|
||||
.and(header("authorization", "Bearer 1234"))
|
||||
.and(body_partial_json(json!({
|
||||
server
|
||||
.mock_room_send()
|
||||
.body_matches_partial_json(json!({
|
||||
"m.relates_to": {
|
||||
"event_id": "$15139375514XsgmR:localhost",
|
||||
"rel_type": "m.reference"
|
||||
@@ -44,57 +33,33 @@ async fn test_send_location_beacon() {
|
||||
"org.matrix.msc3488.location": {
|
||||
"uri": "geo:48.8588448,2.2943506"
|
||||
}
|
||||
})))
|
||||
.respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::EVENT_ID))
|
||||
.mount(&server)
|
||||
}))
|
||||
.ok(event_id!("$h29iv0s8:example.com"))
|
||||
.mock_once()
|
||||
.mount()
|
||||
.await;
|
||||
|
||||
let current_timestamp =
|
||||
SystemTime::now().duration_since(UNIX_EPOCH).expect("Time went backwards").as_millis()
|
||||
as u64;
|
||||
let current_time = MilliSecondsSinceUnixEpoch::now();
|
||||
let f = EventFactory::new();
|
||||
|
||||
mock_sync(
|
||||
&server,
|
||||
json!({
|
||||
"next_batch": "s526_47314_0_7_1_1_1_1_1",
|
||||
"rooms": {
|
||||
"join": {
|
||||
*DEFAULT_TEST_ROOM_ID: {
|
||||
"state": {
|
||||
"events": [
|
||||
{
|
||||
"content": {
|
||||
"description": "Live Share",
|
||||
"live": true,
|
||||
"org.matrix.msc3488.ts": current_timestamp,
|
||||
"timeout": 600_000,
|
||||
"org.matrix.msc3488.asset": { "type": "m.self" }
|
||||
},
|
||||
"event_id": "$15139375514XsgmR:localhost",
|
||||
"origin_server_ts": 1_636_829_458,
|
||||
"sender": "@example:localhost",
|
||||
"state_key": "@example:localhost",
|
||||
"type": "org.matrix.msc3672.beacon_info",
|
||||
"unsigned": {
|
||||
"age": 7034220
|
||||
}
|
||||
},
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
let beacon_info_event = f
|
||||
.beacon_info(
|
||||
Some("Live Share".to_owned()),
|
||||
Duration::from_millis(600_000),
|
||||
true,
|
||||
Some(current_time),
|
||||
)
|
||||
.event_id(event_id!("$15139375514XsgmR:localhost"))
|
||||
.sender(user_id!("@example:localhost"))
|
||||
.state_key(user_id!("@example:localhost"))
|
||||
.into_raw_sync_state();
|
||||
|
||||
}),
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
|
||||
mock_encryption_state(&server, false).await;
|
||||
|
||||
let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
|
||||
|
||||
client.sync_once(sync_settings).await.unwrap();
|
||||
server
|
||||
.sync_room(
|
||||
&client,
|
||||
JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_state_event(beacon_info_event),
|
||||
)
|
||||
.await;
|
||||
|
||||
let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
|
||||
|
||||
@@ -105,15 +70,10 @@ async fn test_send_location_beacon() {
|
||||
|
||||
#[async_test]
|
||||
async fn test_send_location_beacon_fails_without_starting_live_share() {
|
||||
let (client, server) = logged_in_client_with_server().await;
|
||||
|
||||
mock_sync(&server, &*test_json::SYNC, None).await;
|
||||
|
||||
let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
|
||||
client.sync_once(sync_settings).await.unwrap();
|
||||
|
||||
let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
|
||||
let server = MatrixMockServer::new().await;
|
||||
let client = server.client_builder().build().await;
|
||||
|
||||
let room = server.sync_joined_room(&client, *DEFAULT_TEST_ROOM_ID).await;
|
||||
let response = room.send_location_beacon("geo:48.8588448,2.2943506".to_owned()).await;
|
||||
|
||||
assert!(response.is_err());
|
||||
@@ -121,48 +81,29 @@ async fn test_send_location_beacon_fails_without_starting_live_share() {
|
||||
|
||||
#[async_test]
|
||||
async fn test_send_location_beacon_with_expired_live_share() {
|
||||
let (client, server) = logged_in_client_with_server().await;
|
||||
let server = MatrixMockServer::new().await;
|
||||
let client = server.client_builder().build().await;
|
||||
|
||||
mock_sync(
|
||||
&server,
|
||||
json!({
|
||||
"next_batch": "s526_47314_0_7_1_1_1_1_1",
|
||||
"rooms": {
|
||||
"join": {
|
||||
*DEFAULT_TEST_ROOM_ID: {
|
||||
"state": {
|
||||
"events": [
|
||||
{
|
||||
"content": {
|
||||
"description": "Live Share",
|
||||
"live": false,
|
||||
"org.matrix.msc3488.ts": 1_636_829_458,
|
||||
"timeout": 3000,
|
||||
"org.matrix.msc3488.asset": { "type": "m.self" }
|
||||
},
|
||||
"event_id": "$15139375514XsgmR:localhost",
|
||||
"origin_server_ts": 1_636_829_458,
|
||||
"sender": "@example2:localhost",
|
||||
"state_key": "@example2:localhost",
|
||||
"type": "org.matrix.msc3672.beacon_info",
|
||||
"unsigned": {
|
||||
"age": 7034220
|
||||
}
|
||||
},
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
let f = EventFactory::new();
|
||||
|
||||
}),
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
let beacon_info_event = f
|
||||
.beacon_info(
|
||||
Some("Live Share".to_owned()),
|
||||
Duration::from_millis(3000),
|
||||
false,
|
||||
Some(MilliSecondsSinceUnixEpoch(uint!(1_636_829_458))),
|
||||
)
|
||||
.event_id(event_id!("$15139375514XsgmR:localhost"))
|
||||
.sender(user_id!("@example2:localhost"))
|
||||
.state_key(user_id!("@example2:localhost"))
|
||||
.into_raw_sync_state();
|
||||
|
||||
let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000));
|
||||
|
||||
client.sync_once(sync_settings).await.unwrap();
|
||||
server
|
||||
.sync_room(
|
||||
&client,
|
||||
JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_state_event(beacon_info_event),
|
||||
)
|
||||
.await;
|
||||
|
||||
let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap();
|
||||
|
||||
@@ -173,69 +114,39 @@ async fn test_send_location_beacon_with_expired_live_share() {
|
||||
|
||||
#[async_test]
|
||||
async fn test_most_recent_event_in_stream() {
|
||||
let (client, server) = logged_in_client_with_server().await;
|
||||
|
||||
let mut sync_builder = SyncResponseBuilder::new();
|
||||
let server = MatrixMockServer::new().await;
|
||||
let client = server.client_builder().build().await;
|
||||
|
||||
let current_time = MilliSecondsSinceUnixEpoch::now();
|
||||
let millis_time = current_time
|
||||
.to_system_time()
|
||||
.unwrap()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.expect("Time went backwards")
|
||||
.as_millis() as u64;
|
||||
let f = EventFactory::new();
|
||||
|
||||
mock_sync(
|
||||
&server,
|
||||
json!({
|
||||
"next_batch": "s526_47314_0_7_1_1_1_1_1",
|
||||
"rooms": {
|
||||
"join": {
|
||||
*DEFAULT_TEST_ROOM_ID: {
|
||||
"state": {
|
||||
"events": [
|
||||
{
|
||||
"content": {
|
||||
"description": "Live Share",
|
||||
"live": true,
|
||||
"org.matrix.msc3488.ts": millis_time,
|
||||
"timeout": 3000,
|
||||
"org.matrix.msc3488.asset": { "type": "m.self" }
|
||||
},
|
||||
"event_id": "$15139375514XsgmR:localhost",
|
||||
"origin_server_ts": millis_time,
|
||||
"sender": "@example2:localhost",
|
||||
"state_key": "@example2:localhost",
|
||||
"type": "org.matrix.msc3672.beacon_info",
|
||||
"unsigned": {
|
||||
"age": 7034220
|
||||
}
|
||||
},
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
let beacon_info_event = f
|
||||
.beacon_info(
|
||||
Some("Live Share".to_owned()),
|
||||
Duration::from_millis(3_600_000),
|
||||
true,
|
||||
Some(current_time),
|
||||
)
|
||||
.event_id(event_id!("$15139375514XsgmR:localhost"))
|
||||
.sender(user_id!("@example2:localhost"))
|
||||
.state_key(user_id!("@example2:localhost"))
|
||||
.into_raw_sync_state();
|
||||
|
||||
}),
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
let sync_settings =
|
||||
SyncSettings::new().timeout(Duration::from_millis(3000)).token(SyncToken::NoToken);
|
||||
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
|
||||
server.reset().await;
|
||||
server
|
||||
.sync_room(
|
||||
&client,
|
||||
JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_state_event(beacon_info_event),
|
||||
)
|
||||
.await;
|
||||
|
||||
// Enable the event cache so the initial snapshot can be loaded from it.
|
||||
client.event_cache().subscribe().unwrap();
|
||||
|
||||
let room = client.get_room(*DEFAULT_TEST_ROOM_ID).unwrap();
|
||||
|
||||
let observable_live_location_shares = room.observe_live_location_shares();
|
||||
let stream = observable_live_location_shares.subscribe();
|
||||
pin_mut!(stream);
|
||||
|
||||
let mut timeline_events = Vec::new();
|
||||
|
||||
let f = EventFactory::new();
|
||||
for nth in 0..25 {
|
||||
for nth in 0..10 {
|
||||
let event_id = format!("$event_for_stream_{nth}");
|
||||
timeline_events.push(
|
||||
f.beacon(
|
||||
@@ -253,175 +164,467 @@ async fn test_most_recent_event_in_stream() {
|
||||
);
|
||||
}
|
||||
|
||||
sync_builder.add_joined_room(
|
||||
JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_timeline_bulk(timeline_events),
|
||||
);
|
||||
let mut event_cache_updates_stream = client.event_cache().subscribe_to_room_generic_updates();
|
||||
|
||||
mock_sync(&server, sync_builder.build_json_sync_response(), None).await;
|
||||
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
|
||||
server.reset().await;
|
||||
server
|
||||
.sync_room(
|
||||
&client,
|
||||
JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_timeline_bulk(timeline_events),
|
||||
)
|
||||
.await;
|
||||
|
||||
// Stream should only process the latest beacon event for the user, ignoring any
|
||||
// previous events.
|
||||
let LiveLocationShare { user_id, last_location, beacon_info } =
|
||||
stream.next().await.expect("Another live location was expected");
|
||||
// Wait for the event cache background task to commit the events before
|
||||
// querying.
|
||||
assert_let_timeout!(Ok(_) = event_cache_updates_stream.recv());
|
||||
|
||||
// Create the stream after syncing all beacon events — the initial snapshot is
|
||||
// loaded from the event cache and already reflects the latest beacon.
|
||||
let live_location_shares = room.live_location_shares().await;
|
||||
let (mut shares, _stream) = live_location_shares.subscribe();
|
||||
|
||||
assert_eq!(shares.len(), 1);
|
||||
let LiveLocationShare { user_id, last_location, beacon_info, .. } = shares.remove(0);
|
||||
|
||||
assert_eq!(user_id.to_string(), "@example2:localhost");
|
||||
|
||||
let last_location = last_location.expect("Expected last location");
|
||||
assert_eq!(
|
||||
last_location.location.uri,
|
||||
format!("geo:{},{};u=24", 24.9575274619722, 12.494122581370175)
|
||||
format!("geo:{},{};u=9", 9.9575274619722, 12.494122581370175)
|
||||
);
|
||||
|
||||
assert!(last_location.location.description.is_none());
|
||||
assert!(last_location.location.zoom_level.is_none());
|
||||
assert_eq!(last_location.ts, MilliSecondsSinceUnixEpoch(uint!(1_636_829_458)));
|
||||
|
||||
let beacon_info = beacon_info.expect("Live location share is missing the beacon_info");
|
||||
|
||||
assert!(beacon_info.live);
|
||||
assert!(beacon_info.is_live());
|
||||
assert_eq!(beacon_info.description, Some("Live Share".to_owned()));
|
||||
assert_eq!(beacon_info.timeout, Duration::from_millis(3000));
|
||||
assert_eq!(beacon_info.timeout, Duration::from_millis(3_600_000));
|
||||
assert_eq!(beacon_info.ts, current_time);
|
||||
assert_eq!(beacon_info.asset.type_, AssetType::Self_);
|
||||
}
|
||||
|
||||
#[async_test]
|
||||
async fn test_observe_single_live_location_share() {
|
||||
let (client, server) = logged_in_client_with_server().await;
|
||||
let server = MatrixMockServer::new().await;
|
||||
let client = server.client_builder().build().await;
|
||||
|
||||
let current_time = MilliSecondsSinceUnixEpoch::now();
|
||||
let millis_time = current_time
|
||||
.to_system_time()
|
||||
.unwrap()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.expect("Time went backwards")
|
||||
.as_millis() as u64;
|
||||
let f = EventFactory::new();
|
||||
|
||||
mock_sync(
|
||||
&server,
|
||||
json!({
|
||||
"next_batch": "s526_47314_0_7_1_1_1_1_1",
|
||||
"rooms": {
|
||||
"join": {
|
||||
*DEFAULT_TEST_ROOM_ID: {
|
||||
"state": {
|
||||
"events": [
|
||||
{
|
||||
"content": {
|
||||
"description": "Test Live Share",
|
||||
"live": true,
|
||||
"org.matrix.msc3488.ts": millis_time,
|
||||
"timeout": 3000,
|
||||
"org.matrix.msc3488.asset": { "type": "m.self" }
|
||||
},
|
||||
"event_id": "$test_beacon_info",
|
||||
"origin_server_ts": millis_time,
|
||||
"sender": "@example2:localhost",
|
||||
"state_key": "@example2:localhost",
|
||||
"type": "org.matrix.msc3672.beacon_info",
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}),
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
let beacon_info_event = f
|
||||
.beacon_info(
|
||||
Some("Test Live Share".to_owned()),
|
||||
Duration::from_millis(3_600_000),
|
||||
true,
|
||||
Some(current_time),
|
||||
)
|
||||
.event_id(event_id!("$test_beacon_info"))
|
||||
.sender(user_id!("@example2:localhost"))
|
||||
.state_key(user_id!("@example2:localhost"))
|
||||
.into_raw_sync_state();
|
||||
|
||||
let sync_settings =
|
||||
SyncSettings::new().timeout(Duration::from_millis(3000)).token(SyncToken::NoToken);
|
||||
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
|
||||
server.reset().await;
|
||||
server
|
||||
.sync_room(
|
||||
&client,
|
||||
JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_state_event(beacon_info_event),
|
||||
)
|
||||
.await;
|
||||
|
||||
let room = client.get_room(*DEFAULT_TEST_ROOM_ID).unwrap();
|
||||
let observable_live_location_shares = room.observe_live_location_shares();
|
||||
let stream = observable_live_location_shares.subscribe();
|
||||
let live_location_shares = room.live_location_shares().await;
|
||||
let (initial, stream) = live_location_shares.subscribe();
|
||||
pin_mut!(stream);
|
||||
|
||||
let timeline_event = EventFactory::new()
|
||||
.beacon(
|
||||
owned_event_id!("$test_beacon_info"),
|
||||
10.000000,
|
||||
20.000000,
|
||||
5,
|
||||
Some(MilliSecondsSinceUnixEpoch(1_636_829_458u32.into())),
|
||||
)
|
||||
// Initial snapshot contains the beacon_info from state (no last_location yet).
|
||||
assert_eq!(initial.len(), 1);
|
||||
assert!(initial[0].last_location.is_none());
|
||||
|
||||
let timeline_event = f
|
||||
.beacon(owned_event_id!("$test_beacon_info"), 10.0, 20.0, 5, Some(current_time))
|
||||
.event_id(event_id!("$location_event"))
|
||||
.server_ts(millis_time)
|
||||
.server_ts(current_time)
|
||||
.sender(user_id!("@example2:localhost"))
|
||||
.into_raw_sync();
|
||||
|
||||
mock_sync(
|
||||
&server,
|
||||
SyncResponseBuilder::new()
|
||||
.add_joined_room(
|
||||
JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_timeline_event(timeline_event),
|
||||
)
|
||||
.build_json_sync_response(),
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
server
|
||||
.sync_room(
|
||||
&client,
|
||||
JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_timeline_event(timeline_event),
|
||||
)
|
||||
.await;
|
||||
|
||||
let _response = client.sync_once(sync_settings.clone()).await.unwrap();
|
||||
server.reset().await;
|
||||
|
||||
let LiveLocationShare { user_id, last_location, beacon_info } =
|
||||
stream.next().await.expect("Another live location was expected");
|
||||
let diffs = stream.next().await.expect("Expected live location share update");
|
||||
let mut shares = diffs.into_iter().fold(initial, |mut v, diff| {
|
||||
diff.apply(&mut v);
|
||||
v
|
||||
});
|
||||
// Beacon handler updates the existing share with last_location
|
||||
assert_eq!(shares.len(), 1);
|
||||
let LiveLocationShare { user_id, last_location, beacon_info, .. } = shares.remove(0);
|
||||
|
||||
assert_eq!(user_id.to_string(), "@example2:localhost");
|
||||
assert_eq!(last_location.location.uri, format!("geo:{},{};u=5", 10.000000, 20.000000));
|
||||
let last_location = last_location.expect("Expected last location");
|
||||
assert_eq!(last_location.location.uri, "geo:10,20;u=5");
|
||||
assert_eq!(last_location.ts, current_time);
|
||||
|
||||
let beacon_info = beacon_info.expect("Live location share is missing the beacon_info");
|
||||
|
||||
assert!(beacon_info.live);
|
||||
assert!(beacon_info.is_live());
|
||||
assert_eq!(beacon_info.description, Some("Test Live Share".to_owned()));
|
||||
assert_eq!(beacon_info.timeout, Duration::from_millis(3000));
|
||||
assert_eq!(beacon_info.timeout, Duration::from_millis(3_600_000));
|
||||
assert_eq!(beacon_info.ts, current_time);
|
||||
}
|
||||
|
||||
#[async_test]
|
||||
async fn test_observing_live_location_does_not_return_own_beacon_updates() {
|
||||
async fn test_observing_live_location_does_not_return_non_live() {
|
||||
let server = MatrixMockServer::new().await;
|
||||
let client = server.client_builder().build().await;
|
||||
let room_id = room_id!("!a:b.c");
|
||||
let event_id = event_id!("$a:b.c");
|
||||
let user_id = user_id!("@example:localhost");
|
||||
|
||||
let f = EventFactory::new().room(room_id);
|
||||
let f = EventFactory::new();
|
||||
|
||||
let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![
|
||||
f.event(BeaconInfoEventContent::new(None, Duration::from_secs(60), false, None))
|
||||
.event_id(event_id)
|
||||
.sender(user_id)
|
||||
.state_key(user_id)
|
||||
.into(),
|
||||
]);
|
||||
// The user's beacon_info is NOT live.
|
||||
let beacon_info_event = f
|
||||
.beacon_info(None, Duration::from_millis(60_000), false, None)
|
||||
.event_id(event_id!("$15139375514XsgmR:localhost"))
|
||||
.sender(user_id!("@example:localhost"))
|
||||
.state_key(user_id!("@example:localhost"))
|
||||
.into_raw_sync_state();
|
||||
|
||||
let room = server.sync_room(&client, joined_room_builder).await;
|
||||
server
|
||||
.sync_room(
|
||||
&client,
|
||||
JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_state_event(beacon_info_event),
|
||||
)
|
||||
.await;
|
||||
|
||||
let observable_live_location_shares = room.observe_live_location_shares();
|
||||
let stream = observable_live_location_shares.subscribe();
|
||||
let room = client.get_room(*DEFAULT_TEST_ROOM_ID).unwrap();
|
||||
let live_location_shares = room.live_location_shares().await;
|
||||
let (initial, stream) = live_location_shares.subscribe();
|
||||
pin_mut!(stream);
|
||||
|
||||
// Initial is empty because beacon_info is not live.
|
||||
assert!(initial.is_empty());
|
||||
|
||||
// A beacon event arrives, but beacon_info is not live — should not yield.
|
||||
let beacon_event = f
|
||||
.event(BeaconEventContent::new(
|
||||
owned_event_id!("$15139375514XsgmR:localhost"),
|
||||
"geo:51.5008,0.1247;u=35".to_owned(),
|
||||
Some(MilliSecondsSinceUnixEpoch(uint!(1_636_829_458))),
|
||||
))
|
||||
.beacon(owned_event_id!("$15139375514XsgmR:localhost"), 51.5008, 0.1247, 35, None)
|
||||
.event_id(event_id!("$152037dfsef280074GZeOm:localhost"))
|
||||
.sender(user_id)
|
||||
.sender(user_id!("@example:localhost"))
|
||||
.into_raw_sync();
|
||||
|
||||
let joined = JoinedRoomBuilder::new(room_id).add_timeline_event(beacon_event);
|
||||
|
||||
let _ = server.sync_room(&client, joined).await;
|
||||
server
|
||||
.sync_room(
|
||||
&client,
|
||||
JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_timeline_event(beacon_event),
|
||||
)
|
||||
.await;
|
||||
|
||||
assert!(stream.next().now_or_never().is_none());
|
||||
}
|
||||
|
||||
#[async_test]
|
||||
async fn test_location_update_for_already_tracked_user() {
|
||||
let server = MatrixMockServer::new().await;
|
||||
let client = server.client_builder().build().await;
|
||||
|
||||
let now = MilliSecondsSinceUnixEpoch::now();
|
||||
let f = EventFactory::new();
|
||||
|
||||
// Set up alice with a live beacon_info.
|
||||
let beacon_info_event = f
|
||||
.beacon_info(
|
||||
Some("Alice location".to_owned()),
|
||||
Duration::from_millis(300_000),
|
||||
true,
|
||||
Some(now),
|
||||
)
|
||||
.event_id(event_id!("$alice_beacon_info"))
|
||||
.sender(user_id!("@alice:localhost"))
|
||||
.state_key(user_id!("@alice:localhost"))
|
||||
.into_raw_sync_state();
|
||||
|
||||
server
|
||||
.sync_room(
|
||||
&client,
|
||||
JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_state_event(beacon_info_event),
|
||||
)
|
||||
.await;
|
||||
|
||||
let room = client.get_room(*DEFAULT_TEST_ROOM_ID).unwrap();
|
||||
let live_location_shares = room.live_location_shares().await;
|
||||
let (initial, stream) = live_location_shares.subscribe();
|
||||
pin_mut!(stream);
|
||||
|
||||
// Initial snapshot contains the beacon_info from state (no last_location yet).
|
||||
assert_eq!(initial.len(), 1);
|
||||
assert!(initial[0].last_location.is_none());
|
||||
|
||||
// Alice's first beacon — updates the existing share with last_location.
|
||||
server
|
||||
.sync_room(
|
||||
&client,
|
||||
JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_timeline_event(
|
||||
f.beacon(owned_event_id!("$alice_beacon_info"), 10.0, 20.0, 5, None)
|
||||
.event_id(event_id!("$loc1"))
|
||||
.sender(user_id!("@alice:localhost"))
|
||||
.server_ts(now)
|
||||
.into_raw_sync(),
|
||||
),
|
||||
)
|
||||
.await;
|
||||
|
||||
let diffs = stream.next().await.expect("Expected first location");
|
||||
let shares = diffs.into_iter().fold(initial, |mut v, diff| {
|
||||
diff.apply(&mut v);
|
||||
v
|
||||
});
|
||||
assert_eq!(shares.len(), 1);
|
||||
let share = &shares[0];
|
||||
assert_eq!(share.user_id, user_id!("@alice:localhost"));
|
||||
let last_location = share.last_location.as_ref().expect("Expected last location");
|
||||
assert_eq!(last_location.location.uri, "geo:10,20;u=5");
|
||||
assert_eq!(share.beacon_info.description, Some("Alice location".to_owned()));
|
||||
|
||||
// Alice's second beacon — already tracked, beacon_info is reused from cache.
|
||||
server
|
||||
.sync_room(
|
||||
&client,
|
||||
JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_timeline_event(
|
||||
f.beacon(owned_event_id!("$alice_beacon_info"), 30.0, 40.0, 10, None)
|
||||
.event_id(event_id!("$loc2"))
|
||||
.sender(user_id!("@alice:localhost"))
|
||||
.server_ts(now)
|
||||
.into_raw_sync(),
|
||||
),
|
||||
)
|
||||
.await;
|
||||
|
||||
let diffs = stream.next().await.expect("Expected second location update");
|
||||
let shares = diffs.into_iter().fold(shares, |mut v, diff| {
|
||||
diff.apply(&mut v);
|
||||
v
|
||||
});
|
||||
assert_eq!(shares.len(), 1);
|
||||
let LiveLocationShare { user_id, last_location, beacon_info, .. } = shares[0].clone();
|
||||
assert_eq!(user_id, user_id!("@alice:localhost"));
|
||||
let last_location = last_location.expect("Expected last location");
|
||||
assert_eq!(last_location.location.uri, "geo:30,40;u=10");
|
||||
// beacon_info is preserved from the initial share — not re-fetched from state.
|
||||
assert_eq!(beacon_info.description, Some("Alice location".to_owned()));
|
||||
}
|
||||
|
||||
#[async_test]
|
||||
async fn test_beacon_info_stop_removes_user_from_stream() {
|
||||
let server = MatrixMockServer::new().await;
|
||||
let client = server.client_builder().build().await;
|
||||
|
||||
let now = MilliSecondsSinceUnixEpoch::now();
|
||||
let f = EventFactory::new();
|
||||
|
||||
// Alice starts with a live beacon_info.
|
||||
let beacon_info_event = f
|
||||
.beacon_info(None, Duration::from_millis(300_000), true, Some(now))
|
||||
.event_id(event_id!("$alice_beacon_info"))
|
||||
.sender(user_id!("@alice:localhost"))
|
||||
.state_key(user_id!("@alice:localhost"))
|
||||
.into_raw_sync_state();
|
||||
|
||||
server
|
||||
.sync_room(
|
||||
&client,
|
||||
JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_state_event(beacon_info_event),
|
||||
)
|
||||
.await;
|
||||
|
||||
let room = client.get_room(*DEFAULT_TEST_ROOM_ID).unwrap();
|
||||
let live_location_shares = room.live_location_shares().await;
|
||||
let (initial, stream) = live_location_shares.subscribe();
|
||||
pin_mut!(stream);
|
||||
|
||||
// Initial snapshot contains the beacon_info from state (no last_location yet).
|
||||
assert_eq!(initial.len(), 1);
|
||||
assert!(initial[0].last_location.is_none());
|
||||
|
||||
// Alice stops her share — beacon_info timeline event with live: false.
|
||||
server
|
||||
.sync_room(
|
||||
&client,
|
||||
JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_timeline_event(
|
||||
f.beacon_info(None, Duration::from_secs(300), false, None)
|
||||
.event_id(event_id!("$alice_beacon_info_stop"))
|
||||
.sender(user_id!("@alice:localhost"))
|
||||
.state_key(user_id!("@alice:localhost"))
|
||||
.into_raw_sync(),
|
||||
),
|
||||
)
|
||||
.await;
|
||||
|
||||
// Stream emits an update with an empty list — alice is removed.
|
||||
let diffs = stream.next().await.expect("Expected share removal");
|
||||
let shares = diffs.into_iter().fold(initial, |mut v, diff| {
|
||||
diff.apply(&mut v);
|
||||
v
|
||||
});
|
||||
assert!(shares.is_empty());
|
||||
}
|
||||
|
||||
#[async_test]
|
||||
async fn test_multiple_users_in_stream() {
|
||||
let server = MatrixMockServer::new().await;
|
||||
let client = server.client_builder().build().await;
|
||||
|
||||
let now = MilliSecondsSinceUnixEpoch::now();
|
||||
let f = EventFactory::new();
|
||||
|
||||
// Both alice and bob have live beacon_infos.
|
||||
let alice_beacon_info = f
|
||||
.beacon_info(None, Duration::from_millis(300_000), true, Some(now))
|
||||
.event_id(event_id!("$alice_beacon_info"))
|
||||
.sender(user_id!("@alice:localhost"))
|
||||
.state_key(user_id!("@alice:localhost"))
|
||||
.into_raw_sync_state();
|
||||
|
||||
let bob_beacon_info = f
|
||||
.beacon_info(None, Duration::from_millis(300_000), true, Some(now))
|
||||
.event_id(event_id!("$bob_beacon_info"))
|
||||
.sender(user_id!("@bob:localhost"))
|
||||
.state_key(user_id!("@bob:localhost"))
|
||||
.into_raw_sync_state();
|
||||
|
||||
server
|
||||
.sync_room(
|
||||
&client,
|
||||
JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID)
|
||||
.add_state_event(alice_beacon_info)
|
||||
.add_state_event(bob_beacon_info),
|
||||
)
|
||||
.await;
|
||||
|
||||
let room = client.get_room(*DEFAULT_TEST_ROOM_ID).unwrap();
|
||||
let live_location_shares = room.live_location_shares().await;
|
||||
let (initial, stream) = live_location_shares.subscribe();
|
||||
pin_mut!(stream);
|
||||
|
||||
// Initial snapshot contains both alice and bob beacon_infos from state.
|
||||
assert_eq!(initial.len(), 2);
|
||||
|
||||
// Alice's beacon arrives — updates her existing share with last_location.
|
||||
server
|
||||
.sync_room(
|
||||
&client,
|
||||
JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_timeline_event(
|
||||
f.beacon(owned_event_id!("$alice_beacon_info"), 10.0, 20.0, 5, None)
|
||||
.event_id(event_id!("$alice_loc"))
|
||||
.sender(user_id!("@alice:localhost"))
|
||||
.server_ts(now)
|
||||
.into_raw_sync(),
|
||||
),
|
||||
)
|
||||
.await;
|
||||
|
||||
let diffs = stream.next().await.expect("Expected alice's location");
|
||||
let shares = diffs.into_iter().fold(initial, |mut v, diff| {
|
||||
diff.apply(&mut v);
|
||||
v
|
||||
});
|
||||
assert_eq!(shares.len(), 2);
|
||||
|
||||
// Bob's beacon arrives — updates his existing share with last_location.
|
||||
server
|
||||
.sync_room(
|
||||
&client,
|
||||
JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_timeline_event(
|
||||
f.beacon(owned_event_id!("$bob_beacon_info"), 50.0, 60.0, 8, None)
|
||||
.event_id(event_id!("$bob_loc"))
|
||||
.sender(user_id!("@bob:localhost"))
|
||||
.server_ts(now)
|
||||
.into_raw_sync(),
|
||||
),
|
||||
)
|
||||
.await;
|
||||
|
||||
let diffs = stream.next().await.expect("Expected both users");
|
||||
let shares = diffs.into_iter().fold(shares, |mut v, diff| {
|
||||
diff.apply(&mut v);
|
||||
v
|
||||
});
|
||||
assert_eq!(shares.len(), 2);
|
||||
let mut shares: Vec<_> = shares.into_iter().collect();
|
||||
shares.sort_by_key(|s| s.user_id.clone());
|
||||
|
||||
assert_eq!(shares[0].user_id, user_id!("@alice:localhost"));
|
||||
assert_eq!(
|
||||
shares[0].last_location.as_ref().expect("Expected last location").location.uri,
|
||||
"geo:10,20;u=5"
|
||||
);
|
||||
|
||||
assert_eq!(shares[1].user_id, user_id!("@bob:localhost"));
|
||||
assert_eq!(
|
||||
shares[1].last_location.as_ref().expect("Expected last location").location.uri,
|
||||
"geo:50,60;u=8"
|
||||
);
|
||||
}
|
||||
|
||||
#[async_test]
|
||||
async fn test_initial_load_contains_location_from_event_cache() {
|
||||
let server = MatrixMockServer::new().await;
|
||||
let client = server.client_builder().build().await;
|
||||
|
||||
// Enable event cache BEFORE syncing so events get cached.
|
||||
client.event_cache().subscribe().unwrap();
|
||||
|
||||
let now = MilliSecondsSinceUnixEpoch::now();
|
||||
let f = EventFactory::new();
|
||||
|
||||
// Alice has a live beacon_info.
|
||||
let beacon_info_event = f
|
||||
.beacon_info(
|
||||
Some("Alice location".to_owned()),
|
||||
Duration::from_millis(300_000),
|
||||
true,
|
||||
Some(now),
|
||||
)
|
||||
.event_id(event_id!("$alice_beacon_info"))
|
||||
.sender(user_id!("@alice:localhost"))
|
||||
.state_key(user_id!("@alice:localhost"))
|
||||
.into_raw_sync_state();
|
||||
|
||||
// A beacon event in the timeline.
|
||||
let beacon_event = f
|
||||
.beacon(owned_event_id!("$alice_beacon_info"), 48.8566, 2.3522, 10, Some(now))
|
||||
.event_id(event_id!("$alice_location"))
|
||||
.sender(user_id!("@alice:localhost"))
|
||||
.server_ts(now)
|
||||
.into_raw_sync();
|
||||
|
||||
let mut event_cache_updates_stream = client.event_cache().subscribe_to_room_generic_updates();
|
||||
|
||||
server
|
||||
.sync_room(
|
||||
&client,
|
||||
JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID)
|
||||
.add_state_event(beacon_info_event)
|
||||
.add_timeline_event(beacon_event),
|
||||
)
|
||||
.await;
|
||||
|
||||
// Wait for the event cache background task to commit the events before
|
||||
// querying.
|
||||
assert_let_timeout!(Ok(_) = event_cache_updates_stream.recv());
|
||||
|
||||
let room = client.get_room(*DEFAULT_TEST_ROOM_ID).unwrap();
|
||||
let live_location_shares = room.live_location_shares().await;
|
||||
let (initial, _stream) = live_location_shares.subscribe();
|
||||
|
||||
// Initial snapshot should contain both beacon_info AND last_location.
|
||||
assert_eq!(initial.len(), 1);
|
||||
let share = &initial[0];
|
||||
assert_eq!(share.user_id, user_id!("@alice:localhost"));
|
||||
assert_eq!(share.beacon_info.description, Some("Alice location".to_owned()));
|
||||
|
||||
let last_location =
|
||||
share.last_location.as_ref().expect("Expected last_location in initial load");
|
||||
assert_eq!(last_location.location.uri, "geo:48.8566,2.3522;u=10");
|
||||
assert_eq!(last_location.ts, now);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user