Revert "feat(ffi): add subscribe_and_add_timeline_listener helper fn"

This reverts commit ba4e304cad.
This commit is contained in:
Damir Jelić
2023-01-13 14:39:37 +01:00
parent ece9252a3d
commit 386cbce344
2 changed files with 23 additions and 59 deletions

View File

@@ -116,7 +116,6 @@ interface SlidingSyncView {
};
interface SlidingSyncRoom {
StoppableSpawn? subscribe_and_add_timeline_listener(TimelineListener listener, RoomSubscription? settings);
StoppableSpawn? add_timeline_listener(TimelineListener listener);
};

View File

@@ -26,24 +26,17 @@ use crate::{
TimelineListener,
};
type StoppableSpawnCallback = Box<dyn FnOnce() + Send + Sync>;
pub struct StoppableSpawn {
handle: Arc<RwLock<Option<JoinHandle<()>>>>,
callback: Arc<RwLock<Option<StoppableSpawnCallback>>>,
}
impl StoppableSpawn {
fn with_handle(handle: JoinHandle<()>) -> StoppableSpawn {
StoppableSpawn { handle: Arc::new(RwLock::new(Some(handle))), callback: Default::default() }
StoppableSpawn { handle: Arc::new(RwLock::new(Some(handle))) }
}
fn with_handle_ref(handle: Arc<RwLock<Option<JoinHandle<()>>>>) -> StoppableSpawn {
StoppableSpawn { handle, callback: Default::default() }
}
fn set_callback(&mut self, f: StoppableSpawnCallback) {
*self.callback.write().unwrap() = Some(f)
StoppableSpawn { handle }
}
}
@@ -53,21 +46,12 @@ impl StoppableSpawn {
if let Some(handle) = self.handle.write().unwrap().take() {
handle.abort();
}
if let Some(callback) = self.callback.write().unwrap().take() {
callback();
}
}
pub fn is_cancelled(&self) -> bool {
self.handle.read().unwrap().is_none()
}
}
impl Drop for StoppableSpawn {
fn drop(&mut self) {
self.cancel();
}
}
#[derive(uniffi::Object)]
pub struct UnreadNotificationsCount {
highlight_count: u32,
@@ -105,7 +89,6 @@ impl From<RumaUnreadNotificationsCount> for UnreadNotificationsCount {
pub struct SlidingSyncRoom {
inner: matrix_sdk::SlidingSyncRoom,
timeline: TimelineLock,
runner: matrix_sdk::SlidingSync,
client: Client,
}
@@ -156,26 +139,6 @@ impl SlidingSyncRoom {
&self,
listener: Box<dyn TimelineListener>,
) -> Option<Arc<StoppableSpawn>> {
Some(Arc::new(self.add_timeline_listener_inner(listener)?))
}
pub fn subscribe_and_add_timeline_listener(
&self,
listener: Box<dyn TimelineListener>,
settings: Option<RoomSubscription>,
) -> Option<Arc<StoppableSpawn>> {
let mut spawner = self.add_timeline_listener_inner(listener)?;
let room_id = self.inner.room_id().clone();
self.runner.subscribe(room_id.clone(), settings.map(Into::into));
let runner = self.runner.clone();
spawner.set_callback(Box::new(move || runner.unsubscribe(room_id)));
Some(Arc::new(spawner))
}
fn add_timeline_listener_inner(
&self,
listener: Box<dyn TimelineListener>,
) -> Option<StoppableSpawn> {
let mut timeline_lock = self.timeline.write().unwrap();
let timeline_signal = match &*timeline_lock {
Some(timeline) => timeline.signal(),
@@ -192,17 +155,19 @@ impl SlidingSyncRoom {
};
let listener: Arc<dyn TimelineListener> = listener.into();
Some(StoppableSpawn::with_handle(RUNTIME.spawn(timeline_signal.for_each(move |diff| {
let listener = listener.clone();
let fut = RUNTIME
.spawn_blocking(move || listener.on_update(Arc::new(TimelineDiff::new(diff))));
Some(Arc::new(StoppableSpawn::with_handle(RUNTIME.spawn(timeline_signal.for_each(
move |diff| {
let listener = listener.clone();
let fut = RUNTIME
.spawn_blocking(move || listener.on_update(Arc::new(TimelineDiff::new(diff))));
async move {
if let Err(e) = fut.await {
error!("Timeline listener error: {e}");
async move {
if let Err(e) = fut.await {
error!("Timeline listener error: {e}");
}
}
}
}))))
},
)))))
}
}
@@ -223,14 +188,15 @@ pub struct RoomSubscription {
pub timeline_limit: Option<u32>,
}
impl From<RoomSubscription> for RumaRoomSubscription {
fn from(val: RoomSubscription) -> Self {
assign!(RumaRoomSubscription::default(), {
required_state: val.required_state.map(|r|
impl TryInto<RumaRoomSubscription> for RoomSubscription {
type Error = anyhow::Error;
fn try_into(self) -> anyhow::Result<RumaRoomSubscription> {
Ok(assign!(RumaRoomSubscription::default(), {
required_state: self.required_state.map(|r|
r.into_iter().map(|s| (s.key.into(), s.value)).collect()
).unwrap_or_default(),
timeline_limit: val.timeline_limit.map(|u| u.into())
})
timeline_limit: self.timeline_limit.map(|u| u.into())
}))
}
}
@@ -586,7 +552,9 @@ impl SlidingSync {
room_id: String,
settings: Option<RoomSubscription>,
) -> anyhow::Result<()> {
self.inner.subscribe(room_id.try_into()?, settings.map(Into::into));
let settings =
if let Some(settings) = settings { Some(settings.try_into()?) } else { None };
self.inner.subscribe(room_id.try_into()?, settings);
Ok(())
}
@@ -596,11 +564,9 @@ impl SlidingSync {
}
pub fn get_room(&self, room_id: String) -> anyhow::Result<Option<Arc<SlidingSyncRoom>>> {
let runner = self.inner.clone();
Ok(self.inner.get_room(OwnedRoomId::try_from(room_id)?).map(|inner| {
Arc::new(SlidingSyncRoom {
inner,
runner,
client: self.client.clone(),
timeline: Default::default(),
})
@@ -623,7 +589,6 @@ impl SlidingSync {
o.map(|inner| {
Arc::new(SlidingSyncRoom {
inner,
runner: self.inner.clone(),
client: self.client.clone(),
timeline: Default::default(),
})