Refactor SdkStateBridge bridges

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
James Rich
2026-05-06 14:24:17 -05:00
parent 4340fc5045
commit 839d43ddf1
5 changed files with 356 additions and 199 deletions

View File

@@ -0,0 +1,60 @@
/*
* Copyright (c) 2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.data.radio
import co.touchlab.kermit.Logger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import org.meshtastic.core.repository.ServiceRepository
import org.meshtastic.proto.ClientNotification
import org.meshtastic.sdk.MeshEvent
internal class SdkEventBridge(
private val serviceRepository: ServiceRepository,
) {
fun observe(accessor: RadioClientAccessor, scope: CoroutineScope) {
accessor.client
.flatMapLatest { client -> client?.events ?: emptyFlow() }
.onEach(::handleEvent)
.launchIn(scope)
}
private fun handleEvent(event: MeshEvent) {
when (event) {
is MeshEvent.DeviceRebooted -> {
Logger.i { "[SdkBridge] Device rebooted" }
serviceRepository.setClientNotification(
ClientNotification(message = "Device rebooted"),
)
}
is MeshEvent.CongestionWarning -> {
Logger.w {
"[SdkBridge] Congestion warning: level=${event.metrics.level}, airUtil=${event.metrics.airUtilTx}%, channelUtil=${event.metrics.channelUtil}%"
}
serviceRepository.setCongestionLevel(event.metrics.level)
}
is MeshEvent.SecurityWarning -> Logger.w { "[SdkBridge] Security warning: $event" }
is MeshEvent.PacketsDropped -> Logger.w { "[SdkBridge] Packets dropped: ${event.count} from ${event.flow}" }
else -> Logger.d { "[SdkBridge] Event: $event" }
}
}
}

View File

@@ -0,0 +1,99 @@
/*
* Copyright (c) 2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.data.radio
import co.touchlab.kermit.Logger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import org.meshtastic.core.common.util.nowSeconds
import org.meshtastic.core.model.DataPacket
import org.meshtastic.core.model.util.onlineTimeThreshold
import org.meshtastic.core.repository.NodeRepository
import org.meshtastic.proto.PortNum
import org.meshtastic.sdk.NodeChange
internal class SdkNodeBridge(
private val nodeRepository: NodeRepository,
private val topologyService: MeshTopologyService,
) {
fun observe(accessor: RadioClientAccessor, scope: CoroutineScope) {
accessor.client
.flatMapLatest { client -> client?.nodes ?: emptyFlow() }
.onEach(::handleNodeChange)
.launchIn(scope)
accessor.client
.flatMapLatest { client -> client?.ownNode ?: flowOf(null) }
.onEach { ownNode -> if (ownNode != null) nodeRepository.setMyNodeNum(ownNode.num) }
.launchIn(scope)
accessor.client
.flatMapLatest { client -> client?.packets ?: emptyFlow() }
.filter { it.decoded?.portnum == PortNum.NODE_STATUS_APP }
.onEach { packet ->
val status = packet.decoded?.payload?.utf8() ?: return@onEach
nodeRepository.updateNode(packet.from) { it.copy(nodeStatus = status) }
}
.launchIn(scope)
}
private suspend fun handleNodeChange(change: NodeChange) {
when (change) {
is NodeChange.Snapshot -> {
nodeRepository.clear()
topologyService.clear()
change.nodes.forEach { (_, nodeInfo) ->
nodeRepository.installNodeInfo(nodeInfo, withBroadcast = false)
}
nodeRepository.setNodeDbReady(true)
}
is NodeChange.Added -> nodeRepository.installNodeInfo(change.node, withBroadcast = true)
is NodeChange.Updated -> nodeRepository.installNodeInfo(change.node, withBroadcast = true)
is NodeChange.Removed -> nodeRepository.removeByNodenum(change.nodeId.raw)
is NodeChange.WentOffline -> handleWentOffline(change)
is NodeChange.CameOnline -> handleCameOnline(change)
}
}
private fun handleWentOffline(change: NodeChange.WentOffline) {
val nodeNum = change.nodeId.raw
Logger.d {
"[SdkBridge] Node ${DataPacket.nodeNumToDefaultId(nodeNum)} went offline (last heard: ${change.lastHeard})"
}
if (nodeRepository.nodeDBbyNodeNum.containsKey(nodeNum)) {
nodeRepository.updateNode(nodeNum) { node ->
node.copy(lastHeard = minOf(node.lastHeard, change.lastHeard, onlineTimeThreshold()))
}
}
}
private fun handleCameOnline(change: NodeChange.CameOnline) {
val nodeNum = change.nodeId.raw
Logger.d { "[SdkBridge] Node ${DataPacket.nodeNumToDefaultId(nodeNum)} came online" }
if (nodeRepository.nodeDBbyNodeNum.containsKey(nodeNum)) {
nodeRepository.updateNode(nodeNum) { node ->
node.copy(lastHeard = maxOf(node.lastHeard, nowSeconds.toInt()))
}
}
}
}

View File

@@ -0,0 +1,118 @@
/*
* Copyright (c) 2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.data.radio
import co.touchlab.kermit.Logger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import org.meshtastic.core.model.DataPacket
import org.meshtastic.core.model.MessageStatus
import org.meshtastic.core.repository.NodeRepository
import org.meshtastic.core.repository.PacketRepository
import org.meshtastic.core.repository.ServiceRepository
import org.meshtastic.sdk.StoreForwardEvent
internal class SdkPacketBridge(
private val serviceRepository: ServiceRepository,
private val packetRepository: Lazy<PacketRepository>,
private val nodeRepository: NodeRepository,
) {
fun observe(accessor: RadioClientAccessor, scope: CoroutineScope) {
accessor.client
.flatMapLatest { client -> client?.packets ?: emptyFlow() }
.onEach { packet -> serviceRepository.emitMeshPacket(packet) }
.launchIn(scope)
accessor.client
.flatMapLatest { client ->
client?.storeForward?.servers
?.map { servers -> servers.map { it.raw } }
?: flowOf(emptyList())
}
.onEach { servers -> serviceRepository.setStoreForwardServers(servers) }
.launchIn(scope)
accessor.client
.flatMapLatest { client -> client?.storeForward?.events ?: emptyFlow() }
.onEach(::handleStoreForwardEvent)
.launchIn(scope)
}
private suspend fun handleStoreForwardEvent(event: StoreForwardEvent) {
when (event) {
is StoreForwardEvent.ServerDiscovered -> {
Logger.i {
"[SdkBridge] S&F server discovered: ${DataPacket.nodeNumToDefaultId(event.nodeId.raw)}"
}
}
is StoreForwardEvent.ServerLost -> {
Logger.i {
"[SdkBridge] S&F server lost: ${DataPacket.nodeNumToDefaultId(event.nodeId.raw)}"
}
}
is StoreForwardEvent.HistoryReplayStarted -> {
Logger.i {
"[SdkBridge] S&F history replay started from " +
"${DataPacket.nodeNumToDefaultId(event.server.raw)} count=${event.messageCount}"
}
}
is StoreForwardEvent.HistoryReplayComplete -> {
Logger.i {
"[SdkBridge] S&F history replay complete from " +
"${DataPacket.nodeNumToDefaultId(event.server.raw)} delivered=${event.delivered}"
}
}
is StoreForwardEvent.Heartbeat -> {
Logger.d {
"[SdkBridge] S&F heartbeat from ${DataPacket.nodeNumToDefaultId(event.server.raw)}"
}
}
is StoreForwardEvent.SfppLinkProvided -> {
event.messageHash?.let { hash ->
val status = if (event.confirmed) MessageStatus.SFPP_CONFIRMED else MessageStatus.SFPP_ROUTING
packetRepository.value.updateSFPPStatus(
packetId = event.packetId,
from = event.from,
to = event.to,
hash = hash,
status = status,
rxTime = 0L,
myNodeNum = nodeRepository.myNodeNum.value ?: 0,
)
}
}
is StoreForwardEvent.SfppCanonAnnounced -> {
packetRepository.value.updateSFPPStatusByHash(
hash = event.messageHash,
status = MessageStatus.SFPP_CONFIRMED,
rxTime = event.rxTime,
)
}
}
}
}

View File

@@ -20,28 +20,22 @@ import co.touchlab.kermit.Logger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import org.koin.core.annotation.Single
import org.meshtastic.core.common.util.nowSeconds
import org.meshtastic.core.di.CoroutineDispatchers
import org.meshtastic.core.model.ConnectionState as AppConnectionState
import org.meshtastic.core.model.DataPacket
import org.meshtastic.core.model.MessageStatus
import org.meshtastic.core.model.util.onlineTimeThreshold
import org.meshtastic.core.model.service.ServiceAction
import org.meshtastic.core.repository.MeshLocationManager
import org.meshtastic.core.repository.NodeRepository
import org.meshtastic.core.repository.PacketRepository
import org.meshtastic.core.repository.ServiceRepository
import org.meshtastic.core.repository.UiPrefs
import org.meshtastic.proto.ClientNotification
import org.meshtastic.proto.DeviceMetrics
import org.meshtastic.proto.PortNum
import org.meshtastic.proto.Position
@@ -49,11 +43,7 @@ import org.meshtastic.proto.User
import org.meshtastic.sdk.AdminResult
import org.meshtastic.sdk.ChannelIndex
import org.meshtastic.sdk.ConnectionState as SdkConnectionState
import org.meshtastic.sdk.MeshEvent
import org.meshtastic.sdk.NeighborInfo
import org.meshtastic.sdk.NodeChange
import org.meshtastic.sdk.NodeId
import org.meshtastic.sdk.StoreForwardEvent
/**
* Bridges SDK reactive flows into the repository layer and routes [ServiceAction]s
@@ -62,215 +52,59 @@ import org.meshtastic.sdk.StoreForwardEvent
* The SDK owns the transport and all state; this bridge maps SDK emissions into [ServiceRepository]
* and [NodeRepository] so that existing feature-module UI code (which observes those repositories)
* continues to work without modification.
*
* **Lifecycle:** Created as a Koin `@Single`. Automatically subscribes to [RadioClientAccessor.client]
* and starts/stops collection as clients come and go.
*/
@Single
@Suppress("TooManyFunctions")
class SdkStateBridge(
private val accessor: RadioClientAccessor,
private val serviceRepository: ServiceRepository,
private val nodeRepository: NodeRepository,
private val packetRepository: Lazy<PacketRepository>,
private val locationManager: MeshLocationManager,
private val topologyService: MeshTopologyService,
topologyService: MeshTopologyService,
private val uiPrefs: UiPrefs,
private val dispatchers: CoroutineDispatchers,
) {
private val scope = CoroutineScope(SupervisorJob() + dispatchers.default)
private var locationJob: Job? = null
private val nodeBridge = SdkNodeBridge(nodeRepository = nodeRepository, topologyService = topologyService)
private val packetBridge = SdkPacketBridge(
serviceRepository = serviceRepository,
packetRepository = packetRepository,
nodeRepository = nodeRepository,
)
private val topologyBridge = SdkTopologyBridge(topologyService = topologyService)
private val eventBridge = SdkEventBridge(serviceRepository = serviceRepository)
init {
startBridge()
bind()
}
private fun startBridge() {
private fun bind() {
bindConnectionState()
nodeBridge.observe(accessor, scope)
packetBridge.observe(accessor, scope)
topologyBridge.observe(accessor, scope)
eventBridge.observe(accessor, scope)
bindServiceActions()
bindLocationPublishing()
Logger.i { "SdkStateBridge started — SDK owns transport + ServiceAction dispatch" }
}
// ── Connection state ────────────────────────────────────────────────
private fun bindConnectionState() {
accessor.client
.flatMapLatest { client -> client?.connection ?: flowOf(SdkConnectionState.Disconnected) }
.onEach { sdkState -> serviceRepository.setConnectionState(mapConnectionState(sdkState)) }
.launchIn(scope)
}
// ── Node updates (position, telemetry, user all included in NodeInfo) ─
accessor.client
.flatMapLatest { client -> client?.nodes ?: flowOf() }
.onEach { change ->
when (change) {
is NodeChange.Snapshot -> {
nodeRepository.clear()
topologyService.clear()
change.nodes.forEach { (_, nodeInfo) ->
nodeRepository.installNodeInfo(nodeInfo, withBroadcast = false)
}
nodeRepository.setNodeDbReady(true)
}
is NodeChange.Added -> nodeRepository.installNodeInfo(change.node, withBroadcast = true)
is NodeChange.Updated -> nodeRepository.installNodeInfo(change.node, withBroadcast = true)
is NodeChange.Removed -> nodeRepository.removeByNodenum(change.nodeId.raw)
is NodeChange.WentOffline -> {
val nodeNum = change.nodeId.raw
Logger.d {
"[SdkBridge] Node ${DataPacket.nodeNumToDefaultId(nodeNum)} went offline (last heard: ${change.lastHeard})"
}
if (nodeRepository.nodeDBbyNodeNum.containsKey(nodeNum)) {
nodeRepository.updateNode(nodeNum) { node ->
node.copy(lastHeard = minOf(node.lastHeard, change.lastHeard, onlineTimeThreshold()))
}
}
}
is NodeChange.CameOnline -> {
val nodeNum = change.nodeId.raw
Logger.d { "[SdkBridge] Node ${DataPacket.nodeNumToDefaultId(nodeNum)} came online" }
if (nodeRepository.nodeDBbyNodeNum.containsKey(nodeNum)) {
nodeRepository.updateNode(nodeNum) { node ->
node.copy(lastHeard = maxOf(node.lastHeard, nowSeconds.toInt()))
}
}
}
}
}
.launchIn(scope)
// ── Own node identity ───────────────────────────────────────────────
accessor.client
.flatMapLatest { client -> client?.ownNode ?: flowOf(null) }
.onEach { ownNode -> if (ownNode != null) nodeRepository.setMyNodeNum(ownNode.num) }
.launchIn(scope)
// ── Raw packet forward (for RadioConfigViewModel + TAK) ─────────────
accessor.client
.flatMapLatest { client -> client?.packets ?: flowOf() }
.onEach { packet -> serviceRepository.emitMeshPacket(packet) }
.launchIn(scope)
// ── Topology: ingest NeighborInfo packets into MeshTopology graph ────
accessor.client
.flatMapLatest { client -> client?.packets ?: flowOf() }
.filter { it.decoded?.portnum == PortNum.NEIGHBORINFO_APP }
.onEach { packet ->
val payload = packet.decoded?.payload?.toByteArray() ?: return@onEach
runCatching {
val proto = org.meshtastic.proto.NeighborInfo.ADAPTER.decode(payload)
val info = NeighborInfo.fromProto(
reportingNode = packet.from,
neighborNodeIds = proto.neighbors.map { it.node_id },
snrValues = proto.neighbors.map { it.snr },
timestamp = proto.last_sent_by_id,
)
topologyService.ingestNeighborInfo(info)
}.onFailure { e -> Logger.w(e) { "[SdkBridge] Failed to parse NeighborInfo" } }
}
.launchIn(scope)
// ── NODE_STATUS_APP: update node status string ───────────────────────
accessor.client
.flatMapLatest { client -> client?.packets ?: flowOf() }
.filter { it.decoded?.portnum == PortNum.NODE_STATUS_APP }
.onEach { packet ->
val status = packet.decoded?.payload?.utf8() ?: return@onEach
nodeRepository.updateNode(packet.from) { it.copy(nodeStatus = status) }
}
.launchIn(scope)
// ── Events (notifications, security, backpressure) ──────────────────
accessor.client
.flatMapLatest { client -> client?.events ?: flowOf() }
.onEach { event ->
when (event) {
is MeshEvent.DeviceRebooted -> {
Logger.i { "[SdkBridge] Device rebooted" }
serviceRepository.setClientNotification(
ClientNotification(message = "Device rebooted"),
)
}
is MeshEvent.CongestionWarning -> {
Logger.w {
"[SdkBridge] Congestion warning: level=${event.metrics.level}, airUtil=${event.metrics.airUtilTx}%, channelUtil=${event.metrics.channelUtil}%"
}
serviceRepository.setCongestionLevel(event.metrics.level)
}
is MeshEvent.SecurityWarning -> Logger.w { "[SdkBridge] Security warning: $event" }
is MeshEvent.PacketsDropped -> Logger.w { "[SdkBridge] Packets dropped: ${event.count} from ${event.flow}" }
else -> Logger.d { "[SdkBridge] Event: $event" }
}
}
.launchIn(scope)
// ── Store-and-Forward (server discovery + replay lifecycle) ───────────
accessor.client
.flatMapLatest { client ->
client?.storeForward?.servers
?.map { servers -> servers.map { it.raw } }
?: flowOf(emptyList())
}
.onEach { servers -> serviceRepository.setStoreForwardServers(servers) }
.launchIn(scope)
accessor.client
.flatMapLatest { client -> client?.storeForward?.events ?: emptyFlow() }
.onEach { event ->
when (event) {
is StoreForwardEvent.ServerDiscovered -> {
Logger.i {
"[SdkBridge] S&F server discovered: ${DataPacket.nodeNumToDefaultId(event.nodeId.raw)}"
}
}
is StoreForwardEvent.ServerLost -> {
Logger.i {
"[SdkBridge] S&F server lost: ${DataPacket.nodeNumToDefaultId(event.nodeId.raw)}"
}
}
is StoreForwardEvent.HistoryReplayStarted -> {
Logger.i {
"[SdkBridge] S&F history replay started from " +
"${DataPacket.nodeNumToDefaultId(event.server.raw)} count=${event.messageCount}"
}
}
is StoreForwardEvent.HistoryReplayComplete -> {
Logger.i {
"[SdkBridge] S&F history replay complete from " +
"${DataPacket.nodeNumToDefaultId(event.server.raw)} delivered=${event.delivered}"
}
}
is StoreForwardEvent.Heartbeat -> {
Logger.d {
"[SdkBridge] S&F heartbeat from ${DataPacket.nodeNumToDefaultId(event.server.raw)}"
}
}
is StoreForwardEvent.SfppLinkProvided -> {
event.messageHash?.let { hash ->
val status = if (event.confirmed) MessageStatus.SFPP_CONFIRMED else MessageStatus.SFPP_ROUTING
packetRepository.value.updateSFPPStatus(
packetId = event.packetId,
from = event.from,
to = event.to,
hash = hash,
status = status,
rxTime = 0L,
myNodeNum = nodeRepository.myNodeNum.value ?: 0,
)
}
}
is StoreForwardEvent.SfppCanonAnnounced -> {
packetRepository.value.updateSFPPStatusByHash(
hash = event.messageHash,
status = MessageStatus.SFPP_CONFIRMED,
rxTime = event.rxTime,
)
}
else -> Logger.d { "[SdkBridge] S&F event: $event" }
}
}
.launchIn(scope)
// ── ServiceAction routing (replaces MeshServiceOrchestrator dispatch) ─
private fun bindServiceActions() {
serviceRepository.serviceAction
.onEach { action -> handleServiceAction(action) }
.launchIn(scope)
}
// ── Location publishing ─────────────────────────────────────────────
private fun bindLocationPublishing() {
accessor.client
.flatMapLatest { client -> client?.ownNode ?: flowOf(null) }
.onEach { ownNode ->
@@ -282,9 +116,9 @@ class SdkStateBridge(
if (shouldProvide) {
locationManager.start(scope) { pos ->
scope.launch {
val c = accessor.client.value ?: return@launch
val client = accessor.client.value ?: return@launch
val posBytes = org.meshtastic.proto.Position.ADAPTER.encode(pos)
c.send(
client.send(
portnum = PortNum.POSITION_APP,
payload = posBytes,
wantAck = false,
@@ -299,12 +133,8 @@ class SdkStateBridge(
}
}
.launchIn(scope)
Logger.i { "SdkStateBridge started — SDK owns transport + ServiceAction dispatch" }
}
// ── ServiceAction handling ───────────────────────────────────────────────
private suspend fun handleServiceAction(action: ServiceAction) {
val client = accessor.client.value
if (client == null) {
@@ -341,7 +171,6 @@ class SdkStateBridge(
if (result.isSuccess) {
nodeRepository.updateNode(node.num) { n ->
if (newIgnored) {
// Mirror firmware behavior: wipe position, device_metrics, zero public_key
n.copy(
isIgnored = true,
position = Position(),

View File

@@ -0,0 +1,51 @@
/*
* Copyright (c) 2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.data.radio
import co.touchlab.kermit.Logger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import org.meshtastic.proto.PortNum
import org.meshtastic.sdk.NeighborInfo
internal class SdkTopologyBridge(
private val topologyService: MeshTopologyService,
) {
fun observe(accessor: RadioClientAccessor, scope: CoroutineScope) {
accessor.client
.flatMapLatest { client -> client?.packets ?: emptyFlow() }
.filter { it.decoded?.portnum == PortNum.NEIGHBORINFO_APP }
.onEach { packet ->
val payload = packet.decoded?.payload?.toByteArray() ?: return@onEach
runCatching {
val proto = org.meshtastic.proto.NeighborInfo.ADAPTER.decode(payload)
val info = NeighborInfo.fromProto(
reportingNode = packet.from,
neighborNodeIds = proto.neighbors.map { it.node_id },
snrValues = proto.neighbors.map { it.snr },
timestamp = proto.last_sent_by_id,
)
topologyService.ingestNeighborInfo(info)
}.onFailure { e -> Logger.w(e) { "[SdkBridge] Failed to parse NeighborInfo" } }
}
.launchIn(scope)
}
}