diff --git a/core/data/src/commonTest/kotlin/org/meshtastic/core/data/radio/MeshTopologyServiceTest.kt b/core/data/src/commonTest/kotlin/org/meshtastic/core/data/radio/MeshTopologyServiceTest.kt new file mode 100644 index 000000000..5e5715dd1 --- /dev/null +++ b/core/data/src/commonTest/kotlin/org/meshtastic/core/data/radio/MeshTopologyServiceTest.kt @@ -0,0 +1,172 @@ +/* + * Copyright (c) 2026 Meshtastic LLC + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.meshtastic.core.data.radio + +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.runTest +import kotlinx.coroutines.withTimeout +import org.meshtastic.sdk.MeshTopology +import org.meshtastic.sdk.NeighborInfo +import org.meshtastic.sdk.NodeId +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFalse +import kotlin.test.assertTrue +import kotlin.time.Duration.Companion.seconds + +class MeshTopologyServiceTest { + + @Test + fun `ingest neighbor info creates graph edges and node count`() = runTest { + val service = MeshTopologyService() + + service.ingestNeighborInfo(neighborInfo(1, 2 to 7.5f, 3 to -1.0f, lastUpdated = 99)) + + assertEquals( + setOf( + edge(1, 2, 7.5f, 99), + edge(1, 3, -1.0f, 99), + ), + service.edges.value.toSet(), + ) + assertEquals(3, service.nodeCount.value) + } + + @Test + fun `shortest path returns the bfs route`() = runTest { + val service = MeshTopologyService() + + service.ingestNeighborInfo(neighborInfo(1, 2 to 5.0f, 3 to 4.0f)) + service.ingestNeighborInfo(neighborInfo(2, 4 to 3.0f)) + service.ingestNeighborInfo(neighborInfo(3, 5 to 2.0f)) + service.ingestNeighborInfo(neighborInfo(5, 4 to 1.0f)) + + assertEquals( + listOf(NodeId(1), NodeId(2), NodeId(4)), + service.shortestPath(NodeId(1), NodeId(4)), + ) + } + + @Test + fun `direct reach is true for one hop neighbors`() = runTest { + val service = MeshTopologyService() + + service.ingestNeighborInfo(neighborInfo(1, 2 to 6.0f)) + + assertTrue(service.isDirectReach(NodeId(1), NodeId(2))) + assertTrue(service.isDirectReach(NodeId(2), NodeId(1))) + assertFalse(service.isDirectReach(NodeId(1), NodeId(3))) + } + + @Test + fun `remove node cleans all associated edges`() = runTest { + val service = MeshTopologyService() + + service.ingestNeighborInfo(neighborInfo(1, 2 to 5.0f, 3 to 1.0f)) + service.ingestNeighborInfo(neighborInfo(4, 2 to 2.5f)) + + service.removeNode(NodeId(2)) + + assertEquals(setOf(edge(1, 3, 1.0f)), service.edges.value.toSet()) + assertEquals(3, service.nodeCount.value) + assertFalse(service.isDirectReach(NodeId(1), NodeId(2))) + assertEquals(emptyList(), service.getNeighbors(NodeId(2))) + } + + @Test + fun `concurrent access keeps reporter edges consistent`() = runTest { + val service = MeshTopologyService() + val firstSnapshot = neighborInfo(1, 2 to 5.0f, 3 to 4.0f) + val secondSnapshot = neighborInfo(1, 4 to 9.0f) + val expectedFirst = setOf(edge(1, 2, 5.0f), edge(1, 3, 4.0f)) + val expectedSecond = setOf(edge(1, 4, 9.0f)) + + coroutineScope { + repeat(100) { index -> + launch { + if (index % 2 == 0) { + service.ingestNeighborInfo(firstSnapshot) + } else { + service.ingestNeighborInfo(secondSnapshot) + } + service.shortestPath(NodeId(1), NodeId(4)) + service.isDirectReach(NodeId(1), NodeId(2)) + } + } + } + + val actualNeighbors = service.getNeighbors(NodeId(1)).toSet() + assertTrue(actualNeighbors == expectedFirst || actualNeighbors == expectedSecond) + assertEquals(actualNeighbors, service.edges.value.toSet()) + assertTrue(service.nodeCount.value == 3 || service.nodeCount.value == 2) + } + + @Test + fun `circular topology path search terminates`() = runTest { + val service = MeshTopologyService() + + service.ingestNeighborInfo(neighborInfo(1, 2 to 1.0f)) + service.ingestNeighborInfo(neighborInfo(2, 3 to 1.0f)) + service.ingestNeighborInfo(neighborInfo(3, 1 to 1.0f, 4 to 1.0f)) + + val path = withTimeout(1.seconds) { service.shortestPath(NodeId(1), NodeId(4)) } + + assertEquals(NodeId(1), path.first()) + assertEquals(NodeId(4), path.last()) + assertTrue(path.size in 3..4) + } + + @Test + fun `empty graph returns empty path and no direct reach`() = runTest { + val service = MeshTopologyService() + + assertEquals(emptyList(), service.shortestPath(NodeId(1), NodeId(2))) + assertFalse(service.isDirectReach(NodeId(1), NodeId(2))) + assertEquals(emptyList(), service.edges.value) + assertEquals(0, service.nodeCount.value) + } + + @Test + fun `clear removes all topology state`() = runTest { + val service = MeshTopologyService() + + service.ingestNeighborInfo(neighborInfo(1, 2 to 5.0f)) + service.clear() + + assertEquals(emptyList(), service.edges.value) + assertEquals(0, service.nodeCount.value) + assertEquals(emptyList(), service.getNeighbors(NodeId(1))) + } + + private fun neighborInfo( + reporter: Int, + vararg neighbors: Pair, + lastUpdated: Int = 0, + ): NeighborInfo = NeighborInfo( + nodeId = NodeId(reporter), + neighbors = neighbors.map { (neighbor, snr) -> NeighborInfo.Neighbor(NodeId(neighbor), snr) }, + lastUpdated = lastUpdated, + ) + + private fun edge( + from: Int, + to: Int, + snr: Float, + lastUpdated: Int = 0, + ): MeshTopology.Edge = MeshTopology.Edge(NodeId(from), NodeId(to), snr, lastUpdated) +} diff --git a/core/data/src/commonTest/kotlin/org/meshtastic/core/data/radio/MessageDeliveryTrackerTest.kt b/core/data/src/commonTest/kotlin/org/meshtastic/core/data/radio/MessageDeliveryTrackerTest.kt new file mode 100644 index 000000000..fddb57083 --- /dev/null +++ b/core/data/src/commonTest/kotlin/org/meshtastic/core/data/radio/MessageDeliveryTrackerTest.kt @@ -0,0 +1,326 @@ +/* + * Copyright (c) 2026 Meshtastic LLC + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.meshtastic.core.data.radio + +import dev.mokkery.MockMode +import dev.mokkery.answering.calls +import dev.mokkery.everySuspend +import dev.mokkery.matcher.any +import dev.mokkery.mock +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.test.StandardTestDispatcher +import kotlinx.coroutines.test.TestScope +import kotlinx.coroutines.test.advanceTimeBy +import kotlinx.coroutines.test.advanceUntilIdle +import kotlinx.coroutines.test.runCurrent +import kotlinx.coroutines.test.runTest +import okio.ByteString.Companion.toByteString +import org.meshtastic.core.di.CoroutineDispatchers +import org.meshtastic.core.model.DataPacket +import org.meshtastic.core.model.MessageStatus +import org.meshtastic.core.repository.PacketRepository +import org.meshtastic.proto.Data +import org.meshtastic.proto.MeshPacket +import org.meshtastic.proto.PortNum +import org.meshtastic.proto.Routing +import org.meshtastic.sdk.RadioClient +import org.meshtastic.sdk.RetryPolicy +import org.meshtastic.sdk.TransportIdentity +import org.meshtastic.sdk.testing.FakeRadioTransport +import org.meshtastic.sdk.testing.InMemoryStorageProvider +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.time.Duration +import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.Duration.Companion.seconds + +@OptIn(ExperimentalCoroutinesApi::class) +class MessageDeliveryTrackerTest { + + @Test + fun `sent acked flow is persisted as delivered`() = runTest { + val updates = linkedMapOf>() + val repository = mockPacketRepository(updates) + val tracker = buildTracker(repository) + val transport = fakeTransport("fake:acked") + val client = buildClient(transport) + + client.connect() + val handle = client.send(unicastPacket("acked")) + tracker.track(101, handle, RetryPolicy.None) + + runCurrent() + transport.injectRoutingAck(transport.lastTextPacketId()) + runCurrent() + advanceUntilIdle() + + assertEquals( + listOf( + MessageStatus.ENROUTE, + MessageStatus.DELIVERED, + MessageStatus.DELIVERED, + ), + updates.getValue(101), + ) + + client.disconnect() + } + + @Test + fun `retry exhaustion ends in error after the final attempt`() = runTest { + val updates = linkedMapOf>() + val repository = mockPacketRepository(updates) + val tracker = buildTracker(repository) + val transport = fakeTransport("fake:retry-exhausted") + val client = buildClient(transport, sendTimeout = 100.milliseconds) + + client.connect() + val handle = client.send(unicastPacket("retry-exhausted")) + tracker.track(102, handle, RetryPolicy.Fixed(maxAttempts = 1, delay = 100.milliseconds)) + + runCurrent() + assertEquals(1, transport.sentTextPackets().size) + + advanceTimeBy(100.milliseconds) + runCurrent() + advanceTimeBy(100.milliseconds) + runCurrent() + assertEquals(2, transport.sentTextPackets().size) + + advanceTimeBy(100.milliseconds) + runCurrent() + advanceUntilIdle() + + assertEquals(MessageStatus.ERROR, updates.getValue(102).last()) + + client.disconnect() + } + + @Test + fun `routing failure transitions from enroute to error`() = runTest { + val updates = linkedMapOf>() + val repository = mockPacketRepository(updates) + val tracker = buildTracker(repository) + val transport = fakeTransport("fake:routing-error") + val client = buildClient(transport) + + client.connect() + val handle = client.send(unicastPacket("fail")) + tracker.track(103, handle, RetryPolicy.None) + + runCurrent() + transport.injectRoutingError(transport.lastTextPacketId(), Routing.Error.NO_ROUTE) + runCurrent() + advanceUntilIdle() + + assertEquals( + listOf( + MessageStatus.ENROUTE, + MessageStatus.ENROUTE, + MessageStatus.ERROR, + ), + updates.getValue(103), + ) + + client.disconnect() + } + + @Test + fun `retry policy resends after exponential backoff`() = runTest { + val updates = linkedMapOf>() + val repository = mockPacketRepository(updates) + val tracker = buildTracker(repository) + val transport = fakeTransport("fake:retry") + val client = buildClient(transport, sendTimeout = 100.milliseconds) + + client.connect() + val handle = client.send(unicastPacket("retry")) + tracker.track( + 104, + handle, + RetryPolicy.ExponentialBackoff( + maxAttempts = 2, + initialDelay = 1.seconds, + maxDelay = 2.seconds, + jitterFactor = 0.0, + ), + ) + + runCurrent() + assertEquals(1, transport.sentTextPackets().size) + + advanceTimeBy(100.milliseconds) + runCurrent() + assertEquals(1, transport.sentTextPackets().size) + + advanceTimeBy(999.milliseconds) + runCurrent() + assertEquals(1, transport.sentTextPackets().size) + + advanceTimeBy(1.milliseconds) + runCurrent() + assertEquals(2, transport.sentTextPackets().size) + + transport.injectRoutingAck(transport.lastTextPacketId()) + runCurrent() + advanceUntilIdle() + + assertEquals(MessageStatus.DELIVERED, updates.getValue(104).last()) + + client.disconnect() + } + + @Test + fun `concurrent messages are tracked independently`() = runTest { + val updates = linkedMapOf>() + val repository = mockPacketRepository(updates) + val tracker = buildTracker(repository) + val transport = fakeTransport("fake:concurrent") + val client = buildClient(transport, sendTimeout = 200.milliseconds) + + client.connect() + val firstHandle = client.send(unicastPacket("first")) + val secondHandle = client.send(unicastPacket("second")) + tracker.track(201, firstHandle, RetryPolicy.None) + tracker.track(202, secondHandle, RetryPolicy.None) + + runCurrent() + val requestIds = transport.sentTextPackets().takeLast(2).map { it.id } + transport.injectRoutingAck(requestIds.first()) + runCurrent() + + advanceTimeBy(200.milliseconds) + runCurrent() + advanceUntilIdle() + + assertEquals(MessageStatus.DELIVERED, updates.getValue(201).last()) + assertEquals(MessageStatus.ERROR, updates.getValue(202).last()) + + client.disconnect() + } + + @Test + fun `timeout marks message as error`() = runTest { + val updates = linkedMapOf>() + val repository = mockPacketRepository(updates) + val tracker = buildTracker(repository) + val transport = fakeTransport("fake:timeout") + val client = buildClient(transport, sendTimeout = 150.milliseconds) + + client.connect() + val handle = client.send(unicastPacket("timeout")) + tracker.track(203, handle, RetryPolicy.None) + + runCurrent() + advanceTimeBy(150.milliseconds) + runCurrent() + advanceUntilIdle() + + assertEquals(MessageStatus.ERROR, updates.getValue(203).last()) + + client.disconnect() + } + + @Test + fun `duplicate ack after delivery does not add extra updates`() = runTest { + val updates = linkedMapOf>() + val repository = mockPacketRepository(updates) + val tracker = buildTracker(repository) + val transport = fakeTransport("fake:duplicate-ack") + val client = buildClient(transport) + + client.connect() + val handle = client.send(unicastPacket("duplicate")) + tracker.track(204, handle, RetryPolicy.None) + + runCurrent() + val requestId = transport.lastTextPacketId() + transport.injectRoutingAck(requestId) + runCurrent() + advanceUntilIdle() + + val completedUpdates = updates.getValue(204).toList() + + transport.injectRoutingAck(requestId) + runCurrent() + advanceUntilIdle() + + assertEquals(completedUpdates, updates.getValue(204)) + + client.disconnect() + } + + private fun TestScope.buildClient( + transport: FakeRadioTransport, + sendTimeout: Duration = 5.seconds, + ): RadioClient = RadioClient.Builder() + .transport(transport) + .storage(InMemoryStorageProvider()) + .coroutineContext(backgroundScope.coroutineContext) + .sendTimeout(sendTimeout) + .build() + + private fun TestScope.buildTracker(packetRepository: PacketRepository): MessageDeliveryTracker { + val dispatcher = StandardTestDispatcher(testScheduler) + return MessageDeliveryTracker( + packetRepository = lazyOf(packetRepository), + dispatchers = CoroutineDispatchers(dispatcher, dispatcher, dispatcher), + ) + } + + private fun mockPacketRepository( + updates: MutableMap>, + ): PacketRepository { + val repository = mock(MockMode.autofill) + + everySuspend { repository.getPacketByPacketId(any()) } calls { args -> + DataPacket(bytes = null, dataType = 0, id = args.arg(0)) + } + everySuspend { repository.updateMessageStatus(any(), any()) } calls { args -> + updates.record(args.arg(0), args.arg(1)) + } + everySuspend { repository.updateMessageStatus(any(), any()) } calls { args -> + updates.record(args.arg(0).id, args.arg(1)) + } + + return repository + } + + private fun MutableMap>.record(packetId: Int, status: MessageStatus) { + getOrPut(packetId) { mutableListOf() }.add(status) + } + + private fun fakeTransport(identity: String) = FakeRadioTransport( + identity = TransportIdentity(identity), + autoHandshake = true, + ) + + private fun unicastPacket(text: String) = MeshPacket( + to = 0x12345678, + channel = 0, + want_ack = true, + decoded = Data( + portnum = PortNum.TEXT_MESSAGE_APP, + payload = text.encodeToByteArray().toByteString(), + ), + ) + + private fun FakeRadioTransport.sentTextPackets(): List = + outboundPackets().filter { it.decoded?.portnum == PortNum.TEXT_MESSAGE_APP } + + private fun FakeRadioTransport.lastTextPacketId(): Int = sentTextPackets().last().id +} diff --git a/core/data/src/commonTest/kotlin/org/meshtastic/core/data/radio/SdkStateBridgeTest.kt b/core/data/src/commonTest/kotlin/org/meshtastic/core/data/radio/SdkStateBridgeTest.kt index b987d9f5a..6c26e6e84 100644 --- a/core/data/src/commonTest/kotlin/org/meshtastic/core/data/radio/SdkStateBridgeTest.kt +++ b/core/data/src/commonTest/kotlin/org/meshtastic/core/data/radio/SdkStateBridgeTest.kt @@ -76,7 +76,7 @@ class SdkStateBridgeTest { ), ) } - val (_, client) = connectedClient(SeededHeartbeatStorageProvider(mapOf(remoteNode to staleHeartbeatMs))) + val (_, client) = connectedClient(StateBridgeHeartbeatStorageProvider(mapOf(remoteNode to staleHeartbeatMs))) buildBridge(client, nodeRepository) client.connect() @@ -107,7 +107,7 @@ class SdkStateBridgeTest { ), ) } - val (transport, client) = connectedClient(SeededHeartbeatStorageProvider(mapOf(remoteNode to staleHeartbeatMs))) + val (transport, client) = connectedClient(StateBridgeHeartbeatStorageProvider(mapOf(remoteNode to staleHeartbeatMs))) buildBridge(client, nodeRepository) client.connect() @@ -134,7 +134,7 @@ class SdkStateBridgeTest { @Test fun `sfpp link provided updates packet repository`() = runTest { val packetRepository = mock(MockMode.autofill) - val (transport, client) = connectedClient(SeededHeartbeatStorageProvider(emptyMap())) + val (transport, client) = connectedClient(StateBridgeHeartbeatStorageProvider(emptyMap())) buildBridge(client, FakeNodeRepository(), packetRepository) client.connect() @@ -170,7 +170,7 @@ class SdkStateBridgeTest { @Test fun `sfpp canon announce updates packet repository by hash`() = runTest { val packetRepository = mock(MockMode.autofill) - val (transport, client) = connectedClient(SeededHeartbeatStorageProvider(emptyMap())) + val (transport, client) = connectedClient(StateBridgeHeartbeatStorageProvider(emptyMap())) buildBridge(client, FakeNodeRepository(), packetRepository) client.connect() @@ -199,7 +199,7 @@ class SdkStateBridgeTest { @Test fun `congestion warning updates service repository congestion level`() = runTest { val serviceRepo = FakeServiceRepository() - val (transport, client) = connectedClient(SeededHeartbeatStorageProvider(emptyMap())) + val (transport, client) = connectedClient(StateBridgeHeartbeatStorageProvider(emptyMap())) buildBridge(client, FakeNodeRepository(), serviceRepository = serviceRepo) client.connect() @@ -231,7 +231,7 @@ class SdkStateBridgeTest { @Test fun `store forward server list propagates to service repository`() = runTest { val serviceRepo = FakeServiceRepository() - val (transport, client) = connectedClient(SeededHeartbeatStorageProvider(emptyMap())) + val (transport, client) = connectedClient(StateBridgeHeartbeatStorageProvider(emptyMap())) buildBridge(client, FakeNodeRepository(), serviceRepository = serviceRepo) client.connect() @@ -325,7 +325,7 @@ class SdkStateBridgeTest { } } -private class SeededHeartbeatStorageProvider( +private class StateBridgeHeartbeatStorageProvider( private val heartbeats: Map, ) : StorageProvider { override suspend fun activate(identity: TransportIdentity): DeviceStorage =