feat: typed telemetry dispatch + MeshTopology service

- Replace magic int dispatch in requestTelemetry with TelemetryType enum
- Update DataRequester interface: remove requestId param, use TelemetryType directly
- Add HEALTH and TRAFFIC_MANAGEMENT to TelemetryType, remove stale PAX variant
- Create MeshTopologyService wrapping SDK's MeshTopology with thread-safe Mutex
- Wire NeighborInfo packet ingestion in SdkStateBridge → topology graph
- Clear topology on node snapshot (reconnect)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
James Rich
2026-05-06 12:04:07 -05:00
parent 3fb45e05bd
commit f4c6cee332
11 changed files with 147 additions and 22 deletions

View File

@@ -0,0 +1,93 @@
/*
* Copyright (c) 2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.data.radio
import co.touchlab.kermit.Logger
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.koin.core.annotation.Single
import org.meshtastic.sdk.MeshTopology
import org.meshtastic.sdk.NeighborInfo
import org.meshtastic.sdk.NodeId
/**
* Thread-safe wrapper around SDK's [MeshTopology] graph utility.
*
* Fed by [SdkStateBridge] whenever a NEIGHBORINFO_APP packet arrives. Exposes reactive
* topology state for feature modules (map visualization, route analysis, neighbor lists).
*
* The graph is incrementally built: each [ingestNeighborInfo] call replaces all edges from
* the reporting node, keeping the topology fresh as nodes broadcast their neighbor tables.
*/
@Single
class MeshTopologyService {
private val topology = MeshTopology()
private val mutex = Mutex()
private val _edges = MutableStateFlow<List<MeshTopology.Edge>>(emptyList())
/** All directed edges in the mesh topology graph. */
val edges: StateFlow<List<MeshTopology.Edge>> = _edges
private val _nodeCount = MutableStateFlow(0)
/** Total number of nodes participating in the topology (reporters + reported neighbors). */
val nodeCount: StateFlow<Int> = _nodeCount
/**
* Ingest a [NeighborInfo] report into the topology graph.
* Replaces all prior edges from the reporting node.
*/
suspend fun ingestNeighborInfo(info: NeighborInfo) {
mutex.withLock {
topology.addNeighborInfo(info)
_edges.value = topology.allEdges()
_nodeCount.value = topology.nodes.size
}
Logger.d { "[Topology] Ingested neighbors from ${info.nodeId}: ${info.neighbors.size} edges" }
}
/** Remove a node from the topology (e.g., when it goes permanently offline). */
suspend fun removeNode(nodeId: NodeId) {
mutex.withLock {
topology.removeNode(nodeId)
_edges.value = topology.allEdges()
_nodeCount.value = topology.nodes.size
}
}
/** Get all neighbors of a specific node (thread-safe snapshot). */
suspend fun getNeighbors(nodeId: NodeId): List<MeshTopology.Edge> =
mutex.withLock { topology.getNeighbors(nodeId) }
/** Find the shortest path between two nodes via BFS. */
suspend fun shortestPath(from: NodeId, to: NodeId): List<NodeId> =
mutex.withLock { topology.shortestPath(from, to) }
/** Check if two nodes have a direct edge in either direction. */
suspend fun isDirectReach(a: NodeId, b: NodeId): Boolean =
mutex.withLock { topology.isDirectReach(a, b) }
/** Clear all topology data (e.g., on disconnect). */
suspend fun clear() {
mutex.withLock {
topology.clear()
_edges.value = emptyList()
_nodeCount.value = 0
}
}
}

View File

@@ -32,6 +32,7 @@ import org.meshtastic.core.model.MessageSender
import org.meshtastic.core.model.Position
import org.meshtastic.core.model.RadioController
import org.meshtastic.core.model.RemoteAdmin
import org.meshtastic.core.model.TelemetryType
import org.meshtastic.core.repository.MeshLocationManager
import org.meshtastic.core.repository.NodeRepository
import org.meshtastic.core.repository.RadioPrefs
@@ -322,19 +323,18 @@ class SdkRadioController(
c.routing.traceRoute(NodeId(destNum))
}
override suspend fun requestTelemetry(requestId: Int, destNum: Int, typeValue: Int) {
override suspend fun requestTelemetry(destNum: Int, type: TelemetryType) {
val c = requireClient()
val node = NodeId(destNum)
when (typeValue) {
0 -> c.telemetry.requestDevice(node)
1 -> c.telemetry.requestEnvironment(node)
2 -> c.telemetry.requestAirQuality(node)
3 -> c.telemetry.requestPower(node)
4 -> c.telemetry.requestLocalStats()
5 -> c.telemetry.requestHealth(node)
6 -> c.telemetry.requestHost(node)
7 -> c.telemetry.requestTrafficManagement(node)
else -> Logger.w { "Unknown telemetry type: $typeValue" }
when (type) {
TelemetryType.DEVICE -> c.telemetry.requestDevice(node)
TelemetryType.ENVIRONMENT -> c.telemetry.requestEnvironment(node)
TelemetryType.AIR_QUALITY -> c.telemetry.requestAirQuality(node)
TelemetryType.POWER -> c.telemetry.requestPower(node)
TelemetryType.LOCAL_STATS -> c.telemetry.requestLocalStats()
TelemetryType.HEALTH -> c.telemetry.requestHealth(node)
TelemetryType.HOST -> c.telemetry.requestHost(node)
TelemetryType.TRAFFIC_MANAGEMENT -> c.telemetry.requestTrafficManagement(node)
}
}

View File

@@ -21,6 +21,7 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.launchIn
@@ -47,6 +48,7 @@ import org.meshtastic.sdk.AdminResult
import org.meshtastic.sdk.ChannelIndex
import org.meshtastic.sdk.ConnectionState as SdkConnectionState
import org.meshtastic.sdk.MeshEvent
import org.meshtastic.sdk.NeighborInfo
import org.meshtastic.sdk.NodeChange
import org.meshtastic.sdk.NodeId
import org.meshtastic.sdk.StoreForwardEvent
@@ -70,6 +72,7 @@ class SdkStateBridge(
private val nodeRepository: NodeRepository,
private val packetRepository: Lazy<PacketRepository>,
private val locationManager: MeshLocationManager,
private val topologyService: MeshTopologyService,
private val uiPrefs: UiPrefs,
private val dispatchers: CoroutineDispatchers,
) {
@@ -95,6 +98,7 @@ class SdkStateBridge(
when (change) {
is NodeChange.Snapshot -> {
nodeRepository.clear()
topologyService.clear()
change.nodes.forEach { (_, nodeInfo) ->
nodeRepository.installNodeInfo(nodeInfo, withBroadcast = false)
}
@@ -139,6 +143,25 @@ class SdkStateBridge(
.onEach { packet -> serviceRepository.emitMeshPacket(packet) }
.launchIn(scope)
// ── Topology: ingest NeighborInfo packets into MeshTopology graph ────
accessor.client
.flatMapLatest { client -> client?.packets ?: flowOf() }
.filter { it.decoded?.portnum == PortNum.NEIGHBORINFO_APP }
.onEach { packet ->
val payload = packet.decoded?.payload?.toByteArray() ?: return@onEach
runCatching {
val proto = org.meshtastic.proto.NeighborInfo.ADAPTER.decode(payload)
val info = NeighborInfo.fromProto(
reportingNode = packet.from,
neighborNodeIds = proto.neighbors.map { it.node_id },
snrValues = proto.neighbors.map { it.snr },
timestamp = proto.last_sent_by_id,
)
topologyService.ingestNeighborInfo(info)
}.onFailure { e -> Logger.w(e) { "[SdkBridge] Failed to parse NeighborInfo" } }
}
.launchIn(scope)
// ── Events (notifications, security, backpressure) ──────────────────
accessor.client
.flatMapLatest { client -> client?.events ?: flowOf() }

View File

@@ -284,6 +284,7 @@ class SdkStateBridgeTest {
nodeRepository = nodeRepository,
packetRepository = lazyOf(packetRepository),
locationManager = NoOpLocationManager,
topologyService = MeshTopologyService(),
uiPrefs = FakeUiPrefs(),
dispatchers = CoroutineDispatchers(
io = backgroundScope.coroutineContext[kotlin.coroutines.ContinuationInterceptor] as kotlinx.coroutines.CoroutineDispatcher,