test: SdkRadioController and PacketRepository coverage

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
James Rich
2026-05-06 15:06:13 -05:00
parent 37be51bed9
commit 2d299f7e21
3 changed files with 699 additions and 14 deletions

View File

@@ -0,0 +1,32 @@
/*
* Copyright (c) 2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.data.repository
import kotlin.test.BeforeTest
import org.junit.runner.RunWith
import org.robolectric.RobolectricTestRunner
import org.robolectric.annotation.Config
@RunWith(RobolectricTestRunner::class)
@Config(sdk = [34])
class PacketRepositoryImplTest : CommonPacketRepositoryTest() {
@BeforeTest
fun setUp() {
setupRepo()
}
}

View File

@@ -0,0 +1,435 @@
/*
* Copyright (c) 2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.core.data.radio
import dev.mokkery.MockMode
import dev.mokkery.mock
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.advanceTimeBy
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runCurrent
import kotlinx.coroutines.test.runTest
import org.meshtastic.core.di.CoroutineDispatchers
import org.meshtastic.core.model.AdminException
import org.meshtastic.core.model.DataPacket
import org.meshtastic.core.model.Position
import org.meshtastic.core.model.TelemetryType
import org.meshtastic.core.repository.MeshLocationManager
import org.meshtastic.core.repository.PacketRepository
import org.meshtastic.core.testing.FakeNodeRepository
import org.meshtastic.core.testing.FakeRadioPrefs
import org.meshtastic.core.testing.FakeServiceRepository
import org.meshtastic.proto.AdminMessage
import org.meshtastic.proto.Channel
import org.meshtastic.proto.ChannelSettings
import org.meshtastic.proto.Config
import org.meshtastic.proto.DeviceMetrics
import org.meshtastic.proto.LocalStats
import org.meshtastic.proto.MeshPacket
import org.meshtastic.proto.ModuleConfig
import org.meshtastic.proto.PortNum
import org.meshtastic.proto.Routing
import org.meshtastic.proto.Telemetry
import org.meshtastic.sdk.RadioClient
import org.meshtastic.sdk.TransportIdentity
import org.meshtastic.sdk.testing.FakeRadioTransport
import org.meshtastic.sdk.testing.InMemoryStorageProvider
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertTrue
import kotlin.time.Duration.Companion.seconds
@OptIn(ExperimentalCoroutinesApi::class)
class SdkRadioControllerTest {
@Test
fun `setLocalConfig forwards admin config write`() = runTest {
val fixture = connectedFixture()
try {
val config = Config(device = Config.DeviceConfig(role = Config.DeviceConfig.Role.CLIENT))
val outboundBefore = fixture.transport.outboundPackets().size
val deferred = async { fixture.controller.setLocalConfig(config) }
runCurrent()
val request = fixture.transport.outboundPackets().drop(outboundBefore).last { adminOf(it)?.set_config == config }
assertTrue(request.want_ack)
fixture.transport.injectRoutingAck(request.id)
runCurrent()
deferred.await()
} finally {
fixture.client.disconnect()
}
}
@Test
fun `setRemoteChannel forwards remote admin channel write`() = runTest {
val fixture = connectedFixture()
try {
val destNum = 0x22334455
val channel = Channel(index = 1, role = Channel.Role.SECONDARY, settings = ChannelSettings(name = "Ops"))
val outboundBefore = fixture.transport.outboundPackets().size
val deferred = async { fixture.controller.setRemoteChannel(destNum, channel) }
runCurrent()
val request = fixture.transport.outboundPackets().drop(outboundBefore).last { adminOf(it)?.set_channel == channel }
assertEquals(destNum, request.to)
fixture.transport.injectRoutingAck(request.id, fromNode = destNum)
runCurrent()
deferred.await()
} finally {
fixture.client.disconnect()
}
}
@Test
fun `setRemoteChannel forwards disabled channel for removal`() = runTest {
val fixture = connectedFixture()
try {
val destNum = 0x33445566
val channel = Channel(index = 2, role = Channel.Role.DISABLED)
val outboundBefore = fixture.transport.outboundPackets().size
val deferred = async { fixture.controller.setRemoteChannel(destNum, channel) }
runCurrent()
val request = fixture.transport.outboundPackets().drop(outboundBefore).last { adminOf(it)?.set_channel == channel }
assertEquals(destNum, request.to)
fixture.transport.injectRoutingAck(request.id, fromNode = destNum)
runCurrent()
deferred.await()
} finally {
fixture.client.disconnect()
}
}
@Test
fun `requestTelemetry forwards device request to sdk`() = runTest {
val fixture = connectedFixture()
try {
val destNum = 0x44556677
val outboundBefore = fixture.transport.outboundPackets().size
val deferred = async { fixture.controller.requestTelemetry(destNum, TelemetryType.DEVICE) }
runCurrent()
val request = fixture.transport.outboundPackets().drop(outboundBefore).last { telemetryOf(it) != null }
assertEquals(destNum, request.to)
assertEquals(PortNum.TELEMETRY_APP, request.decoded?.portnum)
assertTrue(request.decoded?.want_response == true)
fixture.transport.injectTelemetryResponse(
requestId = request.id,
telemetry = Telemetry(device_metrics = DeviceMetrics(battery_level = 87)),
fromNode = destNum,
)
runCurrent()
deferred.await()
} finally {
fixture.client.disconnect()
}
}
@Test
fun `requestTelemetry local stats targets local node`() = runTest {
val fixture = connectedFixture()
try {
val outboundBefore = fixture.transport.outboundPackets().size
val deferred = async { fixture.controller.requestTelemetry(0xCAFEBABE.toInt(), TelemetryType.LOCAL_STATS) }
runCurrent()
val request = fixture.transport.outboundPackets().drop(outboundBefore).last { telemetryOf(it) != null }
assertEquals(fixture.myNodeNum, request.to)
fixture.transport.injectTelemetryResponse(
requestId = request.id,
telemetry = Telemetry(local_stats = LocalStats(uptime_seconds = 123)),
fromNode = fixture.myNodeNum,
)
runCurrent()
deferred.await()
} finally {
fixture.client.disconnect()
}
}
@Test
fun `requestPosition encodes coordinates and requests ack`() = runTest {
val fixture = connectedFixture()
try {
val destNum = 0x55667788
val position = Position(latitude = 37.1234567, longitude = -122.7654321, altitude = 42, time = 1_700_000_123)
val outboundBefore = fixture.transport.outboundPackets().size
fixture.controller.requestPosition(destNum, position)
runCurrent()
val request = fixture.transport.outboundPackets().drop(outboundBefore).last { it.decoded?.portnum == PortNum.POSITION_APP }
val sentPosition = org.meshtastic.proto.Position.ADAPTER.decode(request.decoded!!.payload)
assertEquals(destNum, request.to)
assertTrue(request.want_ack)
assertEquals(Position.degI(position.latitude), sentPosition.latitude_i)
assertEquals(Position.degI(position.longitude), sentPosition.longitude_i)
assertEquals(position.altitude, sentPosition.altitude)
assertEquals(position.time, sentPosition.time)
} finally {
fixture.client.disconnect()
}
}
@Test
fun `setFixedPosition encodes coordinates for admin api`() = runTest {
val fixture = connectedFixture()
try {
val destNum = 0x66778899
val position = Position(latitude = 12.3456789, longitude = 98.7654321, altitude = 321, time = 456)
val outboundBefore = fixture.transport.outboundPackets().size
val deferred = async { fixture.controller.setFixedPosition(destNum, position) }
runCurrent()
val request = fixture.transport.outboundPackets().drop(outboundBefore).last { adminOf(it)?.set_fixed_position != null }
val sentPosition = adminOf(request)!!.set_fixed_position!!
assertEquals(destNum, request.to)
assertEquals(Position.degI(position.latitude), sentPosition.latitude_i)
assertEquals(Position.degI(position.longitude), sentPosition.longitude_i)
assertEquals(position.altitude, sentPosition.altitude)
assertEquals(position.time, sentPosition.time)
fixture.transport.injectRoutingAck(request.id, fromNode = destNum)
runCurrent()
deferred.await()
} finally {
fixture.client.disconnect()
}
}
@Test
fun `getConfig returns sdk config response`() = runTest {
val fixture = connectedFixture()
try {
val destNum = 0x10203040
val expected = Config(device = Config.DeviceConfig(role = Config.DeviceConfig.Role.CLIENT))
val outboundBefore = fixture.transport.outboundPackets().size
val deferred = async { fixture.controller.getConfig(destNum, AdminMessage.ConfigType.DEVICE_CONFIG.value) }
runCurrent()
val request = fixture.transport.outboundPackets().drop(outboundBefore)
.last { adminOf(it)?.get_config_request == AdminMessage.ConfigType.DEVICE_CONFIG }
fixture.transport.injectAdminResponse(
requestId = request.id,
response = AdminMessage(get_config_response = expected),
fromNode = destNum,
)
runCurrent()
assertEquals(expected, deferred.await())
} finally {
fixture.client.disconnect()
}
}
@Test
fun `getModuleConfig returns sdk module config response`() = runTest {
val fixture = connectedFixture()
try {
val destNum = 0x11223344
val expected = ModuleConfig(mqtt = ModuleConfig.MQTTConfig(enabled = true))
val outboundBefore = fixture.transport.outboundPackets().size
val deferred = async { fixture.controller.getModuleConfig(destNum, AdminMessage.ModuleConfigType.MQTT_CONFIG.value) }
runCurrent()
val request = fixture.transport.outboundPackets().drop(outboundBefore)
.last { adminOf(it)?.get_module_config_request == AdminMessage.ModuleConfigType.MQTT_CONFIG }
fixture.transport.injectAdminResponse(
requestId = request.id,
response = AdminMessage(get_module_config_response = expected),
fromNode = destNum,
)
runCurrent()
assertEquals(expected, deferred.await())
} finally {
fixture.client.disconnect()
}
}
@Test
fun `listChannels delegates to sdk and stops at disabled slot`() = runTest {
val fixture = connectedFixture()
try {
val destNum = 0x7ABCDE01
val outboundBefore = fixture.transport.outboundPackets().size
val deferred = async { fixture.controller.listChannels(destNum) }
repeat(3) {
runCurrent()
val request = fixture.transport.outboundPackets().drop(outboundBefore).last { adminOf(it)?.get_channel_request != null }
assertEquals(destNum, request.to)
val wireIndex = adminOf(request)!!.get_channel_request!!
val channelIndex = wireIndex - 1
val channel = if (channelIndex < 2) {
Channel(index = channelIndex, role = Channel.Role.PRIMARY, settings = ChannelSettings(name = "Channel $channelIndex"))
} else {
Channel(index = channelIndex, role = Channel.Role.DISABLED)
}
fixture.transport.injectAdminResponse(
requestId = request.id,
response = AdminMessage(get_channel_response = channel),
fromNode = destNum,
)
}
runCurrent()
advanceUntilIdle()
val channels = deferred.await()
assertEquals(listOf("Channel 0", "Channel 1"), channels.map { it.settings?.name })
} finally {
fixture.client.disconnect()
}
}
@Test
fun `reboot forwards reboot command as fire and forget admin write`() = runTest {
val fixture = connectedFixture()
try {
val destNum = 0x55667711
val outboundBefore = fixture.transport.outboundPackets().size
val deferred = async { fixture.controller.reboot(destNum) }
runCurrent()
val request = fixture.transport.outboundPackets().drop(outboundBefore).last { adminOf(it)?.reboot_seconds == 0 }
assertEquals(destNum, request.to)
fixture.transport.injectRoutingAck(request.id, fromNode = destNum)
runCurrent()
deferred.await()
} finally {
fixture.client.disconnect()
}
}
@Test
fun `sdk timeouts surface to callers`() = runTest {
val fixture = connectedFixture()
try {
val destNum = 0x12345678
val deferred = async {
assertFailsWith<AdminException.Timeout> {
fixture.controller.getConfig(destNum, AdminMessage.ConfigType.DEVICE_CONFIG.value)
}
}
runCurrent()
advanceTimeBy(70.seconds)
runCurrent()
deferred.await()
} finally {
fixture.client.disconnect()
}
}
@Test
fun `unauthorized admin operations surface permission errors`() = runTest {
val fixture = connectedFixture()
try {
val destNum = 0x21436587
val config = Config(device = Config.DeviceConfig(role = Config.DeviceConfig.Role.CLIENT))
val outboundBefore = fixture.transport.outboundPackets().size
val deferred = async {
assertFailsWith<AdminException.Unauthorized> {
fixture.controller.setConfig(destNum, config)
}
}
runCurrent()
val request = fixture.transport.outboundPackets().drop(outboundBefore).last { adminOf(it)?.set_config == config }
fixture.transport.injectRoutingError(request.id, Routing.Error.ADMIN_PUBLIC_KEY_UNAUTHORIZED, fromNode = destNum)
runCurrent()
deferred.await()
} finally {
fixture.client.disconnect()
}
}
private suspend fun TestScope.connectedFixture(myNodeNum: Int = 0x11111111): ControllerFixture {
val transport = FakeRadioTransport(
identity = TransportIdentity("fake:sdk-radio-controller"),
autoHandshake = true,
nodeNum = myNodeNum,
)
val client = RadioClient.Builder()
.transport(transport)
.storage(InMemoryStorageProvider())
.autoSyncTimeOnConnect(false)
.coroutineContext(backgroundScope.coroutineContext)
.rpcTimeout(60.seconds)
.sendTimeout(60.seconds)
.build()
val dispatcher = backgroundScope.coroutineContext[kotlin.coroutines.ContinuationInterceptor] as CoroutineDispatcher
val controller = SdkRadioController(
accessor = TestRadioClientAccessor(client),
serviceRepository = FakeServiceRepository(),
nodeRepository = FakeNodeRepository(),
locationManager = NoOpLocationManager,
deliveryTracker = MessageDeliveryTracker(lazyOf(mock<PacketRepository>(MockMode.autofill)), CoroutineDispatchers(dispatcher, dispatcher, dispatcher)),
radioPrefs = FakeRadioPrefs(),
)
client.connect()
runCurrent()
return ControllerFixture(controller = controller, transport = transport, client = client, myNodeNum = myNodeNum)
}
private fun adminOf(packet: MeshPacket): AdminMessage? {
val decoded = packet.decoded ?: return null
if (decoded.portnum != PortNum.ADMIN_APP) return null
return runCatching { AdminMessage.ADAPTER.decode(decoded.payload) }.getOrNull()
}
private fun telemetryOf(packet: MeshPacket): Telemetry? {
val decoded = packet.decoded ?: return null
if (decoded.portnum != PortNum.TELEMETRY_APP) return null
return runCatching { Telemetry.ADAPTER.decode(decoded.payload) }.getOrNull()
}
private data class ControllerFixture(
val controller: SdkRadioController,
val transport: FakeRadioTransport,
val client: RadioClient,
val myNodeNum: Int,
)
private class TestRadioClientAccessor(client: RadioClient) : RadioClientAccessor {
override val client = MutableStateFlow<RadioClient?>(client)
override fun rebuildAndConnectAsync() = Unit
override fun disconnect() = Unit
}
private object NoOpLocationManager : MeshLocationManager {
override fun start(scope: CoroutineScope, sendPositionFn: (org.meshtastic.proto.Position) -> Unit) = Unit
override fun stop() = Unit
}
}

View File

@@ -16,55 +16,273 @@
*/
package org.meshtastic.core.data.repository
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.runTest
import okio.ByteString
import okio.ByteString.Companion.toByteString
import org.meshtastic.core.di.CoroutineDispatchers
import org.meshtastic.core.model.DataPacket
import org.meshtastic.core.model.Node
import org.meshtastic.core.model.Reaction
import org.meshtastic.core.testing.FakeDatabaseProvider
import org.meshtastic.core.testing.FakeNodeRepository
import org.meshtastic.core.testing.TestDataFactory
import org.meshtastic.core.testing.setupTestContext
import org.meshtastic.proto.PortNum
import org.meshtastic.proto.User
import org.meshtastic.proto.Waypoint
import kotlin.test.AfterTest
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFalse
import kotlin.test.assertNotNull
import kotlin.test.assertTrue
abstract class CommonPacketRepositoryTest {
protected lateinit var dbProvider: FakeDatabaseProvider
private val testDispatcher = UnconfinedTestDispatcher()
private val dispatchers = CoroutineDispatchers(main = testDispatcher, io = testDispatcher, default = testDispatcher)
private val nodeRepository = FakeNodeRepository()
protected val nodeRepository = FakeNodeRepository()
protected lateinit var repository: PacketRepositoryImpl
private val myNodeNum = 1
private val broadcastContact = "0${DataPacket.nodeNumToId(DataPacket.BROADCAST)}"
fun setupRepo() {
setupTestContext()
dbProvider = FakeDatabaseProvider()
repository = PacketRepositoryImpl(dbProvider, dispatchers, nodeRepository)
nodeRepository.setMyNodeInfo(TestDataFactory.createMyNodeInfo(myNodeNum = myNodeNum))
}
@AfterTest
fun tearDown() {
dbProvider.close()
if (::dbProvider.isInitialized) {
dbProvider.close()
}
}
@Test
fun `savePacket persists and retrieves waypoints`() = runTest(testDispatcher) {
val myNodeNum = 1
val contact = "contact"
val packet = DataPacket(to = DataPacket.BROADCAST, bytes = ByteString.EMPTY, dataType = PortNum.TEXT_MESSAGE_APP.value, id = 123)
// Set the current node number so PacketRepositoryImpl can pass it to queries
nodeRepository.setMyNodeInfo(org.meshtastic.core.testing.TestDataFactory.createMyNodeInfo(myNodeNum = myNodeNum))
repository.savePacket(myNodeNum, broadcastContact, packet, 1000L)
val packet = DataPacket(to = DataPacket.BROADCAST, bytes = okio.ByteString.EMPTY, dataType = 1, id = 123)
repository.savePacket(myNodeNum, contact, packet, 1000L)
// Verify it was saved.
val count = repository.getMessageCount(contact)
assertEquals(1, count)
assertEquals(1, repository.getMessageCount(broadcastContact))
}
@Test
fun `clearAllUnreadCounts works with real DB`() = runTest(testDispatcher) {
repository.clearAllUnreadCounts()
// No exception thrown
}
@Test
fun `getMessagesFrom limit keeps newest messages within boundary`() = runTest(testDispatcher) {
val contact = "1!abcd1234"
repeat(55) { index ->
saveTextPacket(contact = contact, id = index + 1, receivedTime = 1_000L + index, text = "Message $index", read = false)
}
val messages = repository.getMessagesFrom(contact = contact, limit = 50, getNode = ::lookupNode).first()
assertEquals(50, messages.size)
assertEquals("Message 54", messages.first().text)
assertEquals("Message 5", messages.last().text)
}
@Test
fun `unread counts track read and unread transitions`() = runTest(testDispatcher) {
val contact = "2!feedbeef"
saveTextPacket(contact = contact, id = 1, receivedTime = 100L, read = false)
saveTextPacket(contact = contact, id = 2, receivedTime = 200L, read = false)
saveTextPacket(contact = contact, id = 3, receivedTime = 300L, read = false)
assertEquals(3, repository.getUnreadCount(contact))
assertEquals(3, repository.getUnreadCountTotal().first())
assertTrue(repository.hasUnreadMessages(contact).first())
assertNotNull(repository.getFirstUnreadMessageUuid(contact).first())
repository.clearUnreadCount(contact, 200L)
assertEquals(1, repository.getUnreadCount(contact))
assertEquals(1, repository.getUnreadCountTotal().first())
assertTrue(repository.hasUnreadMessages(contact).first())
repository.clearAllUnreadCounts()
assertEquals(0, repository.getUnreadCount(contact))
assertEquals(0, repository.getUnreadCountTotal().first())
assertFalse(repository.hasUnreadMessages(contact).first())
assertEquals(null, repository.getFirstUnreadMessageUuid(contact).first())
}
@Test
fun `reactions can be added listed and removed with parent message`() = runTest(testDispatcher) {
val contact = "3!react000"
val replyId = 501
saveTextPacket(contact = contact, id = replyId, receivedTime = 1_000L, text = "Original", read = true)
val reaction =
Reaction(
replyId = replyId,
user = User(id = "!reactor"),
emoji = "👍",
timestamp = 2_000L,
snr = 1.5f,
rssi = -70,
hopsAway = 1,
packetId = replyId,
to = "!abcd1234",
channel = 3,
)
repository.insertReaction(reaction, myNodeNum)
val storedReaction = repository.getReactionByPacketId(replyId)
assertNotNull(storedReaction)
assertEquals("👍", storedReaction.emoji)
assertEquals("!reactor", storedReaction.user.id)
val reactions = repository.findReactionsWithId(replyId)
assertEquals(1, reactions.size)
assertEquals(replyId, reactions.single().replyId)
assertEquals("👍", reactions.single().emoji)
val messageUuid = repository.getMessagesFrom(contact = contact, getNode = ::lookupNode).first().single().uuid
repository.deleteMessages(listOf(messageUuid))
assertTrue(repository.findReactionsWithId(replyId).isEmpty())
assertEquals(null, repository.getReactionByPacketId(replyId))
}
@Test
fun `getWaypoints preserves channel data for filtering`() = runTest(testDispatcher) {
saveWaypointPacket(contact = broadcastContact, channel = 0, waypointId = 101, receivedTime = 1_000L)
saveWaypointPacket(contact = "2${DataPacket.nodeNumToId(DataPacket.BROADCAST)}", channel = 2, waypointId = 202, receivedTime = 2_000L)
saveTextPacket(contact = broadcastContact, id = 77, receivedTime = 3_000L)
val waypoints = repository.getWaypoints().first()
val channelTwoWaypoints = waypoints.filter { it.channel == 2 }
assertEquals(2, waypoints.size)
assertEquals(setOf(0, 2), waypoints.map { it.channel }.toSet())
assertEquals(listOf(202), channelTwoWaypoints.mapNotNull { it.waypoint?.id })
}
@Test
fun `contact keys keep channel and destination formats distinct`() = runTest(testDispatcher) {
val channelBroadcast = broadcastContact
val secondaryBroadcast = "1${DataPacket.nodeNumToId(DataPacket.BROADCAST)}"
val directMessage = "${DataPacket.PKC_CHANNEL_INDEX}!70fdde9b"
saveTextPacket(contact = channelBroadcast, id = 1, receivedTime = 100L, channel = 0)
saveTextPacket(contact = secondaryBroadcast, id = 2, receivedTime = 200L, channel = 1)
saveTextPacket(
contact = directMessage,
id = 3,
receivedTime = 300L,
channel = DataPacket.PKC_CHANNEL_INDEX,
to = 0x70fdde9b,
)
val contacts = repository.getContacts().first()
assertEquals(setOf(channelBroadcast, secondaryBroadcast, directMessage), contacts.keys)
assertEquals(0, contacts.getValue(channelBroadcast).channel)
assertEquals(1, contacts.getValue(secondaryBroadcast).channel)
assertEquals(DataPacket.PKC_CHANNEL_INDEX, contacts.getValue(directMessage).channel)
assertEquals(1, repository.getMessageCount(channelBroadcast))
assertEquals(1, repository.getMessageCount(secondaryBroadcast))
assertEquals(1, repository.getMessageCount(directMessage))
}
@Test
fun `getMessagesFrom returns empty flow for unknown contact`() = runTest(testDispatcher) {
val messages = repository.getMessagesFrom(contact = "7!missing", getNode = ::lookupNode).first()
assertTrue(messages.isEmpty())
}
@Test
fun `concurrent writes keep all packets intact`() = runTest {
val concurrentRepository = PacketRepositoryImpl(
dbManager = dbProvider,
dispatchers = CoroutineDispatchers(main = testDispatcher, io = Dispatchers.Default, default = Dispatchers.Default),
nodeRepository = nodeRepository,
)
val contact = "4!concur00"
coroutineScope {
repeat(100) { index ->
launch(Dispatchers.Default) {
concurrentRepository.savePacket(
myNodeNum = myNodeNum,
contactKey = contact,
packet = textPacket(id = index + 1, text = "Concurrent $index", channel = 4),
receivedTime = 10_000L + index,
read = false,
)
}
}
}
val messages = concurrentRepository.getMessagesFrom(contact = contact, getNode = ::lookupNode).first()
assertEquals(100, concurrentRepository.getMessageCount(contact))
assertEquals(100, messages.size)
assertEquals(100, messages.map { it.packetId }.distinct().size)
}
private suspend fun saveTextPacket(
contact: String,
id: Int,
receivedTime: Long,
text: String = "Message $id",
read: Boolean = true,
filtered: Boolean = false,
channel: Int = contact.first().digitToIntOrNull() ?: 0,
to: Int = DataPacket.BROADCAST,
from: Int = 0x12345678,
) {
repository.savePacket(
myNodeNum = myNodeNum,
contactKey = contact,
packet = textPacket(id = id, text = text, channel = channel, to = to, from = from),
receivedTime = receivedTime,
read = read,
filtered = filtered,
)
}
private suspend fun saveWaypointPacket(contact: String, channel: Int, waypointId: Int, receivedTime: Long) {
repository.savePacket(
myNodeNum = myNodeNum,
contactKey = contact,
packet = DataPacket(to = DataPacket.BROADCAST, channel = channel, waypoint = Waypoint(id = waypointId, name = "Waypoint $waypointId")),
receivedTime = receivedTime,
)
}
private fun textPacket(
id: Int,
text: String,
channel: Int,
to: Int = DataPacket.BROADCAST,
from: Int = 0x12345678,
) = DataPacket(
to = to,
bytes = text.encodeToByteArray().toByteString(),
dataType = PortNum.TEXT_MESSAGE_APP.value,
from = from,
id = id,
channel = channel,
)
private fun lookupNode(userId: String?): Node = Node(num = 0, user = User(id = userId.orEmpty(), long_name = userId.orEmpty()))
}