ffi: call the initial events in the same task that listens to updates

This avoids a race condition where the caller hasn't set up the initial
items or the listener, and the listener is called *before* the initial
items have been used.
This commit is contained in:
Benjamin Bouvier
2024-06-11 18:36:36 +02:00
parent 871116eee9
commit 2a79956caa

View File

@@ -133,24 +133,28 @@ impl Timeline {
#[uniffi::export(async_runtime = "tokio")]
impl Timeline {
pub async fn add_listener(
&self,
listener: Box<dyn TimelineListener>,
) -> RoomTimelineListenerResult {
pub async fn add_listener(&self, listener: Box<dyn TimelineListener>) -> Arc<TaskHandle> {
let (timeline_items, timeline_stream) = self.inner.subscribe_batched().await;
let timeline_stream = TaskHandle::new(RUNTIME.spawn(async move {
Arc::new(TaskHandle::new(RUNTIME.spawn(async move {
pin_mut!(timeline_stream);
// It's important that the initial items are passed *before* we forward the
// stream updates, with a guaranteed ordering. Otherwise, it could
// be that the listener be called before the initial items have been
// handled by the caller. See #3535 for details.
// First, pass all the items as a reset update.
listener.on_update(vec![Arc::new(TimelineDiff::new(VectorDiff::Reset {
values: timeline_items,
}))]);
// Then forward new items.
while let Some(diffs) = timeline_stream.next().await {
listener
.on_update(diffs.into_iter().map(|d| Arc::new(TimelineDiff::new(d))).collect());
}
}));
RoomTimelineListenerResult {
items: timeline_items.into_iter().map(TimelineItem::from_arc).collect(),
items_stream: Arc::new(timeline_stream),
}
})))
}
pub fn retry_decryption(self: Arc<Self>, session_ids: Vec<String>) {
@@ -671,12 +675,6 @@ pub enum FocusEventError {
Other { msg: String },
}
#[derive(uniffi::Record)]
pub struct RoomTimelineListenerResult {
pub items: Vec<Arc<TimelineItem>>,
pub items_stream: Arc<TaskHandle>,
}
#[uniffi::export(callback_interface)]
pub trait TimelineListener: Sync + Send {
fn on_update(&self, diff: Vec<Arc<TimelineDiff>>);