From 839d43ddf189618b108123e35f8cf1294d53ebac Mon Sep 17 00:00:00 2001 From: James Rich Date: Wed, 6 May 2026 14:24:17 -0500 Subject: [PATCH] Refactor SdkStateBridge bridges Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../core/data/radio/SdkEventBridge.kt | 60 +++++ .../core/data/radio/SdkNodeBridge.kt | 99 ++++++++ .../core/data/radio/SdkPacketBridge.kt | 118 +++++++++ .../core/data/radio/SdkStateBridge.kt | 227 +++--------------- .../core/data/radio/SdkTopologyBridge.kt | 51 ++++ 5 files changed, 356 insertions(+), 199 deletions(-) create mode 100644 core/data/src/commonMain/kotlin/org/meshtastic/core/data/radio/SdkEventBridge.kt create mode 100644 core/data/src/commonMain/kotlin/org/meshtastic/core/data/radio/SdkNodeBridge.kt create mode 100644 core/data/src/commonMain/kotlin/org/meshtastic/core/data/radio/SdkPacketBridge.kt create mode 100644 core/data/src/commonMain/kotlin/org/meshtastic/core/data/radio/SdkTopologyBridge.kt diff --git a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/radio/SdkEventBridge.kt b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/radio/SdkEventBridge.kt new file mode 100644 index 000000000..3c4b1c2f9 --- /dev/null +++ b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/radio/SdkEventBridge.kt @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2026 Meshtastic LLC + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.meshtastic.core.data.radio + +import co.touchlab.kermit.Logger +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.flow.emptyFlow +import kotlinx.coroutines.flow.flatMapLatest +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import org.meshtastic.core.repository.ServiceRepository +import org.meshtastic.proto.ClientNotification +import org.meshtastic.sdk.MeshEvent + +internal class SdkEventBridge( + private val serviceRepository: ServiceRepository, +) { + fun observe(accessor: RadioClientAccessor, scope: CoroutineScope) { + accessor.client + .flatMapLatest { client -> client?.events ?: emptyFlow() } + .onEach(::handleEvent) + .launchIn(scope) + } + + private fun handleEvent(event: MeshEvent) { + when (event) { + is MeshEvent.DeviceRebooted -> { + Logger.i { "[SdkBridge] Device rebooted" } + serviceRepository.setClientNotification( + ClientNotification(message = "Device rebooted"), + ) + } + + is MeshEvent.CongestionWarning -> { + Logger.w { + "[SdkBridge] Congestion warning: level=${event.metrics.level}, airUtil=${event.metrics.airUtilTx}%, channelUtil=${event.metrics.channelUtil}%" + } + serviceRepository.setCongestionLevel(event.metrics.level) + } + + is MeshEvent.SecurityWarning -> Logger.w { "[SdkBridge] Security warning: $event" } + is MeshEvent.PacketsDropped -> Logger.w { "[SdkBridge] Packets dropped: ${event.count} from ${event.flow}" } + else -> Logger.d { "[SdkBridge] Event: $event" } + } + } +} diff --git a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/radio/SdkNodeBridge.kt b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/radio/SdkNodeBridge.kt new file mode 100644 index 000000000..106a02f7d --- /dev/null +++ b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/radio/SdkNodeBridge.kt @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2026 Meshtastic LLC + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.meshtastic.core.data.radio + +import co.touchlab.kermit.Logger +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.flow.emptyFlow +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.flatMapLatest +import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import org.meshtastic.core.common.util.nowSeconds +import org.meshtastic.core.model.DataPacket +import org.meshtastic.core.model.util.onlineTimeThreshold +import org.meshtastic.core.repository.NodeRepository +import org.meshtastic.proto.PortNum +import org.meshtastic.sdk.NodeChange + +internal class SdkNodeBridge( + private val nodeRepository: NodeRepository, + private val topologyService: MeshTopologyService, +) { + fun observe(accessor: RadioClientAccessor, scope: CoroutineScope) { + accessor.client + .flatMapLatest { client -> client?.nodes ?: emptyFlow() } + .onEach(::handleNodeChange) + .launchIn(scope) + + accessor.client + .flatMapLatest { client -> client?.ownNode ?: flowOf(null) } + .onEach { ownNode -> if (ownNode != null) nodeRepository.setMyNodeNum(ownNode.num) } + .launchIn(scope) + + accessor.client + .flatMapLatest { client -> client?.packets ?: emptyFlow() } + .filter { it.decoded?.portnum == PortNum.NODE_STATUS_APP } + .onEach { packet -> + val status = packet.decoded?.payload?.utf8() ?: return@onEach + nodeRepository.updateNode(packet.from) { it.copy(nodeStatus = status) } + } + .launchIn(scope) + } + + private suspend fun handleNodeChange(change: NodeChange) { + when (change) { + is NodeChange.Snapshot -> { + nodeRepository.clear() + topologyService.clear() + change.nodes.forEach { (_, nodeInfo) -> + nodeRepository.installNodeInfo(nodeInfo, withBroadcast = false) + } + nodeRepository.setNodeDbReady(true) + } + + is NodeChange.Added -> nodeRepository.installNodeInfo(change.node, withBroadcast = true) + is NodeChange.Updated -> nodeRepository.installNodeInfo(change.node, withBroadcast = true) + is NodeChange.Removed -> nodeRepository.removeByNodenum(change.nodeId.raw) + is NodeChange.WentOffline -> handleWentOffline(change) + is NodeChange.CameOnline -> handleCameOnline(change) + } + } + + private fun handleWentOffline(change: NodeChange.WentOffline) { + val nodeNum = change.nodeId.raw + Logger.d { + "[SdkBridge] Node ${DataPacket.nodeNumToDefaultId(nodeNum)} went offline (last heard: ${change.lastHeard})" + } + if (nodeRepository.nodeDBbyNodeNum.containsKey(nodeNum)) { + nodeRepository.updateNode(nodeNum) { node -> + node.copy(lastHeard = minOf(node.lastHeard, change.lastHeard, onlineTimeThreshold())) + } + } + } + + private fun handleCameOnline(change: NodeChange.CameOnline) { + val nodeNum = change.nodeId.raw + Logger.d { "[SdkBridge] Node ${DataPacket.nodeNumToDefaultId(nodeNum)} came online" } + if (nodeRepository.nodeDBbyNodeNum.containsKey(nodeNum)) { + nodeRepository.updateNode(nodeNum) { node -> + node.copy(lastHeard = maxOf(node.lastHeard, nowSeconds.toInt())) + } + } + } +} diff --git a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/radio/SdkPacketBridge.kt b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/radio/SdkPacketBridge.kt new file mode 100644 index 000000000..97e5b4aa7 --- /dev/null +++ b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/radio/SdkPacketBridge.kt @@ -0,0 +1,118 @@ +/* + * Copyright (c) 2026 Meshtastic LLC + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.meshtastic.core.data.radio + +import co.touchlab.kermit.Logger +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.flow.emptyFlow +import kotlinx.coroutines.flow.flatMapLatest +import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.onEach +import org.meshtastic.core.model.DataPacket +import org.meshtastic.core.model.MessageStatus +import org.meshtastic.core.repository.NodeRepository +import org.meshtastic.core.repository.PacketRepository +import org.meshtastic.core.repository.ServiceRepository +import org.meshtastic.sdk.StoreForwardEvent + +internal class SdkPacketBridge( + private val serviceRepository: ServiceRepository, + private val packetRepository: Lazy, + private val nodeRepository: NodeRepository, +) { + fun observe(accessor: RadioClientAccessor, scope: CoroutineScope) { + accessor.client + .flatMapLatest { client -> client?.packets ?: emptyFlow() } + .onEach { packet -> serviceRepository.emitMeshPacket(packet) } + .launchIn(scope) + + accessor.client + .flatMapLatest { client -> + client?.storeForward?.servers + ?.map { servers -> servers.map { it.raw } } + ?: flowOf(emptyList()) + } + .onEach { servers -> serviceRepository.setStoreForwardServers(servers) } + .launchIn(scope) + + accessor.client + .flatMapLatest { client -> client?.storeForward?.events ?: emptyFlow() } + .onEach(::handleStoreForwardEvent) + .launchIn(scope) + } + + private suspend fun handleStoreForwardEvent(event: StoreForwardEvent) { + when (event) { + is StoreForwardEvent.ServerDiscovered -> { + Logger.i { + "[SdkBridge] S&F server discovered: ${DataPacket.nodeNumToDefaultId(event.nodeId.raw)}" + } + } + + is StoreForwardEvent.ServerLost -> { + Logger.i { + "[SdkBridge] S&F server lost: ${DataPacket.nodeNumToDefaultId(event.nodeId.raw)}" + } + } + + is StoreForwardEvent.HistoryReplayStarted -> { + Logger.i { + "[SdkBridge] S&F history replay started from " + + "${DataPacket.nodeNumToDefaultId(event.server.raw)} count=${event.messageCount}" + } + } + + is StoreForwardEvent.HistoryReplayComplete -> { + Logger.i { + "[SdkBridge] S&F history replay complete from " + + "${DataPacket.nodeNumToDefaultId(event.server.raw)} delivered=${event.delivered}" + } + } + + is StoreForwardEvent.Heartbeat -> { + Logger.d { + "[SdkBridge] S&F heartbeat from ${DataPacket.nodeNumToDefaultId(event.server.raw)}" + } + } + + is StoreForwardEvent.SfppLinkProvided -> { + event.messageHash?.let { hash -> + val status = if (event.confirmed) MessageStatus.SFPP_CONFIRMED else MessageStatus.SFPP_ROUTING + packetRepository.value.updateSFPPStatus( + packetId = event.packetId, + from = event.from, + to = event.to, + hash = hash, + status = status, + rxTime = 0L, + myNodeNum = nodeRepository.myNodeNum.value ?: 0, + ) + } + } + + is StoreForwardEvent.SfppCanonAnnounced -> { + packetRepository.value.updateSFPPStatusByHash( + hash = event.messageHash, + status = MessageStatus.SFPP_CONFIRMED, + rxTime = event.rxTime, + ) + } + } + } +} diff --git a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/radio/SdkStateBridge.kt b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/radio/SdkStateBridge.kt index dde0c355c..a506394f5 100644 --- a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/radio/SdkStateBridge.kt +++ b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/radio/SdkStateBridge.kt @@ -20,28 +20,22 @@ import co.touchlab.kermit.Logger import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob -import kotlinx.coroutines.flow.emptyFlow -import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.flatMapLatest import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.launch import org.koin.core.annotation.Single -import org.meshtastic.core.common.util.nowSeconds import org.meshtastic.core.di.CoroutineDispatchers import org.meshtastic.core.model.ConnectionState as AppConnectionState import org.meshtastic.core.model.DataPacket import org.meshtastic.core.model.MessageStatus -import org.meshtastic.core.model.util.onlineTimeThreshold import org.meshtastic.core.model.service.ServiceAction import org.meshtastic.core.repository.MeshLocationManager import org.meshtastic.core.repository.NodeRepository import org.meshtastic.core.repository.PacketRepository import org.meshtastic.core.repository.ServiceRepository import org.meshtastic.core.repository.UiPrefs -import org.meshtastic.proto.ClientNotification import org.meshtastic.proto.DeviceMetrics import org.meshtastic.proto.PortNum import org.meshtastic.proto.Position @@ -49,11 +43,7 @@ import org.meshtastic.proto.User import org.meshtastic.sdk.AdminResult import org.meshtastic.sdk.ChannelIndex import org.meshtastic.sdk.ConnectionState as SdkConnectionState -import org.meshtastic.sdk.MeshEvent -import org.meshtastic.sdk.NeighborInfo -import org.meshtastic.sdk.NodeChange import org.meshtastic.sdk.NodeId -import org.meshtastic.sdk.StoreForwardEvent /** * Bridges SDK reactive flows into the repository layer and routes [ServiceAction]s @@ -62,215 +52,59 @@ import org.meshtastic.sdk.StoreForwardEvent * The SDK owns the transport and all state; this bridge maps SDK emissions into [ServiceRepository] * and [NodeRepository] so that existing feature-module UI code (which observes those repositories) * continues to work without modification. - * - * **Lifecycle:** Created as a Koin `@Single`. Automatically subscribes to [RadioClientAccessor.client] - * and starts/stops collection as clients come and go. */ @Single -@Suppress("TooManyFunctions") class SdkStateBridge( private val accessor: RadioClientAccessor, private val serviceRepository: ServiceRepository, private val nodeRepository: NodeRepository, private val packetRepository: Lazy, private val locationManager: MeshLocationManager, - private val topologyService: MeshTopologyService, + topologyService: MeshTopologyService, private val uiPrefs: UiPrefs, private val dispatchers: CoroutineDispatchers, ) { private val scope = CoroutineScope(SupervisorJob() + dispatchers.default) private var locationJob: Job? = null + private val nodeBridge = SdkNodeBridge(nodeRepository = nodeRepository, topologyService = topologyService) + private val packetBridge = SdkPacketBridge( + serviceRepository = serviceRepository, + packetRepository = packetRepository, + nodeRepository = nodeRepository, + ) + private val topologyBridge = SdkTopologyBridge(topologyService = topologyService) + private val eventBridge = SdkEventBridge(serviceRepository = serviceRepository) + init { - startBridge() + bind() } - private fun startBridge() { + private fun bind() { + bindConnectionState() + nodeBridge.observe(accessor, scope) + packetBridge.observe(accessor, scope) + topologyBridge.observe(accessor, scope) + eventBridge.observe(accessor, scope) + bindServiceActions() + bindLocationPublishing() + Logger.i { "SdkStateBridge started — SDK owns transport + ServiceAction dispatch" } + } - // ── Connection state ──────────────────────────────────────────────── + private fun bindConnectionState() { accessor.client .flatMapLatest { client -> client?.connection ?: flowOf(SdkConnectionState.Disconnected) } .onEach { sdkState -> serviceRepository.setConnectionState(mapConnectionState(sdkState)) } .launchIn(scope) + } - // ── Node updates (position, telemetry, user all included in NodeInfo) ─ - accessor.client - .flatMapLatest { client -> client?.nodes ?: flowOf() } - .onEach { change -> - when (change) { - is NodeChange.Snapshot -> { - nodeRepository.clear() - topologyService.clear() - change.nodes.forEach { (_, nodeInfo) -> - nodeRepository.installNodeInfo(nodeInfo, withBroadcast = false) - } - nodeRepository.setNodeDbReady(true) - } - is NodeChange.Added -> nodeRepository.installNodeInfo(change.node, withBroadcast = true) - is NodeChange.Updated -> nodeRepository.installNodeInfo(change.node, withBroadcast = true) - is NodeChange.Removed -> nodeRepository.removeByNodenum(change.nodeId.raw) - is NodeChange.WentOffline -> { - val nodeNum = change.nodeId.raw - Logger.d { - "[SdkBridge] Node ${DataPacket.nodeNumToDefaultId(nodeNum)} went offline (last heard: ${change.lastHeard})" - } - if (nodeRepository.nodeDBbyNodeNum.containsKey(nodeNum)) { - nodeRepository.updateNode(nodeNum) { node -> - node.copy(lastHeard = minOf(node.lastHeard, change.lastHeard, onlineTimeThreshold())) - } - } - } - is NodeChange.CameOnline -> { - val nodeNum = change.nodeId.raw - Logger.d { "[SdkBridge] Node ${DataPacket.nodeNumToDefaultId(nodeNum)} came online" } - if (nodeRepository.nodeDBbyNodeNum.containsKey(nodeNum)) { - nodeRepository.updateNode(nodeNum) { node -> - node.copy(lastHeard = maxOf(node.lastHeard, nowSeconds.toInt())) - } - } - } - } - } - .launchIn(scope) - - // ── Own node identity ─────────────────────────────────────────────── - accessor.client - .flatMapLatest { client -> client?.ownNode ?: flowOf(null) } - .onEach { ownNode -> if (ownNode != null) nodeRepository.setMyNodeNum(ownNode.num) } - .launchIn(scope) - - // ── Raw packet forward (for RadioConfigViewModel + TAK) ───────────── - accessor.client - .flatMapLatest { client -> client?.packets ?: flowOf() } - .onEach { packet -> serviceRepository.emitMeshPacket(packet) } - .launchIn(scope) - - // ── Topology: ingest NeighborInfo packets into MeshTopology graph ──── - accessor.client - .flatMapLatest { client -> client?.packets ?: flowOf() } - .filter { it.decoded?.portnum == PortNum.NEIGHBORINFO_APP } - .onEach { packet -> - val payload = packet.decoded?.payload?.toByteArray() ?: return@onEach - runCatching { - val proto = org.meshtastic.proto.NeighborInfo.ADAPTER.decode(payload) - val info = NeighborInfo.fromProto( - reportingNode = packet.from, - neighborNodeIds = proto.neighbors.map { it.node_id }, - snrValues = proto.neighbors.map { it.snr }, - timestamp = proto.last_sent_by_id, - ) - topologyService.ingestNeighborInfo(info) - }.onFailure { e -> Logger.w(e) { "[SdkBridge] Failed to parse NeighborInfo" } } - } - .launchIn(scope) - - // ── NODE_STATUS_APP: update node status string ─────────────────────── - accessor.client - .flatMapLatest { client -> client?.packets ?: flowOf() } - .filter { it.decoded?.portnum == PortNum.NODE_STATUS_APP } - .onEach { packet -> - val status = packet.decoded?.payload?.utf8() ?: return@onEach - nodeRepository.updateNode(packet.from) { it.copy(nodeStatus = status) } - } - .launchIn(scope) - - // ── Events (notifications, security, backpressure) ────────────────── - accessor.client - .flatMapLatest { client -> client?.events ?: flowOf() } - .onEach { event -> - when (event) { - is MeshEvent.DeviceRebooted -> { - Logger.i { "[SdkBridge] Device rebooted" } - serviceRepository.setClientNotification( - ClientNotification(message = "Device rebooted"), - ) - } - is MeshEvent.CongestionWarning -> { - Logger.w { - "[SdkBridge] Congestion warning: level=${event.metrics.level}, airUtil=${event.metrics.airUtilTx}%, channelUtil=${event.metrics.channelUtil}%" - } - serviceRepository.setCongestionLevel(event.metrics.level) - } - is MeshEvent.SecurityWarning -> Logger.w { "[SdkBridge] Security warning: $event" } - is MeshEvent.PacketsDropped -> Logger.w { "[SdkBridge] Packets dropped: ${event.count} from ${event.flow}" } - else -> Logger.d { "[SdkBridge] Event: $event" } - } - } - .launchIn(scope) - - // ── Store-and-Forward (server discovery + replay lifecycle) ─────────── - accessor.client - .flatMapLatest { client -> - client?.storeForward?.servers - ?.map { servers -> servers.map { it.raw } } - ?: flowOf(emptyList()) - } - .onEach { servers -> serviceRepository.setStoreForwardServers(servers) } - .launchIn(scope) - - accessor.client - .flatMapLatest { client -> client?.storeForward?.events ?: emptyFlow() } - .onEach { event -> - when (event) { - is StoreForwardEvent.ServerDiscovered -> { - Logger.i { - "[SdkBridge] S&F server discovered: ${DataPacket.nodeNumToDefaultId(event.nodeId.raw)}" - } - } - is StoreForwardEvent.ServerLost -> { - Logger.i { - "[SdkBridge] S&F server lost: ${DataPacket.nodeNumToDefaultId(event.nodeId.raw)}" - } - } - is StoreForwardEvent.HistoryReplayStarted -> { - Logger.i { - "[SdkBridge] S&F history replay started from " + - "${DataPacket.nodeNumToDefaultId(event.server.raw)} count=${event.messageCount}" - } - } - is StoreForwardEvent.HistoryReplayComplete -> { - Logger.i { - "[SdkBridge] S&F history replay complete from " + - "${DataPacket.nodeNumToDefaultId(event.server.raw)} delivered=${event.delivered}" - } - } - is StoreForwardEvent.Heartbeat -> { - Logger.d { - "[SdkBridge] S&F heartbeat from ${DataPacket.nodeNumToDefaultId(event.server.raw)}" - } - } - is StoreForwardEvent.SfppLinkProvided -> { - event.messageHash?.let { hash -> - val status = if (event.confirmed) MessageStatus.SFPP_CONFIRMED else MessageStatus.SFPP_ROUTING - packetRepository.value.updateSFPPStatus( - packetId = event.packetId, - from = event.from, - to = event.to, - hash = hash, - status = status, - rxTime = 0L, - myNodeNum = nodeRepository.myNodeNum.value ?: 0, - ) - } - } - is StoreForwardEvent.SfppCanonAnnounced -> { - packetRepository.value.updateSFPPStatusByHash( - hash = event.messageHash, - status = MessageStatus.SFPP_CONFIRMED, - rxTime = event.rxTime, - ) - } - else -> Logger.d { "[SdkBridge] S&F event: $event" } - } - } - .launchIn(scope) - - // ── ServiceAction routing (replaces MeshServiceOrchestrator dispatch) ─ + private fun bindServiceActions() { serviceRepository.serviceAction .onEach { action -> handleServiceAction(action) } .launchIn(scope) + } - // ── Location publishing ───────────────────────────────────────────── + private fun bindLocationPublishing() { accessor.client .flatMapLatest { client -> client?.ownNode ?: flowOf(null) } .onEach { ownNode -> @@ -282,9 +116,9 @@ class SdkStateBridge( if (shouldProvide) { locationManager.start(scope) { pos -> scope.launch { - val c = accessor.client.value ?: return@launch + val client = accessor.client.value ?: return@launch val posBytes = org.meshtastic.proto.Position.ADAPTER.encode(pos) - c.send( + client.send( portnum = PortNum.POSITION_APP, payload = posBytes, wantAck = false, @@ -299,12 +133,8 @@ class SdkStateBridge( } } .launchIn(scope) - - Logger.i { "SdkStateBridge started — SDK owns transport + ServiceAction dispatch" } } - // ── ServiceAction handling ─────────────────────────────────────────────── - private suspend fun handleServiceAction(action: ServiceAction) { val client = accessor.client.value if (client == null) { @@ -341,7 +171,6 @@ class SdkStateBridge( if (result.isSuccess) { nodeRepository.updateNode(node.num) { n -> if (newIgnored) { - // Mirror firmware behavior: wipe position, device_metrics, zero public_key n.copy( isIgnored = true, position = Position(), diff --git a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/radio/SdkTopologyBridge.kt b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/radio/SdkTopologyBridge.kt new file mode 100644 index 000000000..cfe312444 --- /dev/null +++ b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/radio/SdkTopologyBridge.kt @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2026 Meshtastic LLC + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.meshtastic.core.data.radio + +import co.touchlab.kermit.Logger +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.flow.emptyFlow +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.flatMapLatest +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import org.meshtastic.proto.PortNum +import org.meshtastic.sdk.NeighborInfo + +internal class SdkTopologyBridge( + private val topologyService: MeshTopologyService, +) { + fun observe(accessor: RadioClientAccessor, scope: CoroutineScope) { + accessor.client + .flatMapLatest { client -> client?.packets ?: emptyFlow() } + .filter { it.decoded?.portnum == PortNum.NEIGHBORINFO_APP } + .onEach { packet -> + val payload = packet.decoded?.payload?.toByteArray() ?: return@onEach + runCatching { + val proto = org.meshtastic.proto.NeighborInfo.ADAPTER.decode(payload) + val info = NeighborInfo.fromProto( + reportingNode = packet.from, + neighborNodeIds = proto.neighbors.map { it.node_id }, + snrValues = proto.neighbors.map { it.snr }, + timestamp = proto.last_sent_by_id, + ) + topologyService.ingestNeighborInfo(info) + }.onFailure { e -> Logger.w(e) { "[SdkBridge] Failed to parse NeighborInfo" } } + } + .launchIn(scope) + } +}