From f4c6cee3329ff0dcd288ae82e4c7ca27abec1854 Mon Sep 17 00:00:00 2001 From: James Rich Date: Wed, 6 May 2026 12:04:07 -0500 Subject: [PATCH] feat: typed telemetry dispatch + MeshTopology service MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Replace magic int dispatch in requestTelemetry with TelemetryType enum - Update DataRequester interface: remove requestId param, use TelemetryType directly - Add HEALTH and TRAFFIC_MANAGEMENT to TelemetryType, remove stale PAX variant - Create MeshTopologyService wrapping SDK's MeshTopology with thread-safe Mutex - Wire NeighborInfo packet ingestion in SdkStateBridge → topology graph - Clear topology on node snapshot (reconnect) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../core/data/radio/MeshTopologyService.kt | 93 +++++++++++++++++++ .../core/data/radio/SdkRadioController.kt | 22 ++--- .../core/data/radio/SdkStateBridge.kt | 23 +++++ .../core/data/radio/SdkStateBridgeTest.kt | 1 + .../meshtastic/core/model/DataRequester.kt | 2 +- .../meshtastic/core/model/TelemetryType.kt | 9 +- .../core/testing/FakeRadioController.kt | 3 +- .../component/TelemetricActionsSection.kt | 2 +- .../node/detail/CommonNodeRequestActions.kt | 6 +- .../feature/node/metrics/PaxMetrics.kt | 4 +- .../feature/widget/RefreshLocalStatsAction.kt | 4 +- 11 files changed, 147 insertions(+), 22 deletions(-) create mode 100644 core/data/src/commonMain/kotlin/org/meshtastic/core/data/radio/MeshTopologyService.kt diff --git a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/radio/MeshTopologyService.kt b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/radio/MeshTopologyService.kt new file mode 100644 index 000000000..6554a57d6 --- /dev/null +++ b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/radio/MeshTopologyService.kt @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2026 Meshtastic LLC + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.meshtastic.core.data.radio + +import co.touchlab.kermit.Logger +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import org.koin.core.annotation.Single +import org.meshtastic.sdk.MeshTopology +import org.meshtastic.sdk.NeighborInfo +import org.meshtastic.sdk.NodeId + +/** + * Thread-safe wrapper around SDK's [MeshTopology] graph utility. + * + * Fed by [SdkStateBridge] whenever a NEIGHBORINFO_APP packet arrives. Exposes reactive + * topology state for feature modules (map visualization, route analysis, neighbor lists). + * + * The graph is incrementally built: each [ingestNeighborInfo] call replaces all edges from + * the reporting node, keeping the topology fresh as nodes broadcast their neighbor tables. + */ +@Single +class MeshTopologyService { + private val topology = MeshTopology() + private val mutex = Mutex() + + private val _edges = MutableStateFlow>(emptyList()) + /** All directed edges in the mesh topology graph. */ + val edges: StateFlow> = _edges + + private val _nodeCount = MutableStateFlow(0) + /** Total number of nodes participating in the topology (reporters + reported neighbors). */ + val nodeCount: StateFlow = _nodeCount + + /** + * Ingest a [NeighborInfo] report into the topology graph. + * Replaces all prior edges from the reporting node. + */ + suspend fun ingestNeighborInfo(info: NeighborInfo) { + mutex.withLock { + topology.addNeighborInfo(info) + _edges.value = topology.allEdges() + _nodeCount.value = topology.nodes.size + } + Logger.d { "[Topology] Ingested neighbors from ${info.nodeId}: ${info.neighbors.size} edges" } + } + + /** Remove a node from the topology (e.g., when it goes permanently offline). */ + suspend fun removeNode(nodeId: NodeId) { + mutex.withLock { + topology.removeNode(nodeId) + _edges.value = topology.allEdges() + _nodeCount.value = topology.nodes.size + } + } + + /** Get all neighbors of a specific node (thread-safe snapshot). */ + suspend fun getNeighbors(nodeId: NodeId): List = + mutex.withLock { topology.getNeighbors(nodeId) } + + /** Find the shortest path between two nodes via BFS. */ + suspend fun shortestPath(from: NodeId, to: NodeId): List = + mutex.withLock { topology.shortestPath(from, to) } + + /** Check if two nodes have a direct edge in either direction. */ + suspend fun isDirectReach(a: NodeId, b: NodeId): Boolean = + mutex.withLock { topology.isDirectReach(a, b) } + + /** Clear all topology data (e.g., on disconnect). */ + suspend fun clear() { + mutex.withLock { + topology.clear() + _edges.value = emptyList() + _nodeCount.value = 0 + } + } +} diff --git a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/radio/SdkRadioController.kt b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/radio/SdkRadioController.kt index 31d9a1134..2ce8c5302 100644 --- a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/radio/SdkRadioController.kt +++ b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/radio/SdkRadioController.kt @@ -32,6 +32,7 @@ import org.meshtastic.core.model.MessageSender import org.meshtastic.core.model.Position import org.meshtastic.core.model.RadioController import org.meshtastic.core.model.RemoteAdmin +import org.meshtastic.core.model.TelemetryType import org.meshtastic.core.repository.MeshLocationManager import org.meshtastic.core.repository.NodeRepository import org.meshtastic.core.repository.RadioPrefs @@ -322,19 +323,18 @@ class SdkRadioController( c.routing.traceRoute(NodeId(destNum)) } - override suspend fun requestTelemetry(requestId: Int, destNum: Int, typeValue: Int) { + override suspend fun requestTelemetry(destNum: Int, type: TelemetryType) { val c = requireClient() val node = NodeId(destNum) - when (typeValue) { - 0 -> c.telemetry.requestDevice(node) - 1 -> c.telemetry.requestEnvironment(node) - 2 -> c.telemetry.requestAirQuality(node) - 3 -> c.telemetry.requestPower(node) - 4 -> c.telemetry.requestLocalStats() - 5 -> c.telemetry.requestHealth(node) - 6 -> c.telemetry.requestHost(node) - 7 -> c.telemetry.requestTrafficManagement(node) - else -> Logger.w { "Unknown telemetry type: $typeValue" } + when (type) { + TelemetryType.DEVICE -> c.telemetry.requestDevice(node) + TelemetryType.ENVIRONMENT -> c.telemetry.requestEnvironment(node) + TelemetryType.AIR_QUALITY -> c.telemetry.requestAirQuality(node) + TelemetryType.POWER -> c.telemetry.requestPower(node) + TelemetryType.LOCAL_STATS -> c.telemetry.requestLocalStats() + TelemetryType.HEALTH -> c.telemetry.requestHealth(node) + TelemetryType.HOST -> c.telemetry.requestHost(node) + TelemetryType.TRAFFIC_MANAGEMENT -> c.telemetry.requestTrafficManagement(node) } } diff --git a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/radio/SdkStateBridge.kt b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/radio/SdkStateBridge.kt index dbc42267b..7beb100d8 100644 --- a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/radio/SdkStateBridge.kt +++ b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/radio/SdkStateBridge.kt @@ -21,6 +21,7 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.flow.emptyFlow +import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.flatMapLatest import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.flow.launchIn @@ -47,6 +48,7 @@ import org.meshtastic.sdk.AdminResult import org.meshtastic.sdk.ChannelIndex import org.meshtastic.sdk.ConnectionState as SdkConnectionState import org.meshtastic.sdk.MeshEvent +import org.meshtastic.sdk.NeighborInfo import org.meshtastic.sdk.NodeChange import org.meshtastic.sdk.NodeId import org.meshtastic.sdk.StoreForwardEvent @@ -70,6 +72,7 @@ class SdkStateBridge( private val nodeRepository: NodeRepository, private val packetRepository: Lazy, private val locationManager: MeshLocationManager, + private val topologyService: MeshTopologyService, private val uiPrefs: UiPrefs, private val dispatchers: CoroutineDispatchers, ) { @@ -95,6 +98,7 @@ class SdkStateBridge( when (change) { is NodeChange.Snapshot -> { nodeRepository.clear() + topologyService.clear() change.nodes.forEach { (_, nodeInfo) -> nodeRepository.installNodeInfo(nodeInfo, withBroadcast = false) } @@ -139,6 +143,25 @@ class SdkStateBridge( .onEach { packet -> serviceRepository.emitMeshPacket(packet) } .launchIn(scope) + // ── Topology: ingest NeighborInfo packets into MeshTopology graph ──── + accessor.client + .flatMapLatest { client -> client?.packets ?: flowOf() } + .filter { it.decoded?.portnum == PortNum.NEIGHBORINFO_APP } + .onEach { packet -> + val payload = packet.decoded?.payload?.toByteArray() ?: return@onEach + runCatching { + val proto = org.meshtastic.proto.NeighborInfo.ADAPTER.decode(payload) + val info = NeighborInfo.fromProto( + reportingNode = packet.from, + neighborNodeIds = proto.neighbors.map { it.node_id }, + snrValues = proto.neighbors.map { it.snr }, + timestamp = proto.last_sent_by_id, + ) + topologyService.ingestNeighborInfo(info) + }.onFailure { e -> Logger.w(e) { "[SdkBridge] Failed to parse NeighborInfo" } } + } + .launchIn(scope) + // ── Events (notifications, security, backpressure) ────────────────── accessor.client .flatMapLatest { client -> client?.events ?: flowOf() } diff --git a/core/data/src/commonTest/kotlin/org/meshtastic/core/data/radio/SdkStateBridgeTest.kt b/core/data/src/commonTest/kotlin/org/meshtastic/core/data/radio/SdkStateBridgeTest.kt index 671b73653..b987d9f5a 100644 --- a/core/data/src/commonTest/kotlin/org/meshtastic/core/data/radio/SdkStateBridgeTest.kt +++ b/core/data/src/commonTest/kotlin/org/meshtastic/core/data/radio/SdkStateBridgeTest.kt @@ -284,6 +284,7 @@ class SdkStateBridgeTest { nodeRepository = nodeRepository, packetRepository = lazyOf(packetRepository), locationManager = NoOpLocationManager, + topologyService = MeshTopologyService(), uiPrefs = FakeUiPrefs(), dispatchers = CoroutineDispatchers( io = backgroundScope.coroutineContext[kotlin.coroutines.ContinuationInterceptor] as kotlinx.coroutines.CoroutineDispatcher, diff --git a/core/model/src/commonMain/kotlin/org/meshtastic/core/model/DataRequester.kt b/core/model/src/commonMain/kotlin/org/meshtastic/core/model/DataRequester.kt index 59d5e04bb..76c693e20 100644 --- a/core/model/src/commonMain/kotlin/org/meshtastic/core/model/DataRequester.kt +++ b/core/model/src/commonMain/kotlin/org/meshtastic/core/model/DataRequester.kt @@ -21,7 +21,7 @@ interface DataRequester { suspend fun requestPosition(destNum: Int, currentPosition: Position) suspend fun requestUserInfo(destNum: Int) suspend fun requestTraceroute(requestId: Int, destNum: Int) - suspend fun requestTelemetry(requestId: Int, destNum: Int, typeValue: Int) + suspend fun requestTelemetry(destNum: Int, type: TelemetryType) suspend fun requestNeighborInfo(requestId: Int, destNum: Int) suspend fun requestStoreForwardHistory(since: Int? = null, serverNodeNum: Int? = null): Boolean } diff --git a/core/model/src/commonMain/kotlin/org/meshtastic/core/model/TelemetryType.kt b/core/model/src/commonMain/kotlin/org/meshtastic/core/model/TelemetryType.kt index 89eec6bbf..b7b532a72 100644 --- a/core/model/src/commonMain/kotlin/org/meshtastic/core/model/TelemetryType.kt +++ b/core/model/src/commonMain/kotlin/org/meshtastic/core/model/TelemetryType.kt @@ -16,12 +16,19 @@ */ package org.meshtastic.core.model +/** + * Typed enum for telemetry request categories. + * + * Ordinal values align with SDK telemetry dispatch ordering: + * 0=Device, 1=Environment, 2=AirQuality, 3=Power, 4=LocalStats, 5=Health, 6=Host, 7=TrafficManagement. + */ enum class TelemetryType { DEVICE, ENVIRONMENT, AIR_QUALITY, POWER, LOCAL_STATS, + HEALTH, HOST, - PAX, + TRAFFIC_MANAGEMENT, } diff --git a/core/testing/src/commonMain/kotlin/org/meshtastic/core/testing/FakeRadioController.kt b/core/testing/src/commonMain/kotlin/org/meshtastic/core/testing/FakeRadioController.kt index d891284e3..8579fd7ff 100644 --- a/core/testing/src/commonMain/kotlin/org/meshtastic/core/testing/FakeRadioController.kt +++ b/core/testing/src/commonMain/kotlin/org/meshtastic/core/testing/FakeRadioController.kt @@ -22,6 +22,7 @@ import org.meshtastic.core.model.DataPacket import org.meshtastic.core.model.DeviceAdminEdit import org.meshtastic.core.model.Position import org.meshtastic.core.model.RadioController +import org.meshtastic.core.model.TelemetryType import org.meshtastic.proto.Channel import org.meshtastic.proto.ClientNotification import org.meshtastic.proto.Config @@ -141,7 +142,7 @@ class FakeRadioController : override suspend fun requestTraceroute(requestId: Int, destNum: Int) {} - override suspend fun requestTelemetry(requestId: Int, destNum: Int, typeValue: Int) {} + override suspend fun requestTelemetry(destNum: Int, type: TelemetryType) {} override suspend fun requestNeighborInfo(requestId: Int, destNum: Int) {} diff --git a/feature/node/src/commonMain/kotlin/org/meshtastic/feature/node/component/TelemetricActionsSection.kt b/feature/node/src/commonMain/kotlin/org/meshtastic/feature/node/component/TelemetricActionsSection.kt index 2ff709b75..e1c2f7b0c 100644 --- a/feature/node/src/commonMain/kotlin/org/meshtastic/feature/node/component/TelemetricActionsSection.kt +++ b/feature/node/src/commonMain/kotlin/org/meshtastic/feature/node/component/TelemetricActionsSection.kt @@ -197,7 +197,7 @@ private fun rememberTelemetricFeatures( TelemetricFeature( titleRes = LogsType.PAX.titleRes, icon = LogsType.PAX.icon, - requestAction = { NodeMenuAction.RequestTelemetry(it, TelemetryType.PAX) }, + requestAction = { NodeMenuAction.RequestTelemetry(it, TelemetryType.DEVICE) }, logsType = LogsType.PAX, ), TelemetricFeature( diff --git a/feature/node/src/commonMain/kotlin/org/meshtastic/feature/node/detail/CommonNodeRequestActions.kt b/feature/node/src/commonMain/kotlin/org/meshtastic/feature/node/detail/CommonNodeRequestActions.kt index cfd95ac8a..7a60ac29b 100644 --- a/feature/node/src/commonMain/kotlin/org/meshtastic/feature/node/detail/CommonNodeRequestActions.kt +++ b/feature/node/src/commonMain/kotlin/org/meshtastic/feature/node/detail/CommonNodeRequestActions.kt @@ -100,8 +100,7 @@ constructor( scope.launch(ioDispatcher) { runCatching { Logger.i { "Requesting telemetry for '$destNum'" } - val packetId = messageSender.getPacketId() - dataRequester.requestTelemetry(packetId, destNum, type.ordinal) + dataRequester.requestTelemetry(destNum, type) val typeRes = when (type) { @@ -110,8 +109,9 @@ constructor( TelemetryType.AIR_QUALITY -> Res.string.request_air_quality_metrics TelemetryType.POWER -> Res.string.request_power_metrics TelemetryType.LOCAL_STATS -> Res.string.signal_quality + TelemetryType.HEALTH -> Res.string.request_device_metrics TelemetryType.HOST -> Res.string.request_host_metrics - TelemetryType.PAX -> Res.string.request_pax_metrics + TelemetryType.TRAFFIC_MANAGEMENT -> Res.string.request_device_metrics } showFeedback(UiText.Resource(Res.string.requesting_from, typeRes, longName)) diff --git a/feature/node/src/commonMain/kotlin/org/meshtastic/feature/node/metrics/PaxMetrics.kt b/feature/node/src/commonMain/kotlin/org/meshtastic/feature/node/metrics/PaxMetrics.kt index 492685b18..61367756a 100644 --- a/feature/node/src/commonMain/kotlin/org/meshtastic/feature/node/metrics/PaxMetrics.kt +++ b/feature/node/src/commonMain/kotlin/org/meshtastic/feature/node/metrics/PaxMetrics.kt @@ -191,12 +191,12 @@ fun PaxMetricsScreen(metricsViewModel: MetricsViewModel, onNavigateUp: () -> Uni BaseMetricScreen( onNavigateUp = onNavigateUp, - telemetryType = TelemetryType.PAX, + telemetryType = TelemetryType.DEVICE, titleRes = Res.string.pax_metrics_log, nodeName = state.node?.user?.long_name ?: "", data = paxMetrics, timeProvider = { (it.first.received_date / MS_PER_SEC).toDouble() }, - onRequestTelemetry = { metricsViewModel.requestTelemetry(TelemetryType.PAX) }, + onRequestTelemetry = { metricsViewModel.requestTelemetry(TelemetryType.DEVICE) }, controlPart = { TimeFrameSelector( selectedTimeFrame = timeFrame, diff --git a/feature/widget/src/main/kotlin/org/meshtastic/feature/widget/RefreshLocalStatsAction.kt b/feature/widget/src/main/kotlin/org/meshtastic/feature/widget/RefreshLocalStatsAction.kt index b382ba4ab..bcf0f42e5 100644 --- a/feature/widget/src/main/kotlin/org/meshtastic/feature/widget/RefreshLocalStatsAction.kt +++ b/feature/widget/src/main/kotlin/org/meshtastic/feature/widget/RefreshLocalStatsAction.kt @@ -41,7 +41,7 @@ class RefreshLocalStatsAction : return } - radioController.requestTelemetry(myNodeNum.hashCode(), myNodeNum, TelemetryType.LOCAL_STATS.ordinal) - radioController.requestTelemetry(myNodeNum.hashCode() + 1, myNodeNum, TelemetryType.DEVICE.ordinal) + radioController.requestTelemetry(myNodeNum, TelemetryType.LOCAL_STATS) + radioController.requestTelemetry(myNodeNum, TelemetryType.DEVICE) } }