diff --git a/core/data/src/androidHostTest/kotlin/org/meshtastic/core/data/repository/PacketRepositoryImplTest.kt b/core/data/src/androidHostTest/kotlin/org/meshtastic/core/data/repository/PacketRepositoryImplTest.kt
new file mode 100644
index 000000000..b4d0603eb
--- /dev/null
+++ b/core/data/src/androidHostTest/kotlin/org/meshtastic/core/data/repository/PacketRepositoryImplTest.kt
@@ -0,0 +1,32 @@
+/*
+ * Copyright (c) 2026 Meshtastic LLC
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+package org.meshtastic.core.data.repository
+
+import kotlin.test.BeforeTest
+import org.junit.runner.RunWith
+import org.robolectric.RobolectricTestRunner
+import org.robolectric.annotation.Config
+
+@RunWith(RobolectricTestRunner::class)
+@Config(sdk = [34])
+class PacketRepositoryImplTest : CommonPacketRepositoryTest() {
+
+ @BeforeTest
+ fun setUp() {
+ setupRepo()
+ }
+}
diff --git a/core/data/src/commonTest/kotlin/org/meshtastic/core/data/radio/SdkRadioControllerTest.kt b/core/data/src/commonTest/kotlin/org/meshtastic/core/data/radio/SdkRadioControllerTest.kt
new file mode 100644
index 000000000..cd62081ad
--- /dev/null
+++ b/core/data/src/commonTest/kotlin/org/meshtastic/core/data/radio/SdkRadioControllerTest.kt
@@ -0,0 +1,435 @@
+/*
+ * Copyright (c) 2026 Meshtastic LLC
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see .
+ */
+package org.meshtastic.core.data.radio
+
+import dev.mokkery.MockMode
+import dev.mokkery.mock
+import kotlinx.coroutines.CoroutineDispatcher
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.async
+import kotlinx.coroutines.flow.MutableStateFlow
+import kotlinx.coroutines.test.TestScope
+import kotlinx.coroutines.test.advanceTimeBy
+import kotlinx.coroutines.test.advanceUntilIdle
+import kotlinx.coroutines.test.runCurrent
+import kotlinx.coroutines.test.runTest
+import org.meshtastic.core.di.CoroutineDispatchers
+import org.meshtastic.core.model.AdminException
+import org.meshtastic.core.model.DataPacket
+import org.meshtastic.core.model.Position
+import org.meshtastic.core.model.TelemetryType
+import org.meshtastic.core.repository.MeshLocationManager
+import org.meshtastic.core.repository.PacketRepository
+import org.meshtastic.core.testing.FakeNodeRepository
+import org.meshtastic.core.testing.FakeRadioPrefs
+import org.meshtastic.core.testing.FakeServiceRepository
+import org.meshtastic.proto.AdminMessage
+import org.meshtastic.proto.Channel
+import org.meshtastic.proto.ChannelSettings
+import org.meshtastic.proto.Config
+import org.meshtastic.proto.DeviceMetrics
+import org.meshtastic.proto.LocalStats
+import org.meshtastic.proto.MeshPacket
+import org.meshtastic.proto.ModuleConfig
+import org.meshtastic.proto.PortNum
+import org.meshtastic.proto.Routing
+import org.meshtastic.proto.Telemetry
+import org.meshtastic.sdk.RadioClient
+import org.meshtastic.sdk.TransportIdentity
+import org.meshtastic.sdk.testing.FakeRadioTransport
+import org.meshtastic.sdk.testing.InMemoryStorageProvider
+import kotlin.test.Test
+import kotlin.test.assertEquals
+import kotlin.test.assertFailsWith
+import kotlin.test.assertTrue
+import kotlin.time.Duration.Companion.seconds
+
+@OptIn(ExperimentalCoroutinesApi::class)
+class SdkRadioControllerTest {
+
+ @Test
+ fun `setLocalConfig forwards admin config write`() = runTest {
+ val fixture = connectedFixture()
+ try {
+ val config = Config(device = Config.DeviceConfig(role = Config.DeviceConfig.Role.CLIENT))
+ val outboundBefore = fixture.transport.outboundPackets().size
+ val deferred = async { fixture.controller.setLocalConfig(config) }
+ runCurrent()
+
+ val request = fixture.transport.outboundPackets().drop(outboundBefore).last { adminOf(it)?.set_config == config }
+ assertTrue(request.want_ack)
+ fixture.transport.injectRoutingAck(request.id)
+ runCurrent()
+
+ deferred.await()
+ } finally {
+ fixture.client.disconnect()
+ }
+ }
+
+ @Test
+ fun `setRemoteChannel forwards remote admin channel write`() = runTest {
+ val fixture = connectedFixture()
+ try {
+ val destNum = 0x22334455
+ val channel = Channel(index = 1, role = Channel.Role.SECONDARY, settings = ChannelSettings(name = "Ops"))
+ val outboundBefore = fixture.transport.outboundPackets().size
+ val deferred = async { fixture.controller.setRemoteChannel(destNum, channel) }
+ runCurrent()
+
+ val request = fixture.transport.outboundPackets().drop(outboundBefore).last { adminOf(it)?.set_channel == channel }
+ assertEquals(destNum, request.to)
+ fixture.transport.injectRoutingAck(request.id, fromNode = destNum)
+ runCurrent()
+
+ deferred.await()
+ } finally {
+ fixture.client.disconnect()
+ }
+ }
+
+ @Test
+ fun `setRemoteChannel forwards disabled channel for removal`() = runTest {
+ val fixture = connectedFixture()
+ try {
+ val destNum = 0x33445566
+ val channel = Channel(index = 2, role = Channel.Role.DISABLED)
+ val outboundBefore = fixture.transport.outboundPackets().size
+ val deferred = async { fixture.controller.setRemoteChannel(destNum, channel) }
+ runCurrent()
+
+ val request = fixture.transport.outboundPackets().drop(outboundBefore).last { adminOf(it)?.set_channel == channel }
+ assertEquals(destNum, request.to)
+ fixture.transport.injectRoutingAck(request.id, fromNode = destNum)
+ runCurrent()
+
+ deferred.await()
+ } finally {
+ fixture.client.disconnect()
+ }
+ }
+
+ @Test
+ fun `requestTelemetry forwards device request to sdk`() = runTest {
+ val fixture = connectedFixture()
+ try {
+ val destNum = 0x44556677
+ val outboundBefore = fixture.transport.outboundPackets().size
+ val deferred = async { fixture.controller.requestTelemetry(destNum, TelemetryType.DEVICE) }
+ runCurrent()
+
+ val request = fixture.transport.outboundPackets().drop(outboundBefore).last { telemetryOf(it) != null }
+ assertEquals(destNum, request.to)
+ assertEquals(PortNum.TELEMETRY_APP, request.decoded?.portnum)
+ assertTrue(request.decoded?.want_response == true)
+ fixture.transport.injectTelemetryResponse(
+ requestId = request.id,
+ telemetry = Telemetry(device_metrics = DeviceMetrics(battery_level = 87)),
+ fromNode = destNum,
+ )
+ runCurrent()
+
+ deferred.await()
+ } finally {
+ fixture.client.disconnect()
+ }
+ }
+
+ @Test
+ fun `requestTelemetry local stats targets local node`() = runTest {
+ val fixture = connectedFixture()
+ try {
+ val outboundBefore = fixture.transport.outboundPackets().size
+ val deferred = async { fixture.controller.requestTelemetry(0xCAFEBABE.toInt(), TelemetryType.LOCAL_STATS) }
+ runCurrent()
+
+ val request = fixture.transport.outboundPackets().drop(outboundBefore).last { telemetryOf(it) != null }
+ assertEquals(fixture.myNodeNum, request.to)
+ fixture.transport.injectTelemetryResponse(
+ requestId = request.id,
+ telemetry = Telemetry(local_stats = LocalStats(uptime_seconds = 123)),
+ fromNode = fixture.myNodeNum,
+ )
+ runCurrent()
+
+ deferred.await()
+ } finally {
+ fixture.client.disconnect()
+ }
+ }
+
+ @Test
+ fun `requestPosition encodes coordinates and requests ack`() = runTest {
+ val fixture = connectedFixture()
+ try {
+ val destNum = 0x55667788
+ val position = Position(latitude = 37.1234567, longitude = -122.7654321, altitude = 42, time = 1_700_000_123)
+ val outboundBefore = fixture.transport.outboundPackets().size
+
+ fixture.controller.requestPosition(destNum, position)
+ runCurrent()
+
+ val request = fixture.transport.outboundPackets().drop(outboundBefore).last { it.decoded?.portnum == PortNum.POSITION_APP }
+ val sentPosition = org.meshtastic.proto.Position.ADAPTER.decode(request.decoded!!.payload)
+ assertEquals(destNum, request.to)
+ assertTrue(request.want_ack)
+ assertEquals(Position.degI(position.latitude), sentPosition.latitude_i)
+ assertEquals(Position.degI(position.longitude), sentPosition.longitude_i)
+ assertEquals(position.altitude, sentPosition.altitude)
+ assertEquals(position.time, sentPosition.time)
+ } finally {
+ fixture.client.disconnect()
+ }
+ }
+
+ @Test
+ fun `setFixedPosition encodes coordinates for admin api`() = runTest {
+ val fixture = connectedFixture()
+ try {
+ val destNum = 0x66778899
+ val position = Position(latitude = 12.3456789, longitude = 98.7654321, altitude = 321, time = 456)
+ val outboundBefore = fixture.transport.outboundPackets().size
+ val deferred = async { fixture.controller.setFixedPosition(destNum, position) }
+ runCurrent()
+
+ val request = fixture.transport.outboundPackets().drop(outboundBefore).last { adminOf(it)?.set_fixed_position != null }
+ val sentPosition = adminOf(request)!!.set_fixed_position!!
+ assertEquals(destNum, request.to)
+ assertEquals(Position.degI(position.latitude), sentPosition.latitude_i)
+ assertEquals(Position.degI(position.longitude), sentPosition.longitude_i)
+ assertEquals(position.altitude, sentPosition.altitude)
+ assertEquals(position.time, sentPosition.time)
+ fixture.transport.injectRoutingAck(request.id, fromNode = destNum)
+ runCurrent()
+
+ deferred.await()
+ } finally {
+ fixture.client.disconnect()
+ }
+ }
+
+ @Test
+ fun `getConfig returns sdk config response`() = runTest {
+ val fixture = connectedFixture()
+ try {
+ val destNum = 0x10203040
+ val expected = Config(device = Config.DeviceConfig(role = Config.DeviceConfig.Role.CLIENT))
+ val outboundBefore = fixture.transport.outboundPackets().size
+ val deferred = async { fixture.controller.getConfig(destNum, AdminMessage.ConfigType.DEVICE_CONFIG.value) }
+ runCurrent()
+
+ val request = fixture.transport.outboundPackets().drop(outboundBefore)
+ .last { adminOf(it)?.get_config_request == AdminMessage.ConfigType.DEVICE_CONFIG }
+ fixture.transport.injectAdminResponse(
+ requestId = request.id,
+ response = AdminMessage(get_config_response = expected),
+ fromNode = destNum,
+ )
+ runCurrent()
+
+ assertEquals(expected, deferred.await())
+ } finally {
+ fixture.client.disconnect()
+ }
+ }
+
+ @Test
+ fun `getModuleConfig returns sdk module config response`() = runTest {
+ val fixture = connectedFixture()
+ try {
+ val destNum = 0x11223344
+ val expected = ModuleConfig(mqtt = ModuleConfig.MQTTConfig(enabled = true))
+ val outboundBefore = fixture.transport.outboundPackets().size
+ val deferred = async { fixture.controller.getModuleConfig(destNum, AdminMessage.ModuleConfigType.MQTT_CONFIG.value) }
+ runCurrent()
+
+ val request = fixture.transport.outboundPackets().drop(outboundBefore)
+ .last { adminOf(it)?.get_module_config_request == AdminMessage.ModuleConfigType.MQTT_CONFIG }
+ fixture.transport.injectAdminResponse(
+ requestId = request.id,
+ response = AdminMessage(get_module_config_response = expected),
+ fromNode = destNum,
+ )
+ runCurrent()
+
+ assertEquals(expected, deferred.await())
+ } finally {
+ fixture.client.disconnect()
+ }
+ }
+
+ @Test
+ fun `listChannels delegates to sdk and stops at disabled slot`() = runTest {
+ val fixture = connectedFixture()
+ try {
+ val destNum = 0x7ABCDE01
+ val outboundBefore = fixture.transport.outboundPackets().size
+ val deferred = async { fixture.controller.listChannels(destNum) }
+
+ repeat(3) {
+ runCurrent()
+ val request = fixture.transport.outboundPackets().drop(outboundBefore).last { adminOf(it)?.get_channel_request != null }
+ assertEquals(destNum, request.to)
+ val wireIndex = adminOf(request)!!.get_channel_request!!
+ val channelIndex = wireIndex - 1
+ val channel = if (channelIndex < 2) {
+ Channel(index = channelIndex, role = Channel.Role.PRIMARY, settings = ChannelSettings(name = "Channel $channelIndex"))
+ } else {
+ Channel(index = channelIndex, role = Channel.Role.DISABLED)
+ }
+ fixture.transport.injectAdminResponse(
+ requestId = request.id,
+ response = AdminMessage(get_channel_response = channel),
+ fromNode = destNum,
+ )
+ }
+ runCurrent()
+ advanceUntilIdle()
+
+ val channels = deferred.await()
+ assertEquals(listOf("Channel 0", "Channel 1"), channels.map { it.settings?.name })
+ } finally {
+ fixture.client.disconnect()
+ }
+ }
+
+ @Test
+ fun `reboot forwards reboot command as fire and forget admin write`() = runTest {
+ val fixture = connectedFixture()
+ try {
+ val destNum = 0x55667711
+ val outboundBefore = fixture.transport.outboundPackets().size
+ val deferred = async { fixture.controller.reboot(destNum) }
+ runCurrent()
+
+ val request = fixture.transport.outboundPackets().drop(outboundBefore).last { adminOf(it)?.reboot_seconds == 0 }
+ assertEquals(destNum, request.to)
+ fixture.transport.injectRoutingAck(request.id, fromNode = destNum)
+ runCurrent()
+
+ deferred.await()
+ } finally {
+ fixture.client.disconnect()
+ }
+ }
+
+ @Test
+ fun `sdk timeouts surface to callers`() = runTest {
+ val fixture = connectedFixture()
+ try {
+ val destNum = 0x12345678
+ val deferred = async {
+ assertFailsWith {
+ fixture.controller.getConfig(destNum, AdminMessage.ConfigType.DEVICE_CONFIG.value)
+ }
+ }
+ runCurrent()
+ advanceTimeBy(70.seconds)
+ runCurrent()
+
+ deferred.await()
+ } finally {
+ fixture.client.disconnect()
+ }
+ }
+
+ @Test
+ fun `unauthorized admin operations surface permission errors`() = runTest {
+ val fixture = connectedFixture()
+ try {
+ val destNum = 0x21436587
+ val config = Config(device = Config.DeviceConfig(role = Config.DeviceConfig.Role.CLIENT))
+ val outboundBefore = fixture.transport.outboundPackets().size
+ val deferred = async {
+ assertFailsWith {
+ fixture.controller.setConfig(destNum, config)
+ }
+ }
+ runCurrent()
+
+ val request = fixture.transport.outboundPackets().drop(outboundBefore).last { adminOf(it)?.set_config == config }
+ fixture.transport.injectRoutingError(request.id, Routing.Error.ADMIN_PUBLIC_KEY_UNAUTHORIZED, fromNode = destNum)
+ runCurrent()
+
+ deferred.await()
+ } finally {
+ fixture.client.disconnect()
+ }
+ }
+
+ private suspend fun TestScope.connectedFixture(myNodeNum: Int = 0x11111111): ControllerFixture {
+ val transport = FakeRadioTransport(
+ identity = TransportIdentity("fake:sdk-radio-controller"),
+ autoHandshake = true,
+ nodeNum = myNodeNum,
+ )
+ val client = RadioClient.Builder()
+ .transport(transport)
+ .storage(InMemoryStorageProvider())
+ .autoSyncTimeOnConnect(false)
+ .coroutineContext(backgroundScope.coroutineContext)
+ .rpcTimeout(60.seconds)
+ .sendTimeout(60.seconds)
+ .build()
+ val dispatcher = backgroundScope.coroutineContext[kotlin.coroutines.ContinuationInterceptor] as CoroutineDispatcher
+ val controller = SdkRadioController(
+ accessor = TestRadioClientAccessor(client),
+ serviceRepository = FakeServiceRepository(),
+ nodeRepository = FakeNodeRepository(),
+ locationManager = NoOpLocationManager,
+ deliveryTracker = MessageDeliveryTracker(lazyOf(mock(MockMode.autofill)), CoroutineDispatchers(dispatcher, dispatcher, dispatcher)),
+ radioPrefs = FakeRadioPrefs(),
+ )
+ client.connect()
+ runCurrent()
+ return ControllerFixture(controller = controller, transport = transport, client = client, myNodeNum = myNodeNum)
+ }
+
+ private fun adminOf(packet: MeshPacket): AdminMessage? {
+ val decoded = packet.decoded ?: return null
+ if (decoded.portnum != PortNum.ADMIN_APP) return null
+ return runCatching { AdminMessage.ADAPTER.decode(decoded.payload) }.getOrNull()
+ }
+
+ private fun telemetryOf(packet: MeshPacket): Telemetry? {
+ val decoded = packet.decoded ?: return null
+ if (decoded.portnum != PortNum.TELEMETRY_APP) return null
+ return runCatching { Telemetry.ADAPTER.decode(decoded.payload) }.getOrNull()
+ }
+
+ private data class ControllerFixture(
+ val controller: SdkRadioController,
+ val transport: FakeRadioTransport,
+ val client: RadioClient,
+ val myNodeNum: Int,
+ )
+
+ private class TestRadioClientAccessor(client: RadioClient) : RadioClientAccessor {
+ override val client = MutableStateFlow(client)
+
+ override fun rebuildAndConnectAsync() = Unit
+
+ override fun disconnect() = Unit
+ }
+
+ private object NoOpLocationManager : MeshLocationManager {
+ override fun start(scope: CoroutineScope, sendPositionFn: (org.meshtastic.proto.Position) -> Unit) = Unit
+
+ override fun stop() = Unit
+ }
+}
diff --git a/core/data/src/commonTest/kotlin/org/meshtastic/core/data/repository/CommonPacketRepositoryTest.kt b/core/data/src/commonTest/kotlin/org/meshtastic/core/data/repository/CommonPacketRepositoryTest.kt
index cffa154c9..897e047e1 100644
--- a/core/data/src/commonTest/kotlin/org/meshtastic/core/data/repository/CommonPacketRepositoryTest.kt
+++ b/core/data/src/commonTest/kotlin/org/meshtastic/core/data/repository/CommonPacketRepositoryTest.kt
@@ -16,55 +16,273 @@
*/
package org.meshtastic.core.data.repository
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.flow.first
+import kotlinx.coroutines.launch
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.runTest
+import okio.ByteString
+import okio.ByteString.Companion.toByteString
import org.meshtastic.core.di.CoroutineDispatchers
import org.meshtastic.core.model.DataPacket
+import org.meshtastic.core.model.Node
+import org.meshtastic.core.model.Reaction
import org.meshtastic.core.testing.FakeDatabaseProvider
import org.meshtastic.core.testing.FakeNodeRepository
+import org.meshtastic.core.testing.TestDataFactory
+import org.meshtastic.core.testing.setupTestContext
+import org.meshtastic.proto.PortNum
+import org.meshtastic.proto.User
+import org.meshtastic.proto.Waypoint
import kotlin.test.AfterTest
import kotlin.test.Test
import kotlin.test.assertEquals
+import kotlin.test.assertFalse
+import kotlin.test.assertNotNull
+import kotlin.test.assertTrue
abstract class CommonPacketRepositoryTest {
protected lateinit var dbProvider: FakeDatabaseProvider
private val testDispatcher = UnconfinedTestDispatcher()
private val dispatchers = CoroutineDispatchers(main = testDispatcher, io = testDispatcher, default = testDispatcher)
- private val nodeRepository = FakeNodeRepository()
+ protected val nodeRepository = FakeNodeRepository()
protected lateinit var repository: PacketRepositoryImpl
+ private val myNodeNum = 1
+ private val broadcastContact = "0${DataPacket.nodeNumToId(DataPacket.BROADCAST)}"
+
fun setupRepo() {
+ setupTestContext()
dbProvider = FakeDatabaseProvider()
repository = PacketRepositoryImpl(dbProvider, dispatchers, nodeRepository)
+ nodeRepository.setMyNodeInfo(TestDataFactory.createMyNodeInfo(myNodeNum = myNodeNum))
}
@AfterTest
fun tearDown() {
- dbProvider.close()
+ if (::dbProvider.isInitialized) {
+ dbProvider.close()
+ }
}
@Test
fun `savePacket persists and retrieves waypoints`() = runTest(testDispatcher) {
- val myNodeNum = 1
- val contact = "contact"
+ val packet = DataPacket(to = DataPacket.BROADCAST, bytes = ByteString.EMPTY, dataType = PortNum.TEXT_MESSAGE_APP.value, id = 123)
- // Set the current node number so PacketRepositoryImpl can pass it to queries
- nodeRepository.setMyNodeInfo(org.meshtastic.core.testing.TestDataFactory.createMyNodeInfo(myNodeNum = myNodeNum))
+ repository.savePacket(myNodeNum, broadcastContact, packet, 1000L)
- val packet = DataPacket(to = DataPacket.BROADCAST, bytes = okio.ByteString.EMPTY, dataType = 1, id = 123)
-
- repository.savePacket(myNodeNum, contact, packet, 1000L)
-
- // Verify it was saved.
- val count = repository.getMessageCount(contact)
- assertEquals(1, count)
+ assertEquals(1, repository.getMessageCount(broadcastContact))
}
@Test
fun `clearAllUnreadCounts works with real DB`() = runTest(testDispatcher) {
repository.clearAllUnreadCounts()
- // No exception thrown
}
+
+ @Test
+ fun `getMessagesFrom limit keeps newest messages within boundary`() = runTest(testDispatcher) {
+ val contact = "1!abcd1234"
+ repeat(55) { index ->
+ saveTextPacket(contact = contact, id = index + 1, receivedTime = 1_000L + index, text = "Message $index", read = false)
+ }
+
+ val messages = repository.getMessagesFrom(contact = contact, limit = 50, getNode = ::lookupNode).first()
+
+ assertEquals(50, messages.size)
+ assertEquals("Message 54", messages.first().text)
+ assertEquals("Message 5", messages.last().text)
+ }
+
+ @Test
+ fun `unread counts track read and unread transitions`() = runTest(testDispatcher) {
+ val contact = "2!feedbeef"
+ saveTextPacket(contact = contact, id = 1, receivedTime = 100L, read = false)
+ saveTextPacket(contact = contact, id = 2, receivedTime = 200L, read = false)
+ saveTextPacket(contact = contact, id = 3, receivedTime = 300L, read = false)
+
+ assertEquals(3, repository.getUnreadCount(contact))
+ assertEquals(3, repository.getUnreadCountTotal().first())
+ assertTrue(repository.hasUnreadMessages(contact).first())
+ assertNotNull(repository.getFirstUnreadMessageUuid(contact).first())
+
+ repository.clearUnreadCount(contact, 200L)
+
+ assertEquals(1, repository.getUnreadCount(contact))
+ assertEquals(1, repository.getUnreadCountTotal().first())
+ assertTrue(repository.hasUnreadMessages(contact).first())
+
+ repository.clearAllUnreadCounts()
+
+ assertEquals(0, repository.getUnreadCount(contact))
+ assertEquals(0, repository.getUnreadCountTotal().first())
+ assertFalse(repository.hasUnreadMessages(contact).first())
+ assertEquals(null, repository.getFirstUnreadMessageUuid(contact).first())
+ }
+
+ @Test
+ fun `reactions can be added listed and removed with parent message`() = runTest(testDispatcher) {
+ val contact = "3!react000"
+ val replyId = 501
+ saveTextPacket(contact = contact, id = replyId, receivedTime = 1_000L, text = "Original", read = true)
+
+ val reaction =
+ Reaction(
+ replyId = replyId,
+ user = User(id = "!reactor"),
+ emoji = "👍",
+ timestamp = 2_000L,
+ snr = 1.5f,
+ rssi = -70,
+ hopsAway = 1,
+ packetId = replyId,
+ to = "!abcd1234",
+ channel = 3,
+ )
+
+ repository.insertReaction(reaction, myNodeNum)
+
+ val storedReaction = repository.getReactionByPacketId(replyId)
+ assertNotNull(storedReaction)
+ assertEquals("👍", storedReaction.emoji)
+ assertEquals("!reactor", storedReaction.user.id)
+
+ val reactions = repository.findReactionsWithId(replyId)
+ assertEquals(1, reactions.size)
+ assertEquals(replyId, reactions.single().replyId)
+ assertEquals("👍", reactions.single().emoji)
+
+ val messageUuid = repository.getMessagesFrom(contact = contact, getNode = ::lookupNode).first().single().uuid
+ repository.deleteMessages(listOf(messageUuid))
+
+ assertTrue(repository.findReactionsWithId(replyId).isEmpty())
+ assertEquals(null, repository.getReactionByPacketId(replyId))
+ }
+
+ @Test
+ fun `getWaypoints preserves channel data for filtering`() = runTest(testDispatcher) {
+ saveWaypointPacket(contact = broadcastContact, channel = 0, waypointId = 101, receivedTime = 1_000L)
+ saveWaypointPacket(contact = "2${DataPacket.nodeNumToId(DataPacket.BROADCAST)}", channel = 2, waypointId = 202, receivedTime = 2_000L)
+ saveTextPacket(contact = broadcastContact, id = 77, receivedTime = 3_000L)
+
+ val waypoints = repository.getWaypoints().first()
+ val channelTwoWaypoints = waypoints.filter { it.channel == 2 }
+
+ assertEquals(2, waypoints.size)
+ assertEquals(setOf(0, 2), waypoints.map { it.channel }.toSet())
+ assertEquals(listOf(202), channelTwoWaypoints.mapNotNull { it.waypoint?.id })
+ }
+
+ @Test
+ fun `contact keys keep channel and destination formats distinct`() = runTest(testDispatcher) {
+ val channelBroadcast = broadcastContact
+ val secondaryBroadcast = "1${DataPacket.nodeNumToId(DataPacket.BROADCAST)}"
+ val directMessage = "${DataPacket.PKC_CHANNEL_INDEX}!70fdde9b"
+
+ saveTextPacket(contact = channelBroadcast, id = 1, receivedTime = 100L, channel = 0)
+ saveTextPacket(contact = secondaryBroadcast, id = 2, receivedTime = 200L, channel = 1)
+ saveTextPacket(
+ contact = directMessage,
+ id = 3,
+ receivedTime = 300L,
+ channel = DataPacket.PKC_CHANNEL_INDEX,
+ to = 0x70fdde9b,
+ )
+
+ val contacts = repository.getContacts().first()
+
+ assertEquals(setOf(channelBroadcast, secondaryBroadcast, directMessage), contacts.keys)
+ assertEquals(0, contacts.getValue(channelBroadcast).channel)
+ assertEquals(1, contacts.getValue(secondaryBroadcast).channel)
+ assertEquals(DataPacket.PKC_CHANNEL_INDEX, contacts.getValue(directMessage).channel)
+ assertEquals(1, repository.getMessageCount(channelBroadcast))
+ assertEquals(1, repository.getMessageCount(secondaryBroadcast))
+ assertEquals(1, repository.getMessageCount(directMessage))
+ }
+
+ @Test
+ fun `getMessagesFrom returns empty flow for unknown contact`() = runTest(testDispatcher) {
+ val messages = repository.getMessagesFrom(contact = "7!missing", getNode = ::lookupNode).first()
+
+ assertTrue(messages.isEmpty())
+ }
+
+ @Test
+ fun `concurrent writes keep all packets intact`() = runTest {
+ val concurrentRepository = PacketRepositoryImpl(
+ dbManager = dbProvider,
+ dispatchers = CoroutineDispatchers(main = testDispatcher, io = Dispatchers.Default, default = Dispatchers.Default),
+ nodeRepository = nodeRepository,
+ )
+ val contact = "4!concur00"
+
+ coroutineScope {
+ repeat(100) { index ->
+ launch(Dispatchers.Default) {
+ concurrentRepository.savePacket(
+ myNodeNum = myNodeNum,
+ contactKey = contact,
+ packet = textPacket(id = index + 1, text = "Concurrent $index", channel = 4),
+ receivedTime = 10_000L + index,
+ read = false,
+ )
+ }
+ }
+ }
+
+ val messages = concurrentRepository.getMessagesFrom(contact = contact, getNode = ::lookupNode).first()
+
+ assertEquals(100, concurrentRepository.getMessageCount(contact))
+ assertEquals(100, messages.size)
+ assertEquals(100, messages.map { it.packetId }.distinct().size)
+ }
+
+ private suspend fun saveTextPacket(
+ contact: String,
+ id: Int,
+ receivedTime: Long,
+ text: String = "Message $id",
+ read: Boolean = true,
+ filtered: Boolean = false,
+ channel: Int = contact.first().digitToIntOrNull() ?: 0,
+ to: Int = DataPacket.BROADCAST,
+ from: Int = 0x12345678,
+ ) {
+ repository.savePacket(
+ myNodeNum = myNodeNum,
+ contactKey = contact,
+ packet = textPacket(id = id, text = text, channel = channel, to = to, from = from),
+ receivedTime = receivedTime,
+ read = read,
+ filtered = filtered,
+ )
+ }
+
+ private suspend fun saveWaypointPacket(contact: String, channel: Int, waypointId: Int, receivedTime: Long) {
+ repository.savePacket(
+ myNodeNum = myNodeNum,
+ contactKey = contact,
+ packet = DataPacket(to = DataPacket.BROADCAST, channel = channel, waypoint = Waypoint(id = waypointId, name = "Waypoint $waypointId")),
+ receivedTime = receivedTime,
+ )
+ }
+
+ private fun textPacket(
+ id: Int,
+ text: String,
+ channel: Int,
+ to: Int = DataPacket.BROADCAST,
+ from: Int = 0x12345678,
+ ) = DataPacket(
+ to = to,
+ bytes = text.encodeToByteArray().toByteString(),
+ dataType = PortNum.TEXT_MESSAGE_APP.value,
+ from = from,
+ id = id,
+ channel = channel,
+ )
+
+ private fun lookupNode(userId: String?): Node = Node(num = 0, user = User(id = userId.orEmpty(), long_name = userId.orEmpty()))
}