From 260eaeea2b33d475b8e8a17c884b918bfa5109fc Mon Sep 17 00:00:00 2001 From: ganfra Date: Mon, 13 Apr 2026 15:56:28 +0200 Subject: [PATCH] feat (live location): rewrite live location shares subscription logic --- .../matrix-sdk-ffi/src/live_location_share.rs | 149 +++- bindings/matrix-sdk-ffi/src/room/mod.rs | 62 +- crates/matrix-sdk/src/live_location_share.rs | 262 ++++-- crates/matrix-sdk/src/room/mod.rs | 24 +- .../tests/integration/room/beacon/mod.rs | 765 +++++++++++------- 5 files changed, 862 insertions(+), 400 deletions(-) diff --git a/bindings/matrix-sdk-ffi/src/live_location_share.rs b/bindings/matrix-sdk-ffi/src/live_location_share.rs index 044b2a197..e2b8ba1f8 100644 --- a/bindings/matrix-sdk-ffi/src/live_location_share.rs +++ b/bindings/matrix-sdk-ffi/src/live_location_share.rs @@ -12,22 +12,153 @@ // See the License for that specific language governing permissions and // limitations under the License. -use crate::ruma::LocationContent; +use std::{fmt::Debug, sync::Arc}; + +use eyeball_im::VectorDiff; +use futures_util::StreamExt as _; +use matrix_sdk::live_location_share::{ + LiveLocationShare as SdkLiveLocationShare, LiveLocationShares as SdkLiveLocationShares, +}; +use matrix_sdk_common::{SendOutsideWasm, SyncOutsideWasm}; + +use crate::{ruma::LocationContent, runtime::get_runtime_handle, task_handle::TaskHandle}; + +/// Details of the last known location beacon. #[derive(uniffi::Record)] pub struct LastLocation { - /// The most recent location content of the user. + /// The most recent location content shared for this asset. pub location: LocationContent, - /// A timestamp in milliseconds since Unix Epoch on that day in local - /// time. + /// The timestamp of when the location was updated. pub ts: u64, } -/// Details of a users live location share. + +/// Details of a user's live location share. #[derive(uniffi::Record)] pub struct LiveLocationShare { - /// The user's last known location. - pub last_location: LastLocation, - /// The live status of the live location share. - pub(crate) is_live: bool, + /// The asset's last known location. + pub last_location: Option, /// The user ID of the person sharing their live location. pub user_id: String, + /// The time when location sharing started. + pub start_ts: u64, + /// The duration that the location sharing will be live. + /// Meaning that the location will stop being shared at ts + timeout. + pub timeout: u64, +} + +/// An update to the list of active live location shares. +/// +/// Corresponds to a [`VectorDiff`] on the underlying [`ObservableVector`]. +/// +/// [`ObservableVector`]: eyeball_im::ObservableVector +#[derive(uniffi::Enum)] +pub enum LiveLocationShareUpdate { + Append { values: Vec }, + Clear, + PushFront { value: LiveLocationShare }, + PushBack { value: LiveLocationShare }, + PopFront, + PopBack, + Insert { index: u32, value: LiveLocationShare }, + Set { index: u32, value: LiveLocationShare }, + Remove { index: u32 }, + Truncate { length: u32 }, + Reset { values: Vec }, +} + +/// Listener for live location share updates. +#[matrix_sdk_ffi_macros::export(callback_interface)] +pub trait LiveLocationShareListener: SendOutsideWasm + SyncOutsideWasm + Debug { + /// Called with a batch of [`LiveLocationShareUpdate`]s whenever the list + /// of active shares changes. + fn on_update(&self, updates: Vec); +} + +/// Tracks active live location shares in a room. +/// +/// Holds the SDK [`SdkLiveLocationShares`] which keeps the beacon and +/// beacon_info event handlers registered for as long as this object is alive. +/// Call [`LiveLocationShares::subscribe`] to start receiving updates. +#[derive(uniffi::Object)] +pub struct LiveLocationShares { + inner: SdkLiveLocationShares, +} + +impl LiveLocationShares { + pub fn new(inner: SdkLiveLocationShares) -> Self { + Self { inner } + } +} + +#[matrix_sdk_ffi_macros::export] +impl LiveLocationShares { + /// Subscribe to changes in the list of active live location shares. + /// + /// Immediately calls `listener` with a `Reset` update containing the + /// current snapshot (if non-empty), then calls it again for every + /// subsequent change that arrives from sync. + /// + /// Returns a [`TaskHandle`] that, when dropped, stops the listener. + /// The event handlers remain registered for as long as this + /// [`LiveLocationShares`] object is alive. + pub fn subscribe(&self, listener: Box) -> Arc { + let (initial_values, mut stream) = self.inner.subscribe(); + + if !initial_values.is_empty() { + listener.on_update(vec![LiveLocationShareUpdate::Reset { + values: initial_values.into_iter().map(Into::into).collect(), + }]); + } + + Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move { + while let Some(diffs) = stream.next().await { + listener.on_update(diffs.into_iter().map(Into::into).collect()); + } + }))) + } +} + +impl From for LiveLocationShare { + fn from(share: SdkLiveLocationShare) -> Self { + let start_ts = share.beacon_info.ts.0.into(); + let timeout = share.beacon_info.timeout.as_millis() as u64; + let asset = share.beacon_info.asset.type_.into(); + let last_location = share.last_location.map(|l| LastLocation { + location: LocationContent { + body: "".to_owned(), + geo_uri: l.location.uri.to_string(), + description: None, + zoom_level: None, + asset, + }, + ts: l.ts.0.into(), + }); + LiveLocationShare { user_id: share.user_id.to_string(), last_location, start_ts, timeout } + } +} + +impl From> for LiveLocationShareUpdate { + fn from(diff: VectorDiff) -> Self { + match diff { + VectorDiff::Append { values } => { + Self::Append { values: values.into_iter().map(Into::into).collect() } + } + VectorDiff::Clear => Self::Clear, + VectorDiff::PushFront { value } => Self::PushFront { value: value.into() }, + VectorDiff::PushBack { value } => Self::PushBack { value: value.into() }, + VectorDiff::PopFront => Self::PopFront, + VectorDiff::PopBack => Self::PopBack, + VectorDiff::Insert { index, value } => { + Self::Insert { index: index as u32, value: value.into() } + } + VectorDiff::Set { index, value } => { + Self::Set { index: index as u32, value: value.into() } + } + VectorDiff::Remove { index } => Self::Remove { index: index as u32 }, + VectorDiff::Truncate { length } => Self::Truncate { length: length as u32 }, + VectorDiff::Reset { values } => { + Self::Reset { values: values.into_iter().map(Into::into).collect() } + } + } + } } diff --git a/bindings/matrix-sdk-ffi/src/room/mod.rs b/bindings/matrix-sdk-ffi/src/room/mod.rs index 8c0cfc92f..48d0e132f 100644 --- a/bindings/matrix-sdk-ffi/src/room/mod.rs +++ b/bindings/matrix-sdk-ffi/src/room/mod.rs @@ -46,7 +46,7 @@ use ruma::{ }, }, }; -use tracing::{error, warn}; +use tracing::error; use self::{power_levels::RoomPowerLevels, room_info::RoomInfo}; use crate::{ @@ -56,12 +56,10 @@ use crate::{ error::{ClientError, MediaInfoError, NotYetImplemented, QueueWedgeError, RoomError}, event::TimelineEvent, identity_status_change::IdentityStatusChange, - live_location_share::{LastLocation, LiveLocationShare}, + live_location_share::LiveLocationShares, room_member::{RoomMember, RoomMemberWithSenderInfo}, room_preview::RoomPreview, - ruma::{ - AudioInfo, FileInfo, ImageInfo, LocationContent, MediaSource, ThumbnailInfo, VideoInfo, - }, + ruma::{AudioInfo, FileInfo, ImageInfo, MediaSource, ThumbnailInfo, VideoInfo}, runtime::get_runtime_handle, timeline::{ AbstractProgress, LatestEventValue, ReceiptType, SendHandle, Timeline, UploadSource, @@ -1138,46 +1136,16 @@ impl Room { })))) } - /// Subscribes to live location shares in this room, using a `listener` to - /// be notified of the changes. + /// Returns the active live location shares for this room. /// - /// The current live location shares will be emitted immediately when - /// subscribing, along with a [`TaskHandle`] to cancel the subscription. - pub fn subscribe_to_live_location_shares( - self: Arc, - listener: Box, - ) -> Arc { - let room = self.inner.clone(); - - Arc::new(TaskHandle::new(get_runtime_handle().spawn(async move { - let subscription = room.observe_live_location_shares(); - let stream = subscription.subscribe(); - let mut pinned_stream = pin!(stream); - - while let Some(event) = pinned_stream.next().await { - let last_location = LocationContent { - body: "".to_owned(), - geo_uri: event.last_location.location.uri.clone().to_string(), - description: None, - zoom_level: None, - asset: event.beacon_info.as_ref().map(|b| b.asset.type_.clone()).into(), - }; - - let Some(beacon_info) = event.beacon_info else { - warn!("Live location share is missing the associated beacon_info state, skipping event."); - continue; - }; - - listener.call(vec![LiveLocationShare { - last_location: LastLocation { - location: last_location, - ts: event.last_location.ts.0.into(), - }, - is_live: beacon_info.is_live(), - user_id: event.user_id.to_string(), - }]) - } - }))) + /// The returned [`LiveLocationShares`] object tracks which users are + /// currently sharing their live location. It keeps the underlying event + /// handlers registered — and therefore the share list up-to-date — for as + /// long as it is alive. Call [`LiveLocationShares::subscribe`] on it to + /// receive an initial snapshot and a stream of incremental updates. + pub async fn live_location_shares(&self) -> Arc { + let inner = self.inner.live_location_shares().await; + Arc::new(LiveLocationShares::new(inner)) } /// Forget this room. @@ -1300,12 +1268,6 @@ impl Room { } } -/// A listener for receiving new live location shares in a room. -#[matrix_sdk_ffi_macros::export(callback_interface)] -pub trait LiveLocationShareListener: SyncOutsideWasm + SendOutsideWasm { - fn call(&self, live_location_shares: Vec); -} - /// A listener for receiving call decline events in a room. #[matrix_sdk_ffi_macros::export(callback_interface)] pub trait CallDeclineListener: SyncOutsideWasm + SendOutsideWasm { diff --git a/crates/matrix-sdk/src/live_location_share.rs b/crates/matrix-sdk/src/live_location_share.rs index 45613fd26..459549f19 100644 --- a/crates/matrix-sdk/src/live_location_share.rs +++ b/crates/matrix-sdk/src/live_location_share.rs @@ -16,71 +16,237 @@ //! //! Live location sharing allows users to share their real-time location with //! others in a room via [MSC3489](https://github.com/matrix-org/matrix-spec-proposals/pull/3489). -use async_stream::stream; -use futures_util::Stream; + +use std::sync::Arc; + +use eyeball_im::{ObservableVector, VectorSubscriberBatchedStream}; +use imbl::Vector; +use matrix_sdk_base::{deserialized_responses::SyncOrStrippedState, event_cache::Event}; +use matrix_sdk_common::locks::Mutex; use ruma::{ - MilliSecondsSinceUnixEpoch, OwnedUserId, RoomId, + MilliSecondsSinceUnixEpoch, OwnedEventId, OwnedUserId, events::{ - beacon::OriginalSyncBeaconEvent, beacon_info::BeaconInfoEventContent, + AnySyncMessageLikeEvent, AnySyncTimelineEvent, SyncStateEvent, + beacon::OriginalSyncBeaconEvent, + beacon_info::{BeaconInfoEventContent, OriginalSyncBeaconInfoEvent}, location::LocationContent, + relation::RelationType, }, }; -use crate::{Client, Room, event_handler::ObservableEventHandler}; - -/// An observable live location. -#[derive(Debug)] -pub struct ObservableLiveLocation { - observable_room_events: ObservableEventHandler<(OriginalSyncBeaconEvent, Room)>, -} - -impl ObservableLiveLocation { - /// Create a new `ObservableLiveLocation` for a particular room. - pub fn new(client: &Client, room_id: &RoomId) -> Self { - Self { observable_room_events: client.observe_room_events(room_id) } - } - - /// Get a stream of [`LiveLocationShare`]. - pub fn subscribe(&self) -> impl Stream + use<> { - let stream = self.observable_room_events.subscribe(); - - stream! { - for await (event, room) in stream { - if event.sender != room.own_user_id() { - yield LiveLocationShare { - last_location: LastLocation { - location: event.content.location, - ts: event.origin_server_ts, - }, - beacon_info: room - .get_user_beacon_info(&event.sender) - .await - .ok() - .map(|info| info.content), - user_id: event.sender, - }; - } - } - } - } -} +use super::Room; +use crate::event_handler::EventHandlerDropGuard; /// Details of the last known location beacon. #[derive(Clone, Debug)] pub struct LastLocation { - /// The most recent location content of the user. + /// The most recent location content of the asset. pub location: LocationContent, /// The timestamp of when the location was updated. pub ts: MilliSecondsSinceUnixEpoch, } -/// Details of a users live location share. +/// Details of a user's live location share. #[derive(Clone, Debug)] pub struct LiveLocationShare { - /// The user's last known location. - pub last_location: LastLocation, - /// Information about the associated beacon event. - pub beacon_info: Option, /// The user ID of the person sharing their live location. pub user_id: OwnedUserId, + /// The asset's last known location, if any beacon has been received. + pub last_location: Option, + /// The event ID of the beacon_info state event for this share. + pub beacon_id: OwnedEventId, + /// Information about the associated beacon event. + pub beacon_info: BeaconInfoEventContent, +} + +/// Tracks active live location shares in a room using an [`ObservableVector`]. +/// +/// Registers event handlers for beacon (location update) and beacon info +/// (share started/stopped) events and reflects changes into a vector that +/// callers can subscribe to via [`LiveLocationShares::subscribe`]. +/// +/// Event handlers are automatically unregistered when this struct is dropped. +#[derive(Debug)] +pub struct LiveLocationShares { + shares: Arc>>, + _beacon_guard: EventHandlerDropGuard, + _beacon_info_guard: EventHandlerDropGuard, +} + +impl LiveLocationShares { + /// Create a new [`LiveLocationShares`] for the given room. + /// + /// Loads the current active shares from the event cache as initial state, + /// then begins listening for beacon events to keep the vector up-to-date. + pub(super) async fn new(room: Room) -> Self { + let mut shares = ObservableVector::new(); + let initial_shares = + Self::get_initial_live_location_shares(&room).await.unwrap_or_default(); + shares.append(initial_shares); + let shares = Arc::new(Mutex::new(shares)); + + let beacon_handle = room.add_event_handler({ + let shares = shares.clone(); + async move |event: OriginalSyncBeaconEvent| { + Self::handle_beacon_event(&shares, event); + } + }); + let beacon_guard = room.client.event_handler_drop_guard(beacon_handle); + let beacon_info_handle = room.add_event_handler({ + let shares = shares.clone(); + async move |event: OriginalSyncBeaconInfoEvent, room: Room| { + Self::handle_beacon_info_event(&shares, &room, event).await; + } + }); + let beacon_info_guard = room.client.event_handler_drop_guard(beacon_info_handle); + Self { shares, _beacon_guard: beacon_guard, _beacon_info_guard: beacon_info_guard } + } + + /// Subscribe to changes and updates in the live location shares. + /// + /// Returns a snapshot of the current items alongside a batched stream of + /// [`eyeball_im::VectorDiff`]s that describe subsequent changes. + pub fn subscribe( + &self, + ) -> (Vector, VectorSubscriberBatchedStream) { + self.shares.lock().subscribe().into_values_and_batched_stream() + } + + /// Get all currently active live location shares in a room. + async fn get_initial_live_location_shares( + room: &Room, + ) -> crate::Result> { + // Beacon infos are stored in the state store, not the event cache. + let beacon_infos = room.get_state_events_static::().await?; + // Event cache is only needed for finding last location (optional). + let event_cache = room.event_cache().await.ok(); + let mut shares = Vector::new(); + for raw_beacon_info in beacon_infos { + let Ok(event) = raw_beacon_info.deserialize() else { continue }; + let Some((user_id, beacon_info, event_id)) = Self::extract_live_beacon_info(event) + else { + continue; + }; + let last_location = match &event_cache { + Some((cache, _drop_handles)) => Self::find_last_location(cache, &event_id).await, + None => None, + }; + shares.push_back(LiveLocationShare { + user_id, + beacon_info, + beacon_id: event_id, + last_location, + }); + } + Ok(shares) + } + + /// Extracts a live beacon info from a state event. + /// + /// Returns `(user_id, content, event_id)`, or `None` if the event is + /// redacted/stripped or not currently live. + fn extract_live_beacon_info( + event: SyncOrStrippedState, + ) -> Option<(OwnedUserId, BeaconInfoEventContent, OwnedEventId)> { + let SyncOrStrippedState::Sync(SyncStateEvent::Original(ev)) = event else { + return None; + }; + if !ev.content.is_live() { + return None; + } + Some((ev.state_key, ev.content, ev.event_id)) + } + + /// Finds the most recent beacon event referencing the given beacon_info + /// event. + /// + /// Beacon events use an `m.reference` relation to point to their + /// originating `beacon_info` state event. The event cache's relation + /// index lets us look them up directly by ID without scanning all + /// cached events. + async fn find_last_location( + cache: &crate::event_cache::RoomEventCache, + beacon_info_event_id: &OwnedEventId, + ) -> Option { + cache + .find_event_relations(beacon_info_event_id, Some(vec![RelationType::Reference])) + .await + .ok()? + .into_iter() + .rev() + .find_map(|e| Self::event_to_last_location(&e)) + } + + /// Converts an [`Event`] to a [`LastLocation`] if it is a beacon event. + fn event_to_last_location(event: &Event) -> Option { + if let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::Beacon( + beacon_event, + ))) = event.kind.raw().deserialize() + { + beacon_event.as_original().map(|beacon| LastLocation { + location: beacon.content.location.clone(), + ts: beacon.origin_server_ts, + }) + } else { + None + } + } + + /// Handles a single beacon event (location update). + /// + /// Matches the beacon to its share via `relates_to.event_id`, which + /// references the originating `beacon_info` state event. + fn handle_beacon_event( + shares: &Mutex>, + event: OriginalSyncBeaconEvent, + ) { + let beacon_info_event_id = &event.content.relates_to.event_id; + let mut shares = shares.lock(); + if let Some(idx) = shares.iter().position(|s| s.beacon_id == *beacon_info_event_id) { + // Check if beacon info is still live, if not, remove the share and ignore the + // beacon event. + let mut share = shares[idx].clone(); + if !share.beacon_info.is_live() { + shares.remove(idx); + return; + } + let last_location = + LastLocation { location: event.content.location, ts: event.origin_server_ts }; + share.last_location = Some(last_location); + shares.set(idx, share); + } + } + + /// Handles a single beacon_info state event (share started or stopped). + /// + /// When a new beacon info is received for an already tracked user, the + /// share is removed from the vector. If the new beacon info is live, we add + /// it at the end of the vector, looking up the event cache to find any + /// beacon event that may have arrived before the beacon_info. + async fn handle_beacon_info_event( + shares: &Mutex>, + room: &Room, + event: OriginalSyncBeaconInfoEvent, + ) { + { + let mut shares = shares.lock(); + if let Some(idx) = shares.iter().position(|s| s.user_id == *event.state_key) { + shares.remove(idx); + } + } + if event.content.is_live() { + let last_location = if let Ok((cache, _drop_handles)) = room.event_cache().await { + Self::find_last_location(&cache, &event.event_id).await + } else { + None + }; + let share = LiveLocationShare { + user_id: event.state_key, + beacon_id: event.event_id, + beacon_info: event.content, + last_location, + }; + shares.lock().push_back(share); + } + } } diff --git a/crates/matrix-sdk/src/room/mod.rs b/crates/matrix-sdk/src/room/mod.rs index 7cbb617cc..d4389effc 100644 --- a/crates/matrix-sdk/src/room/mod.rs +++ b/crates/matrix-sdk/src/room/mod.rs @@ -181,7 +181,7 @@ use crate::{ error::{BeaconError, WrongRoomState}, event_cache::{self, EventCacheDropHandles, RoomEventCache}, event_handler::{EventHandler, EventHandlerDropGuard, EventHandlerHandle, SyncEvent}, - live_location_share::ObservableLiveLocation, + live_location_share::LiveLocationShares, media::{MediaFormat, MediaRequestParameters}, notification_settings::{IsEncrypted, IsOneToOne, RoomNotificationMode}, room::{ @@ -717,6 +717,17 @@ impl Room { IdentityStatusChanges::create_stream(self.clone()).await } + /// Subscribes to active live location shares in this room. + /// + /// Returns a [`LiveLocationShares`] that holds the current state and + /// exposes a stream of incremental [`eyeball_im::VectorDiff`] updates via + /// [`LiveLocationShares::subscribe`]. + /// + /// Event handlers are active for as long as the returned struct is alive. + pub async fn live_location_shares(&self) -> LiveLocationShares { + LiveLocationShares::new(self.clone()).await + } + /// Returns a wrapping `TimelineEvent` for the input `AnyTimelineEvent`, /// decrypted if needs be. /// @@ -4075,17 +4086,6 @@ impl Room { } } - /// Observe live location sharing events for this room. - /// - /// The returned observable will receive the newest event for each sync - /// response that contains an `m.beacon` event. - /// - /// Returns a stream of [`ObservableLiveLocation`] events from other users - /// in the room, excluding the live location events of the room's own user. - pub fn observe_live_location_shares(&self) -> ObservableLiveLocation { - ObservableLiveLocation::new(&self.client, self.room_id()) - } - /// Subscribe to knock requests in this `Room`. /// /// The current requests to join the room will be emitted immediately diff --git a/crates/matrix-sdk/tests/integration/room/beacon/mod.rs b/crates/matrix-sdk/tests/integration/room/beacon/mod.rs index 1aec44a55..e4af9f3e1 100644 --- a/crates/matrix-sdk/tests/integration/room/beacon/mod.rs +++ b/crates/matrix-sdk/tests/integration/room/beacon/mod.rs @@ -1,42 +1,31 @@ -use std::time::{Duration, UNIX_EPOCH}; +use std::time::Duration; use futures_util::{FutureExt, StreamExt as _, pin_mut}; use js_int::uint; use matrix_sdk::{ - config::{SyncSettings, SyncToken}, - live_location_share::LiveLocationShare, - test_utils::mocks::MatrixMockServer, + assert_let_timeout, live_location_share::LiveLocationShare, test_utils::mocks::MatrixMockServer, }; use matrix_sdk_test::{ - DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, SyncResponseBuilder, async_test, - event_factory::EventFactory, mocks::mock_encryption_state, test_json, + DEFAULT_TEST_ROOM_ID, JoinedRoomBuilder, async_test, event_factory::EventFactory, }; use ruma::{ - EventId, MilliSecondsSinceUnixEpoch, event_id, - events::{ - beacon::BeaconEventContent, beacon_info::BeaconInfoEventContent, location::AssetType, - }, - owned_event_id, room_id, - time::SystemTime, + EventId, MilliSecondsSinceUnixEpoch, event_id, events::location::AssetType, owned_event_id, user_id, }; use serde_json::json; -use wiremock::{ - Mock, ResponseTemplate, - matchers::{body_partial_json, header, method, path_regex}, -}; -use crate::{logged_in_client_with_server, mock_sync}; #[async_test] async fn test_send_location_beacon() { - let (client, server) = logged_in_client_with_server().await; + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; + + server.mock_room_state_encryption().plain().mount().await; // Validate request body and response, partial body matching due to // auto-generated `org.matrix.msc3488.ts`. - Mock::given(method("PUT")) - .and(path_regex(r"^/_matrix/client/r0/rooms/.*/send/org.matrix.msc3672.beacon/.*")) - .and(header("authorization", "Bearer 1234")) - .and(body_partial_json(json!({ + server + .mock_room_send() + .body_matches_partial_json(json!({ "m.relates_to": { "event_id": "$15139375514XsgmR:localhost", "rel_type": "m.reference" @@ -44,57 +33,33 @@ async fn test_send_location_beacon() { "org.matrix.msc3488.location": { "uri": "geo:48.8588448,2.2943506" } - }))) - .respond_with(ResponseTemplate::new(200).set_body_json(&*test_json::EVENT_ID)) - .mount(&server) + })) + .ok(event_id!("$h29iv0s8:example.com")) + .mock_once() + .mount() .await; - let current_timestamp = - SystemTime::now().duration_since(UNIX_EPOCH).expect("Time went backwards").as_millis() - as u64; + let current_time = MilliSecondsSinceUnixEpoch::now(); + let f = EventFactory::new(); - mock_sync( - &server, - json!({ - "next_batch": "s526_47314_0_7_1_1_1_1_1", - "rooms": { - "join": { - *DEFAULT_TEST_ROOM_ID: { - "state": { - "events": [ - { - "content": { - "description": "Live Share", - "live": true, - "org.matrix.msc3488.ts": current_timestamp, - "timeout": 600_000, - "org.matrix.msc3488.asset": { "type": "m.self" } - }, - "event_id": "$15139375514XsgmR:localhost", - "origin_server_ts": 1_636_829_458, - "sender": "@example:localhost", - "state_key": "@example:localhost", - "type": "org.matrix.msc3672.beacon_info", - "unsigned": { - "age": 7034220 - } - }, - ] - } - } - } - } + let beacon_info_event = f + .beacon_info( + Some("Live Share".to_owned()), + Duration::from_millis(600_000), + true, + Some(current_time), + ) + .event_id(event_id!("$15139375514XsgmR:localhost")) + .sender(user_id!("@example:localhost")) + .state_key(user_id!("@example:localhost")) + .into_raw_sync_state(); - }), - None, - ) - .await; - - mock_encryption_state(&server, false).await; - - let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); - - client.sync_once(sync_settings).await.unwrap(); + server + .sync_room( + &client, + JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_state_event(beacon_info_event), + ) + .await; let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap(); @@ -105,15 +70,10 @@ async fn test_send_location_beacon() { #[async_test] async fn test_send_location_beacon_fails_without_starting_live_share() { - let (client, server) = logged_in_client_with_server().await; - - mock_sync(&server, &*test_json::SYNC, None).await; - - let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); - client.sync_once(sync_settings).await.unwrap(); - - let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap(); + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; + let room = server.sync_joined_room(&client, *DEFAULT_TEST_ROOM_ID).await; let response = room.send_location_beacon("geo:48.8588448,2.2943506".to_owned()).await; assert!(response.is_err()); @@ -121,48 +81,29 @@ async fn test_send_location_beacon_fails_without_starting_live_share() { #[async_test] async fn test_send_location_beacon_with_expired_live_share() { - let (client, server) = logged_in_client_with_server().await; + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; - mock_sync( - &server, - json!({ - "next_batch": "s526_47314_0_7_1_1_1_1_1", - "rooms": { - "join": { - *DEFAULT_TEST_ROOM_ID: { - "state": { - "events": [ - { - "content": { - "description": "Live Share", - "live": false, - "org.matrix.msc3488.ts": 1_636_829_458, - "timeout": 3000, - "org.matrix.msc3488.asset": { "type": "m.self" } - }, - "event_id": "$15139375514XsgmR:localhost", - "origin_server_ts": 1_636_829_458, - "sender": "@example2:localhost", - "state_key": "@example2:localhost", - "type": "org.matrix.msc3672.beacon_info", - "unsigned": { - "age": 7034220 - } - }, - ] - } - } - } - } + let f = EventFactory::new(); - }), - None, - ) - .await; + let beacon_info_event = f + .beacon_info( + Some("Live Share".to_owned()), + Duration::from_millis(3000), + false, + Some(MilliSecondsSinceUnixEpoch(uint!(1_636_829_458))), + ) + .event_id(event_id!("$15139375514XsgmR:localhost")) + .sender(user_id!("@example2:localhost")) + .state_key(user_id!("@example2:localhost")) + .into_raw_sync_state(); - let sync_settings = SyncSettings::new().timeout(Duration::from_millis(3000)); - - client.sync_once(sync_settings).await.unwrap(); + server + .sync_room( + &client, + JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_state_event(beacon_info_event), + ) + .await; let room = client.get_room(&DEFAULT_TEST_ROOM_ID).unwrap(); @@ -173,69 +114,39 @@ async fn test_send_location_beacon_with_expired_live_share() { #[async_test] async fn test_most_recent_event_in_stream() { - let (client, server) = logged_in_client_with_server().await; - - let mut sync_builder = SyncResponseBuilder::new(); + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; let current_time = MilliSecondsSinceUnixEpoch::now(); - let millis_time = current_time - .to_system_time() - .unwrap() - .duration_since(UNIX_EPOCH) - .expect("Time went backwards") - .as_millis() as u64; + let f = EventFactory::new(); - mock_sync( - &server, - json!({ - "next_batch": "s526_47314_0_7_1_1_1_1_1", - "rooms": { - "join": { - *DEFAULT_TEST_ROOM_ID: { - "state": { - "events": [ - { - "content": { - "description": "Live Share", - "live": true, - "org.matrix.msc3488.ts": millis_time, - "timeout": 3000, - "org.matrix.msc3488.asset": { "type": "m.self" } - }, - "event_id": "$15139375514XsgmR:localhost", - "origin_server_ts": millis_time, - "sender": "@example2:localhost", - "state_key": "@example2:localhost", - "type": "org.matrix.msc3672.beacon_info", - "unsigned": { - "age": 7034220 - } - }, - ] - } - } - } - } + let beacon_info_event = f + .beacon_info( + Some("Live Share".to_owned()), + Duration::from_millis(3_600_000), + true, + Some(current_time), + ) + .event_id(event_id!("$15139375514XsgmR:localhost")) + .sender(user_id!("@example2:localhost")) + .state_key(user_id!("@example2:localhost")) + .into_raw_sync_state(); - }), - None, - ) - .await; - let sync_settings = - SyncSettings::new().timeout(Duration::from_millis(3000)).token(SyncToken::NoToken); - let _response = client.sync_once(sync_settings.clone()).await.unwrap(); - server.reset().await; + server + .sync_room( + &client, + JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_state_event(beacon_info_event), + ) + .await; + + // Enable the event cache so the initial snapshot can be loaded from it. + client.event_cache().subscribe().unwrap(); let room = client.get_room(*DEFAULT_TEST_ROOM_ID).unwrap(); - let observable_live_location_shares = room.observe_live_location_shares(); - let stream = observable_live_location_shares.subscribe(); - pin_mut!(stream); - let mut timeline_events = Vec::new(); - let f = EventFactory::new(); - for nth in 0..25 { + for nth in 0..10 { let event_id = format!("$event_for_stream_{nth}"); timeline_events.push( f.beacon( @@ -253,175 +164,467 @@ async fn test_most_recent_event_in_stream() { ); } - sync_builder.add_joined_room( - JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_timeline_bulk(timeline_events), - ); + let mut event_cache_updates_stream = client.event_cache().subscribe_to_room_generic_updates(); - mock_sync(&server, sync_builder.build_json_sync_response(), None).await; - let _response = client.sync_once(sync_settings.clone()).await.unwrap(); - server.reset().await; + server + .sync_room( + &client, + JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_timeline_bulk(timeline_events), + ) + .await; - // Stream should only process the latest beacon event for the user, ignoring any - // previous events. - let LiveLocationShare { user_id, last_location, beacon_info } = - stream.next().await.expect("Another live location was expected"); + // Wait for the event cache background task to commit the events before + // querying. + assert_let_timeout!(Ok(_) = event_cache_updates_stream.recv()); + + // Create the stream after syncing all beacon events — the initial snapshot is + // loaded from the event cache and already reflects the latest beacon. + let live_location_shares = room.live_location_shares().await; + let (mut shares, _stream) = live_location_shares.subscribe(); + + assert_eq!(shares.len(), 1); + let LiveLocationShare { user_id, last_location, beacon_info, .. } = shares.remove(0); assert_eq!(user_id.to_string(), "@example2:localhost"); + let last_location = last_location.expect("Expected last location"); assert_eq!( last_location.location.uri, - format!("geo:{},{};u=24", 24.9575274619722, 12.494122581370175) + format!("geo:{},{};u=9", 9.9575274619722, 12.494122581370175) ); assert!(last_location.location.description.is_none()); assert!(last_location.location.zoom_level.is_none()); assert_eq!(last_location.ts, MilliSecondsSinceUnixEpoch(uint!(1_636_829_458))); - let beacon_info = beacon_info.expect("Live location share is missing the beacon_info"); - assert!(beacon_info.live); assert!(beacon_info.is_live()); assert_eq!(beacon_info.description, Some("Live Share".to_owned())); - assert_eq!(beacon_info.timeout, Duration::from_millis(3000)); + assert_eq!(beacon_info.timeout, Duration::from_millis(3_600_000)); assert_eq!(beacon_info.ts, current_time); assert_eq!(beacon_info.asset.type_, AssetType::Self_); } #[async_test] async fn test_observe_single_live_location_share() { - let (client, server) = logged_in_client_with_server().await; + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; let current_time = MilliSecondsSinceUnixEpoch::now(); - let millis_time = current_time - .to_system_time() - .unwrap() - .duration_since(UNIX_EPOCH) - .expect("Time went backwards") - .as_millis() as u64; + let f = EventFactory::new(); - mock_sync( - &server, - json!({ - "next_batch": "s526_47314_0_7_1_1_1_1_1", - "rooms": { - "join": { - *DEFAULT_TEST_ROOM_ID: { - "state": { - "events": [ - { - "content": { - "description": "Test Live Share", - "live": true, - "org.matrix.msc3488.ts": millis_time, - "timeout": 3000, - "org.matrix.msc3488.asset": { "type": "m.self" } - }, - "event_id": "$test_beacon_info", - "origin_server_ts": millis_time, - "sender": "@example2:localhost", - "state_key": "@example2:localhost", - "type": "org.matrix.msc3672.beacon_info", - } - ] - } - } - } - } - }), - None, - ) - .await; + let beacon_info_event = f + .beacon_info( + Some("Test Live Share".to_owned()), + Duration::from_millis(3_600_000), + true, + Some(current_time), + ) + .event_id(event_id!("$test_beacon_info")) + .sender(user_id!("@example2:localhost")) + .state_key(user_id!("@example2:localhost")) + .into_raw_sync_state(); - let sync_settings = - SyncSettings::new().timeout(Duration::from_millis(3000)).token(SyncToken::NoToken); - let _response = client.sync_once(sync_settings.clone()).await.unwrap(); - server.reset().await; + server + .sync_room( + &client, + JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_state_event(beacon_info_event), + ) + .await; let room = client.get_room(*DEFAULT_TEST_ROOM_ID).unwrap(); - let observable_live_location_shares = room.observe_live_location_shares(); - let stream = observable_live_location_shares.subscribe(); + let live_location_shares = room.live_location_shares().await; + let (initial, stream) = live_location_shares.subscribe(); pin_mut!(stream); - let timeline_event = EventFactory::new() - .beacon( - owned_event_id!("$test_beacon_info"), - 10.000000, - 20.000000, - 5, - Some(MilliSecondsSinceUnixEpoch(1_636_829_458u32.into())), - ) + // Initial snapshot contains the beacon_info from state (no last_location yet). + assert_eq!(initial.len(), 1); + assert!(initial[0].last_location.is_none()); + + let timeline_event = f + .beacon(owned_event_id!("$test_beacon_info"), 10.0, 20.0, 5, Some(current_time)) .event_id(event_id!("$location_event")) - .server_ts(millis_time) + .server_ts(current_time) .sender(user_id!("@example2:localhost")) .into_raw_sync(); - mock_sync( - &server, - SyncResponseBuilder::new() - .add_joined_room( - JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_timeline_event(timeline_event), - ) - .build_json_sync_response(), - None, - ) - .await; + server + .sync_room( + &client, + JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_timeline_event(timeline_event), + ) + .await; - let _response = client.sync_once(sync_settings.clone()).await.unwrap(); - server.reset().await; - - let LiveLocationShare { user_id, last_location, beacon_info } = - stream.next().await.expect("Another live location was expected"); + let diffs = stream.next().await.expect("Expected live location share update"); + let mut shares = diffs.into_iter().fold(initial, |mut v, diff| { + diff.apply(&mut v); + v + }); + // Beacon handler updates the existing share with last_location + assert_eq!(shares.len(), 1); + let LiveLocationShare { user_id, last_location, beacon_info, .. } = shares.remove(0); assert_eq!(user_id.to_string(), "@example2:localhost"); - assert_eq!(last_location.location.uri, format!("geo:{},{};u=5", 10.000000, 20.000000)); + let last_location = last_location.expect("Expected last location"); + assert_eq!(last_location.location.uri, "geo:10,20;u=5"); assert_eq!(last_location.ts, current_time); - let beacon_info = beacon_info.expect("Live location share is missing the beacon_info"); - assert!(beacon_info.live); assert!(beacon_info.is_live()); assert_eq!(beacon_info.description, Some("Test Live Share".to_owned())); - assert_eq!(beacon_info.timeout, Duration::from_millis(3000)); + assert_eq!(beacon_info.timeout, Duration::from_millis(3_600_000)); assert_eq!(beacon_info.ts, current_time); } #[async_test] -async fn test_observing_live_location_does_not_return_own_beacon_updates() { +async fn test_observing_live_location_does_not_return_non_live() { let server = MatrixMockServer::new().await; let client = server.client_builder().build().await; - let room_id = room_id!("!a:b.c"); - let event_id = event_id!("$a:b.c"); - let user_id = user_id!("@example:localhost"); - let f = EventFactory::new().room(room_id); + let f = EventFactory::new(); - let joined_room_builder = JoinedRoomBuilder::new(room_id).add_state_bulk(vec![ - f.event(BeaconInfoEventContent::new(None, Duration::from_secs(60), false, None)) - .event_id(event_id) - .sender(user_id) - .state_key(user_id) - .into(), - ]); + // The user's beacon_info is NOT live. + let beacon_info_event = f + .beacon_info(None, Duration::from_millis(60_000), false, None) + .event_id(event_id!("$15139375514XsgmR:localhost")) + .sender(user_id!("@example:localhost")) + .state_key(user_id!("@example:localhost")) + .into_raw_sync_state(); - let room = server.sync_room(&client, joined_room_builder).await; + server + .sync_room( + &client, + JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_state_event(beacon_info_event), + ) + .await; - let observable_live_location_shares = room.observe_live_location_shares(); - let stream = observable_live_location_shares.subscribe(); + let room = client.get_room(*DEFAULT_TEST_ROOM_ID).unwrap(); + let live_location_shares = room.live_location_shares().await; + let (initial, stream) = live_location_shares.subscribe(); pin_mut!(stream); + // Initial is empty because beacon_info is not live. + assert!(initial.is_empty()); + + // A beacon event arrives, but beacon_info is not live — should not yield. let beacon_event = f - .event(BeaconEventContent::new( - owned_event_id!("$15139375514XsgmR:localhost"), - "geo:51.5008,0.1247;u=35".to_owned(), - Some(MilliSecondsSinceUnixEpoch(uint!(1_636_829_458))), - )) + .beacon(owned_event_id!("$15139375514XsgmR:localhost"), 51.5008, 0.1247, 35, None) .event_id(event_id!("$152037dfsef280074GZeOm:localhost")) - .sender(user_id) + .sender(user_id!("@example:localhost")) .into_raw_sync(); - let joined = JoinedRoomBuilder::new(room_id).add_timeline_event(beacon_event); - - let _ = server.sync_room(&client, joined).await; + server + .sync_room( + &client, + JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_timeline_event(beacon_event), + ) + .await; assert!(stream.next().now_or_never().is_none()); } + +#[async_test] +async fn test_location_update_for_already_tracked_user() { + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; + + let now = MilliSecondsSinceUnixEpoch::now(); + let f = EventFactory::new(); + + // Set up alice with a live beacon_info. + let beacon_info_event = f + .beacon_info( + Some("Alice location".to_owned()), + Duration::from_millis(300_000), + true, + Some(now), + ) + .event_id(event_id!("$alice_beacon_info")) + .sender(user_id!("@alice:localhost")) + .state_key(user_id!("@alice:localhost")) + .into_raw_sync_state(); + + server + .sync_room( + &client, + JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_state_event(beacon_info_event), + ) + .await; + + let room = client.get_room(*DEFAULT_TEST_ROOM_ID).unwrap(); + let live_location_shares = room.live_location_shares().await; + let (initial, stream) = live_location_shares.subscribe(); + pin_mut!(stream); + + // Initial snapshot contains the beacon_info from state (no last_location yet). + assert_eq!(initial.len(), 1); + assert!(initial[0].last_location.is_none()); + + // Alice's first beacon — updates the existing share with last_location. + server + .sync_room( + &client, + JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_timeline_event( + f.beacon(owned_event_id!("$alice_beacon_info"), 10.0, 20.0, 5, None) + .event_id(event_id!("$loc1")) + .sender(user_id!("@alice:localhost")) + .server_ts(now) + .into_raw_sync(), + ), + ) + .await; + + let diffs = stream.next().await.expect("Expected first location"); + let shares = diffs.into_iter().fold(initial, |mut v, diff| { + diff.apply(&mut v); + v + }); + assert_eq!(shares.len(), 1); + let share = &shares[0]; + assert_eq!(share.user_id, user_id!("@alice:localhost")); + let last_location = share.last_location.as_ref().expect("Expected last location"); + assert_eq!(last_location.location.uri, "geo:10,20;u=5"); + assert_eq!(share.beacon_info.description, Some("Alice location".to_owned())); + + // Alice's second beacon — already tracked, beacon_info is reused from cache. + server + .sync_room( + &client, + JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_timeline_event( + f.beacon(owned_event_id!("$alice_beacon_info"), 30.0, 40.0, 10, None) + .event_id(event_id!("$loc2")) + .sender(user_id!("@alice:localhost")) + .server_ts(now) + .into_raw_sync(), + ), + ) + .await; + + let diffs = stream.next().await.expect("Expected second location update"); + let shares = diffs.into_iter().fold(shares, |mut v, diff| { + diff.apply(&mut v); + v + }); + assert_eq!(shares.len(), 1); + let LiveLocationShare { user_id, last_location, beacon_info, .. } = shares[0].clone(); + assert_eq!(user_id, user_id!("@alice:localhost")); + let last_location = last_location.expect("Expected last location"); + assert_eq!(last_location.location.uri, "geo:30,40;u=10"); + // beacon_info is preserved from the initial share — not re-fetched from state. + assert_eq!(beacon_info.description, Some("Alice location".to_owned())); +} + +#[async_test] +async fn test_beacon_info_stop_removes_user_from_stream() { + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; + + let now = MilliSecondsSinceUnixEpoch::now(); + let f = EventFactory::new(); + + // Alice starts with a live beacon_info. + let beacon_info_event = f + .beacon_info(None, Duration::from_millis(300_000), true, Some(now)) + .event_id(event_id!("$alice_beacon_info")) + .sender(user_id!("@alice:localhost")) + .state_key(user_id!("@alice:localhost")) + .into_raw_sync_state(); + + server + .sync_room( + &client, + JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_state_event(beacon_info_event), + ) + .await; + + let room = client.get_room(*DEFAULT_TEST_ROOM_ID).unwrap(); + let live_location_shares = room.live_location_shares().await; + let (initial, stream) = live_location_shares.subscribe(); + pin_mut!(stream); + + // Initial snapshot contains the beacon_info from state (no last_location yet). + assert_eq!(initial.len(), 1); + assert!(initial[0].last_location.is_none()); + + // Alice stops her share — beacon_info timeline event with live: false. + server + .sync_room( + &client, + JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_timeline_event( + f.beacon_info(None, Duration::from_secs(300), false, None) + .event_id(event_id!("$alice_beacon_info_stop")) + .sender(user_id!("@alice:localhost")) + .state_key(user_id!("@alice:localhost")) + .into_raw_sync(), + ), + ) + .await; + + // Stream emits an update with an empty list — alice is removed. + let diffs = stream.next().await.expect("Expected share removal"); + let shares = diffs.into_iter().fold(initial, |mut v, diff| { + diff.apply(&mut v); + v + }); + assert!(shares.is_empty()); +} + +#[async_test] +async fn test_multiple_users_in_stream() { + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; + + let now = MilliSecondsSinceUnixEpoch::now(); + let f = EventFactory::new(); + + // Both alice and bob have live beacon_infos. + let alice_beacon_info = f + .beacon_info(None, Duration::from_millis(300_000), true, Some(now)) + .event_id(event_id!("$alice_beacon_info")) + .sender(user_id!("@alice:localhost")) + .state_key(user_id!("@alice:localhost")) + .into_raw_sync_state(); + + let bob_beacon_info = f + .beacon_info(None, Duration::from_millis(300_000), true, Some(now)) + .event_id(event_id!("$bob_beacon_info")) + .sender(user_id!("@bob:localhost")) + .state_key(user_id!("@bob:localhost")) + .into_raw_sync_state(); + + server + .sync_room( + &client, + JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID) + .add_state_event(alice_beacon_info) + .add_state_event(bob_beacon_info), + ) + .await; + + let room = client.get_room(*DEFAULT_TEST_ROOM_ID).unwrap(); + let live_location_shares = room.live_location_shares().await; + let (initial, stream) = live_location_shares.subscribe(); + pin_mut!(stream); + + // Initial snapshot contains both alice and bob beacon_infos from state. + assert_eq!(initial.len(), 2); + + // Alice's beacon arrives — updates her existing share with last_location. + server + .sync_room( + &client, + JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_timeline_event( + f.beacon(owned_event_id!("$alice_beacon_info"), 10.0, 20.0, 5, None) + .event_id(event_id!("$alice_loc")) + .sender(user_id!("@alice:localhost")) + .server_ts(now) + .into_raw_sync(), + ), + ) + .await; + + let diffs = stream.next().await.expect("Expected alice's location"); + let shares = diffs.into_iter().fold(initial, |mut v, diff| { + diff.apply(&mut v); + v + }); + assert_eq!(shares.len(), 2); + + // Bob's beacon arrives — updates his existing share with last_location. + server + .sync_room( + &client, + JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID).add_timeline_event( + f.beacon(owned_event_id!("$bob_beacon_info"), 50.0, 60.0, 8, None) + .event_id(event_id!("$bob_loc")) + .sender(user_id!("@bob:localhost")) + .server_ts(now) + .into_raw_sync(), + ), + ) + .await; + + let diffs = stream.next().await.expect("Expected both users"); + let shares = diffs.into_iter().fold(shares, |mut v, diff| { + diff.apply(&mut v); + v + }); + assert_eq!(shares.len(), 2); + let mut shares: Vec<_> = shares.into_iter().collect(); + shares.sort_by_key(|s| s.user_id.clone()); + + assert_eq!(shares[0].user_id, user_id!("@alice:localhost")); + assert_eq!( + shares[0].last_location.as_ref().expect("Expected last location").location.uri, + "geo:10,20;u=5" + ); + + assert_eq!(shares[1].user_id, user_id!("@bob:localhost")); + assert_eq!( + shares[1].last_location.as_ref().expect("Expected last location").location.uri, + "geo:50,60;u=8" + ); +} + +#[async_test] +async fn test_initial_load_contains_location_from_event_cache() { + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; + + // Enable event cache BEFORE syncing so events get cached. + client.event_cache().subscribe().unwrap(); + + let now = MilliSecondsSinceUnixEpoch::now(); + let f = EventFactory::new(); + + // Alice has a live beacon_info. + let beacon_info_event = f + .beacon_info( + Some("Alice location".to_owned()), + Duration::from_millis(300_000), + true, + Some(now), + ) + .event_id(event_id!("$alice_beacon_info")) + .sender(user_id!("@alice:localhost")) + .state_key(user_id!("@alice:localhost")) + .into_raw_sync_state(); + + // A beacon event in the timeline. + let beacon_event = f + .beacon(owned_event_id!("$alice_beacon_info"), 48.8566, 2.3522, 10, Some(now)) + .event_id(event_id!("$alice_location")) + .sender(user_id!("@alice:localhost")) + .server_ts(now) + .into_raw_sync(); + + let mut event_cache_updates_stream = client.event_cache().subscribe_to_room_generic_updates(); + + server + .sync_room( + &client, + JoinedRoomBuilder::new(*DEFAULT_TEST_ROOM_ID) + .add_state_event(beacon_info_event) + .add_timeline_event(beacon_event), + ) + .await; + + // Wait for the event cache background task to commit the events before + // querying. + assert_let_timeout!(Ok(_) = event_cache_updates_stream.recv()); + + let room = client.get_room(*DEFAULT_TEST_ROOM_ID).unwrap(); + let live_location_shares = room.live_location_shares().await; + let (initial, _stream) = live_location_shares.subscribe(); + + // Initial snapshot should contain both beacon_info AND last_location. + assert_eq!(initial.len(), 1); + let share = &initial[0]; + assert_eq!(share.user_id, user_id!("@alice:localhost")); + assert_eq!(share.beacon_info.description, Some("Alice location".to_owned())); + + let last_location = + share.last_location.as_ref().expect("Expected last_location in initial load"); + assert_eq!(last_location.location.uri, "geo:48.8566,2.3522;u=10"); + assert_eq!(last_location.ts, now); +}