refactor(service): Decompose MeshService into smaller, focused classes

This commit refactors the monolithic `MeshService` by decomposing its responsibilities into smaller, more focused, and testable classes. This improves modularity, separation of concerns, and the overall maintainability of the service layer.

The new class structure is as follows:
- `MeshConnectionManager`: Manages the radio connection lifecycle, state changes (connected, sleeping, disconnected), and triggers connection-related actions.
- `MeshMessageProcessor`: Handles the initial parsing and routing of all incoming `FromRadio` messages. It also manages a buffer for packets that arrive before the node DB is ready.
- `MeshNodeManager`: Manages the in-memory node database (`nodeDB`), including creating, updating, and caching node information.
- `MeshCommandSender`: Centralizes the logic for creating and sending commands and data packets to the radio.
- `MeshDataHandler`: Processes decoded `MeshPacket` data, such as text messages, telemetry, and waypoints.
- `MeshConfigHandler`: Manages received device, module, and channel configurations.
- `MeshConfigFlowManager`: Orchestrates the two-stage config/node-info download process upon connection.
- `MeshHistoryManager`: Encapsulates logic for requesting message history from the device.
- `MeshMqttManager`: Handles the lifecycle and message proxying for the MQTT client.
- `MeshLocationManager`: Manages GPS location updates to be sent to the mesh.
- `MeshTracerouteHandler`: Handles traceroute responses.
- `MeshActionHandler`: Processes `ServiceAction` events from other parts of the application.
- `MeshDataMapper`: Provides mapping functions between protobuf `MeshPacket` and the `DataPacket` model.
- `MeshRouter`: A central access point that holds references to the various handler classes.

Key Changes:
- The core logic from `MeshService` has been extracted and distributed among the new classes.
- `MeshService` is now a much leaner Android `Service` class, responsible for lifecycle management and delegating work to the new components.
- Concurrency helper classes `DeferredExecution` and `SyncContinuation` have been removed.
- Unit tests have been added for `MeshCommandSender`, `MeshNodeManager`, and `MeshDataMapper`.
- The timeout in `PacketHandler` has been increased to 5 seconds to be more tolerant of slow device responses.
- All new service-layer classes are injected via Hilt, promoting dependency injection and testability.

Signed-off-by: James Rich <2199651+jamesarich@users.noreply.github.com>
This commit is contained in:
James Rich
2025-12-19 08:00:56 -06:00
parent 99cc024ee4
commit dd3bb992e6
24 changed files with 2964 additions and 2637 deletions

View File

@@ -1,43 +0,0 @@
/*
* Copyright (c) 2025 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.geeksville.mesh.concurrent
import timber.log.Timber
/**
* Sometimes when starting services we face situations where messages come in that require computation but we can't do
* that computation yet because we are still waiting for some long running init to complete.
*
* This class lets you queue up closures to run at a later date and later on you can call run() to run all the
* previously queued work.
*/
class DeferredExecution {
private val queue = mutableListOf<() -> Unit>()
// / Queue some new work
fun add(fn: () -> Unit) {
queue.add(fn)
}
// / run all work in the queue and clear it to be ready to accept new work
fun run() {
Timber.d("Running deferred execution numjobs=${queue.size}")
queue.forEach { it() }
queue.clear()
}
}

View File

@@ -1,94 +0,0 @@
/*
* Copyright (c) 2025 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.geeksville.mesh.concurrent
/** A deferred execution object (with various possible implementations) */
interface Continuation<in T> {
abstract fun resume(res: Result<T>)
// syntactic sugar
fun resumeSuccess(res: T) = resume(Result.success(res))
fun resumeWithException(ex: Throwable) = try {
resume(Result.failure(ex))
} catch (ex: Throwable) {
// Timber.e("Ignoring $ex while resuming, because we are the ones who threw it")
throw ex
}
}
/** An async continuation that just calls a callback when the result is available */
class CallbackContinuation<in T>(private val cb: (Result<T>) -> Unit) : Continuation<T> {
override fun resume(res: Result<T>) = cb(res)
}
/**
* This is a blocking/threaded version of coroutine Continuation
*
* A little bit ugly, but the coroutine version has a nasty internal bug that showed up in my SyncBluetoothDevice so I
* needed a quick workaround.
*/
class SyncContinuation<T> : Continuation<T> {
private val mbox = java.lang.Object()
private var result: Result<T>? = null
override fun resume(res: Result<T>) {
synchronized(mbox) {
result = res
mbox.notify()
}
}
// Wait for the result (or throw an exception)
fun await(timeoutMsecs: Long = 0): T {
synchronized(mbox) {
val startT = System.currentTimeMillis()
while (result == null) {
mbox.wait(timeoutMsecs)
if (timeoutMsecs > 0 && ((System.currentTimeMillis() - startT) >= timeoutMsecs)) {
throw Exception("SyncContinuation timeout")
}
}
val r = result
if (r != null) {
return r.getOrThrow()
} else {
throw Exception("This shouldn't happen")
}
}
}
}
/**
* Calls an init function which is responsible for saving our continuation so that some other thread can call resume or
* resume with exception.
*
* Essentially this is a blocking version of the (buggy) coroutine suspendCoroutine
*/
fun <T> suspend(timeoutMsecs: Long = -1, initfn: (SyncContinuation<T>) -> Unit): T {
val cont = SyncContinuation<T>()
// First call the init funct
initfn(cont)
// Now wait for the continuation to finish
return cont.await(timeoutMsecs)
}

View File

@@ -0,0 +1,123 @@
/*
* Copyright (c) 2025 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. 自 <https://www.gnu.org/licenses/>.
*/
package com.geeksville.mesh.service
import com.geeksville.mesh.concurrent.handledLaunch
import com.geeksville.mesh.util.ignoreException
import dagger.Lazy
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import org.meshtastic.core.data.repository.PacketRepository
import org.meshtastic.core.database.entity.ReactionEntity
import org.meshtastic.core.model.DataPacket
import org.meshtastic.core.service.ServiceAction
import org.meshtastic.proto.Portnums
import org.meshtastic.proto.user
import javax.inject.Inject
import javax.inject.Singleton
@Singleton
class MeshActionHandler
@Inject
constructor(
private val nodeManager: MeshNodeManager,
private val commandSender: MeshCommandSender,
private val packetRepository: Lazy<PacketRepository>,
) {
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
fun onServiceAction(action: ServiceAction) {
ignoreException {
val myNodeNum = nodeManager.myNodeNum ?: return@ignoreException
when (action) {
is ServiceAction.Favorite -> {
val node = action.node
commandSender.sendAdmin(myNodeNum) {
if (node.isFavorite) removeFavoriteNode = node.num else setFavoriteNode = node.num
}
nodeManager.updateNodeInfo(node.num) { it.isFavorite = !node.isFavorite }
}
is ServiceAction.Ignore -> {
val node = action.node
commandSender.sendAdmin(myNodeNum) {
if (node.isIgnored) removeIgnoredNode = node.num else setIgnoredNode = node.num
}
nodeManager.updateNodeInfo(node.num) { it.isIgnored = !node.isIgnored }
}
is ServiceAction.Reaction -> {
val channel = action.contactKey[0].digitToInt()
val destId = action.contactKey.substring(1)
val dataPacket = org.meshtastic.core.model.DataPacket(
to = destId,
dataType = Portnums.PortNum.TEXT_MESSAGE_APP_VALUE,
bytes = action.emoji.encodeToByteArray(),
channel = channel,
replyId = action.replyId,
wantAck = false,
)
commandSender.sendData(dataPacket)
rememberReaction(action, myNodeNum)
}
is ServiceAction.ImportContact -> {
val verifiedContact = action.contact.toBuilder().setManuallyVerified(true).build()
commandSender.sendAdmin(myNodeNum) { addContact = verifiedContact }
nodeManager.handleReceivedUser(verifiedContact.nodeNum, verifiedContact.user, manuallyVerified = true)
}
is ServiceAction.SendContact -> {
commandSender.sendAdmin(myNodeNum) { addContact = action.contact }
}
is ServiceAction.GetDeviceMetadata -> {
commandSender.sendAdmin(action.destNum, wantResponse = true) { getDeviceMetadataRequest = true }
}
}
}
}
private fun rememberReaction(action: ServiceAction.Reaction, myNodeNum: Int) {
scope.handledLaunch {
val reaction = ReactionEntity(
replyId = action.replyId,
userId = DataPacket.ID_LOCAL,
emoji = action.emoji,
timestamp = System.currentTimeMillis(),
snr = 0f,
rssi = 0,
hopsAway = 0,
)
packetRepository.get().insertReaction(reaction)
}
}
fun handleSetOwner(u: org.meshtastic.core.model.MeshUser, myNodeNum: Int) {
commandSender.sendAdmin(myNodeNum) {
setOwner = user {
id = u.id
longName = u.longName
shortName = u.shortName
isLicensed = u.isLicensed
}
}
nodeManager.handleReceivedUser(myNodeNum, user {
id = u.id
longName = u.longName
shortName = u.shortName
isLicensed = u.isLicensed
})
}
}

View File

@@ -0,0 +1,327 @@
/*
* Copyright (c) 2025 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. 自 <https://www.gnu.org/licenses/>.
*/
package com.geeksville.mesh.service
import android.os.RemoteException
import androidx.annotation.VisibleForTesting
import com.google.protobuf.ByteString
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import org.meshtastic.core.data.repository.RadioConfigRepository
import org.meshtastic.core.model.DataPacket
import org.meshtastic.core.model.MessageStatus
import org.meshtastic.core.model.Position
import org.meshtastic.core.service.ConnectionState
import org.meshtastic.proto.AdminProtos
import org.meshtastic.proto.AppOnlyProtos.ChannelSet
import org.meshtastic.proto.LocalOnlyProtos.LocalConfig
import org.meshtastic.proto.MeshProtos
import org.meshtastic.proto.MeshProtos.MeshPacket
import org.meshtastic.proto.Portnums
import org.meshtastic.proto.position
import timber.log.Timber
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicReference
import javax.inject.Inject
import javax.inject.Singleton
import kotlin.math.absoluteValue
@Suppress("TooManyFunctions")
@Singleton
class MeshCommandSender
@Inject
constructor(
private val packetHandler: PacketHandler?,
private val nodeManager: MeshNodeManager?,
private val connectionStateHolder: MeshServiceConnectionStateHolder?,
private val radioConfigRepository: RadioConfigRepository?,
) {
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
private val currentPacketId = AtomicLong(java.util.Random(System.currentTimeMillis()).nextLong().absoluteValue)
private val sessionPasskey = AtomicReference(ByteString.EMPTY)
private val offlineSentPackets = CopyOnWriteArrayList<DataPacket>()
val tracerouteStartTimes = ConcurrentHashMap<Int, Long>()
private val localConfig = MutableStateFlow(LocalConfig.getDefaultInstance())
private val channelSet = MutableStateFlow(ChannelSet.getDefaultInstance())
private val rememberDataType =
setOf(
Portnums.PortNum.TEXT_MESSAGE_APP_VALUE,
Portnums.PortNum.ALERT_APP_VALUE,
Portnums.PortNum.WAYPOINT_APP_VALUE,
)
companion object {
private const val PACKET_ID_MASK = 0xffffffffL
private const val PACKET_ID_SHIFT_BITS = 32
private const val TIME_MS_TO_S = 1000L
}
init {
radioConfigRepository?.localConfigFlow
?.onEach { localConfig.value = it }
?.launchIn(scope)
radioConfigRepository?.channelSetFlow
?.onEach { channelSet.value = it }
?.launchIn(scope)
}
@VisibleForTesting
internal constructor() : this(null, null, null, null)
fun getCurrentPacketId(): Long = currentPacketId.get()
fun generatePacketId(): Int {
val numPacketIds = ((1L shl PACKET_ID_SHIFT_BITS) - 1)
val next = currentPacketId.incrementAndGet() and PACKET_ID_MASK
return ((next % numPacketIds) + 1L).toInt()
}
fun setSessionPasskey(key: ByteString) {
sessionPasskey.set(key)
}
private fun getHopLimit(): Int = localConfig.value.lora.hopLimit
private fun getAdminChannelIndex(toNum: Int): Int {
val myNum = nodeManager?.myNodeNum ?: return 0
val myNode = nodeManager?.nodeDBbyNodeNum?.get(myNum)
val destNode = nodeManager?.nodeDBbyNodeNum?.get(toNum)
if (myNum == toNum) return 0
if (myNode?.hasPKC == true && destNode?.hasPKC == true) return DataPacket.PKC_CHANNEL_INDEX
return channelSet.value.settingsList.indexOfFirst { it.name.equals("admin", ignoreCase = true) }.coerceAtLeast(0)
}
fun sendData(p: DataPacket) {
if (p.id == 0) p.id = generatePacketId()
val bytes = p.bytes ?: ByteArray(0)
if (bytes.size >= MeshProtos.Constants.DATA_PAYLOAD_LEN_VALUE) {
p.status = MessageStatus.ERROR
throw RemoteException("Message too long")
} else {
p.status = MessageStatus.QUEUED
}
if (connectionStateHolder?.connectionState?.value == ConnectionState.Connected) {
try {
sendNow(p)
} catch (ex: Exception) {
Timber.e(ex, "Error sending message, so enqueueing")
enqueueForSending(p)
}
} else {
enqueueForSending(p)
}
}
private fun sendNow(p: DataPacket) {
val meshPacket =
newMeshPacketTo(p.to ?: DataPacket.ID_BROADCAST).buildMeshPacket(
id = p.id,
wantAck = p.wantAck,
hopLimit = if (p.hopLimit > 0) p.hopLimit else getHopLimit(),
channel = p.channel,
) {
portnumValue = p.dataType
payload = ByteString.copyFrom(p.bytes ?: ByteArray(0))
p.replyId?.let { if (it != 0) replyId = it }
}
p.time = System.currentTimeMillis()
packetHandler?.sendToRadio(meshPacket)
}
private fun enqueueForSending(p: DataPacket) {
if (p.dataType in rememberDataType) {
offlineSentPackets.add(p)
}
}
fun processQueuedPackets() {
val sentPackets = mutableListOf<DataPacket>()
offlineSentPackets.forEach { p ->
try {
sendNow(p)
sentPackets.add(p)
} catch (ex: Exception) {
Timber.e(ex, "Error sending queued message:")
}
}
offlineSentPackets.removeAll(sentPackets)
}
fun sendAdmin(
destNum: Int,
requestId: Int = generatePacketId(),
wantResponse: Boolean = false,
initFn: AdminProtos.AdminMessage.Builder.() -> Unit,
) {
val packet =
newMeshPacketTo(destNum).buildAdminPacket(id = requestId, wantResponse = wantResponse, initFn = initFn)
packetHandler?.sendToRadio(packet)
}
fun sendPosition(pos: MeshProtos.Position, destNum: Int? = null, wantResponse: Boolean = false) {
val myNum = nodeManager?.myNodeNum ?: return
val idNum = destNum ?: myNum
Timber.d("Sending our position/time to=$idNum ${Position(pos)}")
if (!localConfig.value.position.fixedPosition) {
nodeManager?.handleReceivedPosition(myNum, myNum, pos)
}
packetHandler?.sendToRadio(
newMeshPacketTo(idNum).buildMeshPacket(
channel = if (destNum == null) 0 else nodeManager?.nodeDBbyNodeNum?.get(destNum)?.channel ?: 0,
priority = MeshPacket.Priority.BACKGROUND,
) {
portnumValue = Portnums.PortNum.POSITION_APP_VALUE
payload = pos.toByteString()
this.wantResponse = wantResponse
},
)
}
fun requestPosition(destNum: Int, currentPosition: Position) {
val meshPosition = position {
latitudeI = Position.degI(currentPosition.latitude)
longitudeI = Position.degI(currentPosition.longitude)
altitude = currentPosition.altitude
time = (System.currentTimeMillis() / TIME_MS_TO_S).toInt()
}
packetHandler?.sendToRadio(
newMeshPacketTo(destNum).buildMeshPacket(
channel = nodeManager?.nodeDBbyNodeNum?.get(destNum)?.channel ?: 0,
priority = MeshPacket.Priority.BACKGROUND,
) {
portnumValue = Portnums.PortNum.POSITION_APP_VALUE
payload = meshPosition.toByteString()
wantResponse = true
},
)
}
fun setFixedPosition(destNum: Int, pos: Position) {
val meshPos = position {
latitudeI = Position.degI(pos.latitude)
longitudeI = Position.degI(pos.longitude)
altitude = pos.altitude
}
sendAdmin(destNum) {
if (pos != Position(0.0, 0.0, 0)) {
setFixedPosition = meshPos
} else {
removeFixedPosition = true
}
}
nodeManager?.handleReceivedPosition(destNum, nodeManager.myNodeNum ?: 0, meshPos)
}
fun requestUserInfo(destNum: Int) {
val myNum = nodeManager?.myNodeNum ?: return
val myNode = nodeManager.getOrCreateNodeInfo(myNum)
packetHandler?.sendToRadio(
newMeshPacketTo(destNum).buildMeshPacket(channel = nodeManager.nodeDBbyNodeNum[destNum]?.channel ?: 0) {
portnumValue = Portnums.PortNum.NODEINFO_APP_VALUE
wantResponse = true
payload = myNode.user.toByteString()
},
)
}
fun requestTraceroute(requestId: Int, destNum: Int) {
tracerouteStartTimes[requestId] = System.currentTimeMillis()
packetHandler?.sendToRadio(
newMeshPacketTo(destNum).buildMeshPacket(
wantAck = true,
id = requestId,
channel = nodeManager?.nodeDBbyNodeNum?.get(destNum)?.channel ?: 0,
) {
portnumValue = Portnums.PortNum.TRACEROUTE_APP_VALUE
wantResponse = true
},
)
}
@VisibleForTesting
internal fun resolveNodeNum(toId: String): Int = when (toId) {
DataPacket.ID_BROADCAST -> DataPacket.NODENUM_BROADCAST
else -> {
val numericNum = if (toId.startsWith("!")) {
toId.substring(1).toLongOrNull(16)?.toInt()
} else {
null
}
numericNum ?: nodeManager?.nodeDBbyID?.get(toId)?.num ?: throw IllegalArgumentException("Unknown node ID $toId")
}
}
private fun newMeshPacketTo(toId: String): MeshPacket.Builder {
val destNum = resolveNodeNum(toId)
return newMeshPacketTo(destNum)
}
private fun newMeshPacketTo(destNum: Int): MeshPacket.Builder = MeshPacket.newBuilder().apply { to = destNum }
private fun MeshPacket.Builder.buildMeshPacket(
wantAck: Boolean = false,
id: Int = generatePacketId(), // always assign a packet ID if we didn't already have one
hopLimit: Int = 0,
channel: Int = 0,
priority: MeshPacket.Priority = MeshPacket.Priority.UNSET,
initFn: MeshProtos.Data.Builder.() -> Unit,
): MeshPacket {
this.id = id
this.wantAck = wantAck
this.hopLimit = if (hopLimit > 0) hopLimit else getHopLimit()
this.priority = priority
if (channel == DataPacket.PKC_CHANNEL_INDEX) {
pkiEncrypted = true
nodeManager?.nodeDBbyNodeNum?.get(to)?.user?.publicKey?.let { publicKey = it }
} else {
this.channel = channel
}
this.decoded = MeshProtos.Data.newBuilder().apply(initFn).build()
return build()
}
private fun MeshPacket.Builder.buildAdminPacket(
id: Int = generatePacketId(), // always assign a packet ID if we didn't already have one
wantResponse: Boolean = false,
initFn: AdminProtos.AdminMessage.Builder.() -> Unit,
): MeshPacket = buildMeshPacket(id = id, wantAck = true, channel = getAdminChannelIndex(to), priority = MeshPacket.Priority.RELIABLE) {
this.wantResponse = wantResponse
portnumValue = Portnums.PortNum.ADMIN_APP_VALUE
payload = AdminProtos.AdminMessage.newBuilder()
.apply(initFn)
.setSessionPasskey(sessionPasskey.get())
.build()
.toByteString()
}
}

View File

@@ -0,0 +1,176 @@
/*
* Copyright (c) 2025 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. 自 <https://www.gnu.org/licenses/>.
*/
package com.geeksville.mesh.service
import com.geeksville.mesh.concurrent.handledLaunch
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.delay
import org.meshtastic.core.analytics.platform.PlatformAnalytics
import org.meshtastic.core.data.repository.NodeRepository
import org.meshtastic.core.data.repository.RadioConfigRepository
import org.meshtastic.core.database.entity.MetadataEntity
import org.meshtastic.core.database.entity.MyNodeEntity
import org.meshtastic.core.service.ConnectionState
import org.meshtastic.proto.MeshProtos
import timber.log.Timber
import javax.inject.Inject
import javax.inject.Singleton
@Suppress("LongParameterList")
@Singleton
class MeshConfigFlowManager
@Inject
constructor(
private val nodeManager: MeshNodeManager,
private val connectionManager: MeshConnectionManager,
private val nodeRepository: NodeRepository,
private val radioConfigRepository: RadioConfigRepository,
private val connectionStateHolder: MeshServiceConnectionStateHolder,
private val serviceBroadcasts: MeshServiceBroadcasts,
private val analytics: PlatformAnalytics,
private val commandSender: MeshCommandSender,
private val packetHandler: PacketHandler,
) {
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
private val configOnlyNonce = 69420
private val nodeInfoNonce = 69421
private val wantConfigDelay = 100L
private val newNodes = mutableListOf<MeshProtos.NodeInfo>()
val newNodeCount: Int get() = newNodes.size
private var rawMyNodeInfo: MeshProtos.MyNodeInfo? = null
private var newMyNodeInfo: MyNodeEntity? = null
private var myNodeInfo: MyNodeEntity? = null
fun handleConfigComplete(configCompleteId: Int) {
when (configCompleteId) {
configOnlyNonce -> handleConfigOnlyComplete()
nodeInfoNonce -> handleNodeInfoComplete()
else -> Timber.w("Config complete id mismatch: $configCompleteId")
}
}
private fun handleConfigOnlyComplete() {
Timber.i("Config-only complete")
if (newMyNodeInfo == null) {
Timber.e("Did not receive a valid config - newMyNodeInfo is null")
} else {
myNodeInfo = newMyNodeInfo
Timber.i("myNodeInfo committed successfully")
}
scope.handledLaunch {
delay(wantConfigDelay)
sendHeartbeat()
delay(wantConfigDelay)
connectionManager.startNodeInfoOnly()
}
}
private fun sendHeartbeat() {
try {
packetHandler.sendToRadio(MeshProtos.ToRadio.newBuilder().apply { heartbeat = MeshProtos.Heartbeat.getDefaultInstance() })
Timber.d("Heartbeat sent between nonce stages")
} catch (ex: Exception) {
Timber.w(ex, "Failed to send heartbeat; proceeding with node-info stage")
}
}
private fun handleNodeInfoComplete() {
Timber.i("NodeInfo complete")
val entities =
newNodes.map { info ->
nodeManager.installNodeInfo(info, withBroadcast = false)
nodeManager.nodeDBbyNodeNum[info.num]!!
}
newNodes.clear()
scope.handledLaunch {
myNodeInfo?.let {
nodeRepository.installConfig(it, entities)
sendAnalytics(it)
}
nodeManager.isNodeDbReady.value = true
nodeManager.allowNodeDbWrites.value = true
connectionStateHolder.setState(ConnectionState.Connected)
serviceBroadcasts.broadcastConnection()
connectionManager.onHasSettings()
}
}
private fun sendAnalytics(mi: MyNodeEntity) {
analytics.setDeviceAttributes(mi.firmwareVersion ?: "unknown", mi.model ?: "unknown")
}
fun handleMyInfo(myInfo: MeshProtos.MyNodeInfo) {
Timber.i("MyNodeInfo received: ${myInfo.myNodeNum}")
rawMyNodeInfo = myInfo
nodeManager.myNodeNum = myInfo.myNodeNum
regenMyNodeInfo()
scope.handledLaunch {
radioConfigRepository.clearChannelSet()
radioConfigRepository.clearLocalConfig()
radioConfigRepository.clearLocalModuleConfig()
}
}
fun handleLocalMetadata(metadata: MeshProtos.DeviceMetadata) {
Timber.i("Local Metadata received")
regenMyNodeInfo(metadata)
}
fun handleNodeInfo(info: MeshProtos.NodeInfo) {
newNodes.add(info)
}
private fun regenMyNodeInfo(metadata: MeshProtos.DeviceMetadata? = MeshProtos.DeviceMetadata.getDefaultInstance()) {
val myInfo = rawMyNodeInfo
if (myInfo != null) {
val mi =
with(myInfo) {
MyNodeEntity(
myNodeNum = myNodeNum,
model =
when (val hwModel = metadata?.hwModel) {
null,
MeshProtos.HardwareModel.UNSET,
-> null
else -> hwModel.name.replace('_', '-').replace('p', '.').lowercase()
},
firmwareVersion = metadata?.firmwareVersion,
couldUpdate = false,
shouldUpdate = false,
currentPacketId = commandSender.getCurrentPacketId() and 0xffffffffL,
messageTimeoutMsec = 300000,
minAppVersion = minAppVersion,
maxChannels = 8,
hasWifi = metadata?.hasWifi == true,
deviceId = deviceId.toStringUtf8(),
)
}
if (metadata != null && metadata != MeshProtos.DeviceMetadata.getDefaultInstance()) {
scope.handledLaunch { nodeRepository.insertMetadata(MetadataEntity(mi.myNodeNum, metadata)) }
}
newMyNodeInfo = mi
}
}
}

View File

@@ -0,0 +1,86 @@
/*
* Copyright (c) 2025 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. 自 <https://www.gnu.org/licenses/>.
*/
package com.geeksville.mesh.service
import com.geeksville.mesh.concurrent.handledLaunch
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import org.meshtastic.core.data.repository.RadioConfigRepository
import org.meshtastic.core.service.ServiceRepository
import org.meshtastic.proto.ChannelProtos
import org.meshtastic.proto.ConfigProtos
import org.meshtastic.proto.LocalOnlyProtos.LocalConfig
import org.meshtastic.proto.LocalOnlyProtos.LocalModuleConfig
import org.meshtastic.proto.ModuleConfigProtos
import javax.inject.Inject
import javax.inject.Singleton
@Singleton
class MeshConfigHandler
@Inject
constructor(
private val radioConfigRepository: RadioConfigRepository,
private val serviceRepository: ServiceRepository,
private val nodeManager: MeshNodeManager,
) {
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
private val _localConfig = MutableStateFlow(LocalConfig.getDefaultInstance())
val localConfig = _localConfig.asStateFlow()
private val _moduleConfig = MutableStateFlow(LocalModuleConfig.getDefaultInstance())
val moduleConfig = _moduleConfig.asStateFlow()
private val configTotal = ConfigProtos.Config.getDescriptor().fields.size
private val moduleTotal = ModuleConfigProtos.ModuleConfig.getDescriptor().fields.size
init {
radioConfigRepository.localConfigFlow
.onEach { _localConfig.value = it }
.launchIn(scope)
radioConfigRepository.moduleConfigFlow
.onEach { _moduleConfig.value = it }
.launchIn(scope)
}
fun handleDeviceConfig(config: ConfigProtos.Config) {
scope.handledLaunch { radioConfigRepository.setLocalConfig(config) }
val configCount = _localConfig.value.allFields.size
serviceRepository.setStatusMessage("Device config ($configCount / $configTotal)")
}
fun handleModuleConfig(config: ModuleConfigProtos.ModuleConfig) {
scope.handledLaunch { radioConfigRepository.setLocalModuleConfig(config) }
val moduleCount = _moduleConfig.value.allFields.size
serviceRepository.setStatusMessage("Module config ($moduleCount / $moduleTotal)")
}
fun handleChannel(ch: ChannelProtos.Channel) {
val mi = nodeManager.getMyNodeInfo()
if (mi != null && ch.index < mi.maxChannels) {
scope.handledLaunch { radioConfigRepository.updateChannelSettings(ch) }
serviceRepository.setStatusMessage("Channels (${ch.index + 1} / ${mi.maxChannels})")
}
}
}

View File

@@ -0,0 +1,247 @@
/*
* Copyright (c) 2025 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. 自 <https://www.gnu.org/licenses/>.
*/
package com.geeksville.mesh.service
import android.app.Notification
import com.geeksville.mesh.concurrent.handledLaunch
import com.geeksville.mesh.repository.radio.RadioInterfaceService
import com.meshtastic.core.strings.getString
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import org.meshtastic.core.analytics.DataPair
import org.meshtastic.core.analytics.platform.PlatformAnalytics
import org.meshtastic.core.data.repository.NodeRepository
import org.meshtastic.core.data.repository.RadioConfigRepository
import org.meshtastic.core.prefs.ui.UiPrefs
import org.meshtastic.core.service.ConnectionState
import org.meshtastic.core.service.MeshServiceNotifications
import org.meshtastic.core.strings.Res
import org.meshtastic.core.strings.connected_count
import org.meshtastic.core.strings.connecting
import org.meshtastic.core.strings.device_sleeping
import org.meshtastic.core.strings.disconnected
import org.meshtastic.proto.ConfigProtos
import org.meshtastic.proto.MeshProtos.ToRadio
import org.meshtastic.proto.TelemetryProtos
import timber.log.Timber
import javax.inject.Inject
import javax.inject.Singleton
@Suppress("LongParameterList")
@Singleton
class MeshConnectionManager
@Inject
constructor(
private val radioInterfaceService: RadioInterfaceService,
private val connectionStateHolder: MeshServiceConnectionStateHolder,
private val serviceBroadcasts: MeshServiceBroadcasts,
private val serviceNotifications: MeshServiceNotifications,
private val uiPrefs: UiPrefs,
private val packetHandler: PacketHandler,
private val nodeRepository: NodeRepository,
private val locationManager: MeshLocationManager,
private val mqttManager: MeshMqttManager,
private val historyManager: MeshHistoryManager,
private val radioConfigRepository: RadioConfigRepository,
private val commandSender: MeshCommandSender,
private val nodeManager: MeshNodeManager,
private val analytics: PlatformAnalytics,
) {
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
private var sleepTimeout: Job? = null
private var locationRequestsJob: Job? = null
private var connectTimeMsec = 0L
private val configOnlyNonce = 69420
private val nodeInfoNonce = 69421
fun init() {
radioInterfaceService.connectionState.onEach(::onRadioConnectionState).launchIn(scope)
nodeRepository.myNodeInfo
.onEach { myNodeEntity ->
locationRequestsJob?.cancel()
if (myNodeEntity != null) {
locationRequestsJob =
uiPrefs
.shouldProvideNodeLocation(myNodeEntity.myNodeNum)
.onEach { shouldProvide ->
if (shouldProvide) {
locationManager.start { pos ->
commandSender.sendPosition(pos)
}
} else {
locationManager.stop()
}
}
.launchIn(scope)
}
}
.launchIn(scope)
}
private fun onRadioConnectionState(newState: ConnectionState) {
scope.handledLaunch {
val localConfig = radioConfigRepository.localConfigFlow.first()
val isRouter = localConfig.device.role == ConfigProtos.Config.DeviceConfig.Role.ROUTER
val lsEnabled = localConfig.power.isPowerSaving || isRouter
val effectiveState =
when (newState) {
is ConnectionState.Connected -> ConnectionState.Connected
is ConnectionState.DeviceSleep -> if (lsEnabled) ConnectionState.DeviceSleep else ConnectionState.Disconnected
is ConnectionState.Connecting -> ConnectionState.Connecting
is ConnectionState.Disconnected -> ConnectionState.Disconnected
}
onConnectionChanged(effectiveState)
}
}
private fun onConnectionChanged(c: ConnectionState) {
if (connectionStateHolder.connectionState.value == c && c !is ConnectionState.Connected) return
Timber.d("onConnectionChanged: ${connectionStateHolder.connectionState.value} -> $c")
sleepTimeout?.cancel()
sleepTimeout = null
when (c) {
is ConnectionState.Connecting -> connectionStateHolder.setState(ConnectionState.Connecting)
is ConnectionState.Connected -> handleConnected()
is ConnectionState.DeviceSleep -> handleDeviceSleep()
is ConnectionState.Disconnected -> handleDisconnected()
}
updateStatusNotification()
}
private fun handleConnected() {
connectionStateHolder.setState(ConnectionState.Connecting)
serviceBroadcasts.broadcastConnection()
Timber.d("Starting connect")
connectTimeMsec = System.currentTimeMillis()
startConfigOnly()
}
private fun handleDeviceSleep() {
connectionStateHolder.setState(ConnectionState.DeviceSleep)
packetHandler.stopPacketQueue()
locationManager.stop()
mqttManager.stop()
if (connectTimeMsec != 0L) {
val now = System.currentTimeMillis()
val duration = now - connectTimeMsec
connectTimeMsec = 0L
analytics.track("connected_seconds", DataPair("connected_seconds", duration / 1000.0))
}
sleepTimeout =
scope.handledLaunch {
try {
val localConfig = radioConfigRepository.localConfigFlow.first()
val timeout = (localConfig.power?.lsSecs ?: 0) + 30
Timber.d("Waiting for sleeping device, timeout=$timeout secs")
delay(timeout * 1000L)
Timber.w("Device timeout out, setting disconnected")
onConnectionChanged(ConnectionState.Disconnected)
} catch (_: CancellationException) {
Timber.d("device sleep timeout cancelled")
}
}
serviceBroadcasts.broadcastConnection()
}
private fun handleDisconnected() {
connectionStateHolder.setState(ConnectionState.Disconnected)
packetHandler.stopPacketQueue()
locationManager.stop()
mqttManager.stop()
analytics.track(
"mesh_disconnect",
DataPair("num_nodes", nodeManager.nodeDBbyNodeNum.size),
DataPair("num_online", nodeManager.nodeDBbyNodeNum.values.count { it.isOnline }),
)
analytics.track("num_nodes", DataPair("num_nodes", nodeManager.nodeDBbyNodeNum.size))
serviceBroadcasts.broadcastConnection()
}
fun startConfigOnly() {
packetHandler.sendToRadio(ToRadio.newBuilder().apply { wantConfigId = configOnlyNonce })
}
fun startNodeInfoOnly() {
packetHandler.sendToRadio(ToRadio.newBuilder().apply { wantConfigId = nodeInfoNonce })
}
fun onHasSettings() {
commandSender.processQueuedPackets()
// Start MQTT if enabled
scope.handledLaunch {
val moduleConfig = radioConfigRepository.moduleConfigFlow.first()
mqttManager.start(moduleConfig.mqtt.enabled, moduleConfig.mqtt.proxyToClientEnabled)
}
reportConnection()
val myNodeNum = nodeManager.myNodeNum ?: 0
// Request history
scope.handledLaunch {
val moduleConfig = radioConfigRepository.moduleConfigFlow.first()
historyManager.requestHistoryReplay("onHasSettings", myNodeNum, moduleConfig.storeForward, "Unknown")
}
// Set time
commandSender.sendAdmin(myNodeNum) { setTimeOnly = (System.currentTimeMillis() / 1000).toInt() }
}
private fun reportConnection() {
val myNode = nodeManager.getMyNodeInfo()
val radioModel = DataPair("radio_model", myNode?.model ?: "unknown")
analytics.track(
"mesh_connect",
DataPair("num_nodes", nodeManager.nodeDBbyNodeNum.size),
DataPair("num_online", nodeManager.nodeDBbyNodeNum.values.count { it.isOnline }),
radioModel,
)
}
fun updateTelemetry(telemetry: TelemetryProtos.Telemetry) {
updateStatusNotification(telemetry)
}
fun updateStatusNotification(telemetry: TelemetryProtos.Telemetry? = null): Notification {
val summary =
when (connectionStateHolder.connectionState.value) {
is ConnectionState.Connected -> getString(Res.string.connected_count).format(nodeManager.nodeDBbyNodeNum.values.count { it.isOnline })
is ConnectionState.Disconnected -> getString(Res.string.disconnected)
is ConnectionState.DeviceSleep -> getString(Res.string.device_sleeping)
is ConnectionState.Connecting -> getString(Res.string.connecting)
}
return serviceNotifications.updateServiceStateNotification(summary, telemetry = telemetry)
}
}

View File

@@ -0,0 +1,453 @@
/*
* Copyright (c) 2025 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. 自 <https://www.gnu.org/licenses/>.
*/
package com.geeksville.mesh.service
import android.util.Log
import com.geeksville.mesh.BuildConfig
import com.geeksville.mesh.concurrent.handledLaunch
import com.geeksville.mesh.repository.radio.InterfaceId
import com.meshtastic.core.strings.getString
import dagger.Lazy
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import org.meshtastic.core.analytics.DataPair
import org.meshtastic.core.analytics.platform.PlatformAnalytics
import org.meshtastic.core.data.repository.PacketRepository
import org.meshtastic.core.database.entity.Packet
import org.meshtastic.core.database.entity.ReactionEntity
import org.meshtastic.core.model.DataPacket
import org.meshtastic.core.model.MessageStatus
import org.meshtastic.core.prefs.mesh.MeshPrefs
import org.meshtastic.core.service.MeshServiceNotifications
import org.meshtastic.core.service.ServiceRepository
import org.meshtastic.core.strings.Res
import org.meshtastic.core.strings.critical_alert
import org.meshtastic.core.strings.error_duty_cycle
import org.meshtastic.core.strings.unknown_username
import org.meshtastic.core.strings.waypoint_received
import org.meshtastic.proto.AdminProtos
import org.meshtastic.proto.MeshProtos
import org.meshtastic.proto.MeshProtos.MeshPacket
import org.meshtastic.proto.PaxcountProtos
import org.meshtastic.proto.Portnums
import org.meshtastic.proto.StoreAndForwardProtos
import org.meshtastic.proto.TelemetryProtos
import org.meshtastic.proto.copy
import timber.log.Timber
import java.util.concurrent.ConcurrentHashMap
import javax.inject.Inject
import javax.inject.Singleton
import kotlin.time.Duration.Companion.milliseconds
@Suppress("LongParameterList", "TooManyFunctions")
@Singleton
class MeshDataHandler
@Inject
constructor(
private val nodeManager: MeshNodeManager,
private val packetHandler: PacketHandler,
private val serviceRepository: ServiceRepository,
private val packetRepository: Lazy<PacketRepository>,
private val serviceBroadcasts: MeshServiceBroadcasts,
private val serviceNotifications: MeshServiceNotifications,
private val analytics: PlatformAnalytics,
private val dataMapper: MeshDataMapper,
private val configHandler: MeshConfigHandler,
private val configFlowManager: MeshConfigFlowManager,
private val commandSender: MeshCommandSender,
private val historyManager: MeshHistoryManager,
private val meshPrefs: MeshPrefs,
private val connectionManager: MeshConnectionManager,
) {
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
private val rememberDataType =
setOf(
Portnums.PortNum.TEXT_MESSAGE_APP_VALUE,
Portnums.PortNum.ALERT_APP_VALUE,
Portnums.PortNum.WAYPOINT_APP_VALUE,
)
private val batteryPercentUnsupported = 0.0
private val batteryPercentLowThreshold = 20
private val batteryPercentLowDivisor = 5
private val batteryPercentCriticalThreshold = 5
private val batteryPercentCooldownSeconds = 1500
private val batteryPercentCooldowns = ConcurrentHashMap<Int, Long>()
fun handleReceivedData(packet: MeshPacket, myNodeNum: Int) {
val data = packet.decoded
val dataPacket = dataMapper.toDataPacket(packet) ?: return
val fromUs = myNodeNum == packet.from
dataPacket.status = MessageStatus.RECEIVED
var shouldBroadcast = !fromUs
when (data.portnumValue) {
Portnums.PortNum.TEXT_MESSAGE_APP_VALUE -> handleTextMessage(packet, dataPacket, myNodeNum)
Portnums.PortNum.ALERT_APP_VALUE -> rememberDataPacket(dataPacket, myNodeNum)
Portnums.PortNum.WAYPOINT_APP_VALUE -> {
val u = MeshProtos.Waypoint.parseFrom(data.payload)
if (u.lockedTo != 0 && u.lockedTo != packet.from) return
val currentSecond = (System.currentTimeMillis() / 1000).toInt()
rememberDataPacket(dataPacket, myNodeNum, updateNotification = u.expire > currentSecond)
}
Portnums.PortNum.POSITION_APP_VALUE -> {
val p = MeshProtos.Position.parseFrom(data.payload)
nodeManager.handleReceivedPosition(packet.from, myNodeNum, p, dataPacket.time)
}
Portnums.PortNum.NODEINFO_APP_VALUE -> if (!fromUs) handleNodeInfo(packet)
Portnums.PortNum.TELEMETRY_APP_VALUE -> handleTelemetry(packet, dataPacket, myNodeNum)
Portnums.PortNum.ROUTING_APP_VALUE -> {
shouldBroadcast = true
handleRouting(packet, dataPacket)
}
Portnums.PortNum.PAXCOUNTER_APP_VALUE -> {
val p = PaxcountProtos.Paxcount.parseFrom(data.payload)
nodeManager.handleReceivedPaxcounter(packet.from, p)
shouldBroadcast = false
}
Portnums.PortNum.STORE_FORWARD_APP_VALUE -> {
val u = StoreAndForwardProtos.StoreAndForward.parseFrom(data.payload)
handleReceivedStoreAndForward(dataPacket, u, myNodeNum)
shouldBroadcast = false
}
Portnums.PortNum.ADMIN_APP_VALUE -> {
handleAdminMessage(packet, myNodeNum)
shouldBroadcast = false
}
Portnums.PortNum.RANGE_TEST_APP_VALUE,
Portnums.PortNum.DETECTION_SENSOR_APP_VALUE,
-> {
val u = dataPacket.copy(dataType = Portnums.PortNum.TEXT_MESSAGE_APP_VALUE)
rememberDataPacket(u, myNodeNum)
}
}
if (shouldBroadcast) {
serviceBroadcasts.broadcastReceivedData(dataPacket)
}
analytics.track("num_data_receive", DataPair("num_data_receive", 1))
}
private fun handleAdminMessage(packet: MeshPacket, myNodeNum: Int) {
val u = AdminProtos.AdminMessage.parseFrom(packet.decoded.payload)
commandSender.setSessionPasskey(u.sessionPasskey)
if (packet.from == myNodeNum) {
when (u.payloadVariantCase) {
AdminProtos.AdminMessage.PayloadVariantCase.GET_CONFIG_RESPONSE ->
configHandler.handleDeviceConfig(u.getConfigResponse)
AdminProtos.AdminMessage.PayloadVariantCase.GET_CHANNEL_RESPONSE ->
configHandler.handleChannel(u.getChannelResponse)
else -> {}
}
}
if (u.payloadVariantCase == AdminProtos.AdminMessage.PayloadVariantCase.GET_DEVICE_METADATA_RESPONSE) {
if (packet.from == myNodeNum) {
configFlowManager.handleLocalMetadata(u.getDeviceMetadataResponse)
} else {
nodeManager.insertMetadata(packet.from, u.getDeviceMetadataResponse)
}
}
}
private fun handleTextMessage(packet: MeshPacket, dataPacket: DataPacket, myNodeNum: Int) {
if (packet.decoded.replyId != 0 && packet.decoded.emoji != 0) {
rememberReaction(packet)
} else {
rememberDataPacket(dataPacket, myNodeNum)
}
}
private fun handleNodeInfo(packet: MeshPacket) {
val u =
MeshProtos.User.parseFrom(packet.decoded.payload).copy {
if (isLicensed) clearPublicKey()
if (packet.viaMqtt) longName = "$longName (MQTT)"
}
nodeManager.handleReceivedUser(packet.from, u, packet.channel)
}
private fun handleTelemetry(packet: MeshPacket, dataPacket: DataPacket, myNodeNum: Int) {
val t =
TelemetryProtos.Telemetry.parseFrom(packet.decoded.payload).copy {
if (time == 0) time = (dataPacket.time.milliseconds.inWholeSeconds).toInt()
}
val fromNum = packet.from
val isRemote = (fromNum != myNodeNum)
if (!isRemote) {
connectionManager.updateTelemetry(t)
}
nodeManager.updateNodeInfo(fromNum) { nodeEntity ->
when {
t.hasDeviceMetrics() -> {
nodeEntity.deviceTelemetry = t
if (fromNum == myNodeNum || (isRemote && nodeEntity.isFavorite)) {
if (
t.deviceMetrics.voltage > batteryPercentUnsupported &&
t.deviceMetrics.batteryLevel <= batteryPercentLowThreshold
) {
if (shouldBatteryNotificationShow(fromNum, t, myNodeNum)) {
serviceNotifications.showOrUpdateLowBatteryNotification(nodeEntity, isRemote)
}
} else {
if (batteryPercentCooldowns.containsKey(fromNum)) {
batteryPercentCooldowns.remove(fromNum)
}
serviceNotifications.cancelLowBatteryNotification(nodeEntity)
}
}
}
t.hasEnvironmentMetrics() -> nodeEntity.environmentTelemetry = t
t.hasPowerMetrics() -> nodeEntity.powerTelemetry = t
}
}
}
private fun shouldBatteryNotificationShow(fromNum: Int, t: TelemetryProtos.Telemetry, myNodeNum: Int): Boolean {
val isRemote = (fromNum != myNodeNum)
var shouldDisplay = false
var forceDisplay = false
when {
t.deviceMetrics.batteryLevel <= batteryPercentCriticalThreshold -> {
shouldDisplay = true
forceDisplay = true
}
t.deviceMetrics.batteryLevel == batteryPercentLowThreshold -> shouldDisplay = true
t.deviceMetrics.batteryLevel.mod(batteryPercentLowDivisor) == 0 && !isRemote -> shouldDisplay = true
isRemote -> shouldDisplay = true
}
if (shouldDisplay) {
val now = System.currentTimeMillis() / 1000
if (!batteryPercentCooldowns.containsKey(fromNum)) batteryPercentCooldowns[fromNum] = 0
if ((now - batteryPercentCooldowns[fromNum]!!) >= batteryPercentCooldownSeconds || forceDisplay) {
batteryPercentCooldowns[fromNum] = now
return true
}
}
return false
}
private fun handleRouting(packet: MeshPacket, dataPacket: DataPacket) {
val r = MeshProtos.Routing.parseFrom(packet.decoded.payload)
if (r.errorReason == MeshProtos.Routing.Error.DUTY_CYCLE_LIMIT) {
serviceRepository.setErrorMessage(getString(Res.string.error_duty_cycle))
}
handleAckNak(
packet.decoded.requestId,
dataMapper.toNodeID(packet.from),
r.errorReasonValue,
dataPacket.relayNode,
)
packetHandler.removeResponse(packet.decoded.requestId, complete = true)
}
private fun handleAckNak(requestId: Int, fromId: String, routingError: Int, relayNode: Int?) {
scope.handledLaunch {
val isAck = routingError == MeshProtos.Routing.Error.NONE_VALUE
val p = packetRepository.get().getPacketById(requestId)
val m =
when {
isAck && fromId == p?.data?.to -> MessageStatus.RECEIVED
isAck -> MessageStatus.DELIVERED
else -> MessageStatus.ERROR
}
if (p != null && p.data.status != MessageStatus.RECEIVED) {
p.data.status = m
p.routingError = routingError
p.data.relayNode = relayNode
if (isAck) {
p.data.relays += 1
}
packetRepository.get().update(p)
}
serviceBroadcasts.broadcastMessageStatus(requestId, m)
}
}
private fun handleReceivedStoreAndForward(dataPacket: DataPacket, s: StoreAndForwardProtos.StoreAndForward, myNodeNum: Int) {
Timber.d("StoreAndForward: ${s.variantCase} ${s.rr} from ${dataPacket.from}")
val transport = currentTransport()
val lastRequest =
if (s.variantCase == StoreAndForwardProtos.StoreAndForward.VariantCase.HISTORY) {
s.history.lastRequest
} else {
0
}
val baseContext = "transport=$transport from=${dataPacket.from}"
historyLog { "rxStoreForward $baseContext variant=${s.variantCase} rr=${s.rr} lastRequest=$lastRequest" }
when (s.variantCase) {
StoreAndForwardProtos.StoreAndForward.VariantCase.STATS -> {
val u =
dataPacket.copy(
bytes = s.stats.toString().encodeToByteArray(),
dataType = Portnums.PortNum.TEXT_MESSAGE_APP_VALUE,
)
rememberDataPacket(u, myNodeNum)
}
StoreAndForwardProtos.StoreAndForward.VariantCase.HISTORY -> {
val history = s.history
val historySummary =
"routerHistory $baseContext messages=${history.historyMessages} " +
"window=${history.window} lastRequest=${history.lastRequest}"
historyLog(Log.DEBUG) { historySummary }
val text =
"""
Total messages: ${s.history.historyMessages}
History window: ${s.history.window / 60000} min
Last request: ${s.history.lastRequest}
"""
.trimIndent()
val u =
dataPacket.copy(
bytes = text.encodeToByteArray(),
dataType = Portnums.PortNum.TEXT_MESSAGE_APP_VALUE,
)
rememberDataPacket(u, myNodeNum)
historyManager.updateStoreForwardLastRequest("router_history", s.history.lastRequest, transport)
}
StoreAndForwardProtos.StoreAndForward.VariantCase.TEXT -> {
if (s.rr == StoreAndForwardProtos.StoreAndForward.RequestResponse.ROUTER_TEXT_BROADCAST) {
dataPacket.to = DataPacket.ID_BROADCAST
}
val textLog =
"rxText $baseContext id=${dataPacket.id} ts=${dataPacket.time} " +
"to=${dataPacket.to} decision=remember"
historyLog(Log.DEBUG) { textLog }
val u =
dataPacket.copy(bytes = s.text.toByteArray(), dataType = Portnums.PortNum.TEXT_MESSAGE_APP_VALUE)
rememberDataPacket(u, myNodeNum)
}
else -> {}
}
}
fun rememberDataPacket(dataPacket: DataPacket, myNodeNum: Int, updateNotification: Boolean = true) {
if (dataPacket.dataType !in rememberDataType) return
val fromLocal = dataPacket.from == DataPacket.ID_LOCAL
val toBroadcast = dataPacket.to == DataPacket.ID_BROADCAST
val contactId = if (fromLocal || toBroadcast) dataPacket.to else dataPacket.from
// contactKey: unique contact key filter (channel)+(nodeId)
val contactKey = "${dataPacket.channel}$contactId"
val packetToSave =
Packet(
uuid = 0L,
myNodeNum = myNodeNum,
packetId = dataPacket.id,
port_num = dataPacket.dataType,
contact_key = contactKey,
received_time = System.currentTimeMillis(),
read = fromLocal,
data = dataPacket,
snr = dataPacket.snr,
rssi = dataPacket.rssi,
hopsAway = dataPacket.hopsAway,
replyId = dataPacket.replyId ?: 0,
)
scope.handledLaunch {
packetRepository.get().apply {
insert(packetToSave)
val isMuted = getContactSettings(contactKey).isMuted
if (!isMuted) {
if (packetToSave.port_num == Portnums.PortNum.ALERT_APP_VALUE) {
serviceNotifications.showAlertNotification(
contactKey,
getSenderName(dataPacket),
dataPacket.alert ?: getString(Res.string.critical_alert),
)
} else if (updateNotification) {
updateMessageNotification(contactKey, dataPacket)
}
}
}
}
}
private fun getSenderName(packet: DataPacket): String =
nodeManager.nodeDBbyID[packet.from]?.user?.longName ?: getString(Res.string.unknown_username)
private fun updateMessageNotification(contactKey: String, dataPacket: DataPacket) {
val message =
when (dataPacket.dataType) {
Portnums.PortNum.TEXT_MESSAGE_APP_VALUE -> dataPacket.text!!
Portnums.PortNum.WAYPOINT_APP_VALUE ->
getString(Res.string.waypoint_received, dataPacket.waypoint!!.name)
else -> return
}
serviceNotifications.updateMessageNotification(
contactKey,
getSenderName(dataPacket),
message,
dataPacket.to == DataPacket.ID_BROADCAST,
)
}
private fun rememberReaction(packet: MeshPacket) = scope.handledLaunch {
val reaction =
ReactionEntity(
replyId = packet.decoded.replyId,
userId = dataMapper.toNodeID(packet.from),
emoji = packet.decoded.payload.toByteArray().decodeToString(),
timestamp = System.currentTimeMillis(),
snr = packet.rxSnr,
rssi = packet.rxRssi,
hopsAway =
if (packet.hopStart == 0 || packet.hopLimit > packet.hopStart) {
-1
} else {
packet.hopStart - packet.hopLimit
},
)
packetRepository.get().insertReaction(reaction)
}
private fun currentTransport(address: String? = meshPrefs.deviceAddress): String = when (address?.firstOrNull()) {
InterfaceId.BLUETOOTH.id -> "BLE"
InterfaceId.TCP.id -> "TCP"
InterfaceId.SERIAL.id -> "Serial"
InterfaceId.MOCK.id -> "Mock"
InterfaceId.NOP.id -> "NOP"
else -> "Unknown"
}
private inline fun historyLog(
priority: Int = Log.INFO,
throwable: Throwable? = null,
crossinline message: () -> String,
) {
if (!BuildConfig.DEBUG) return
val timber = Timber.tag("HistoryReplay")
val msg = message()
if (throwable != null) {
timber.log(priority, throwable, msg)
} else {
timber.log(priority, msg)
}
}
}

View File

@@ -0,0 +1,55 @@
/*
* Copyright (c) 2025 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.geeksville.mesh.service
import org.meshtastic.core.model.DataPacket
import org.meshtastic.proto.MeshProtos.MeshPacket
import javax.inject.Inject
import javax.inject.Singleton
@Singleton
class MeshDataMapper @Inject constructor(private val nodeManager: MeshNodeManager) {
fun toNodeID(n: Int): String = if (n == DataPacket.NODENUM_BROADCAST) {
DataPacket.ID_BROADCAST
} else {
nodeManager.nodeDBbyNodeNum[n]?.user?.id ?: DataPacket.nodeNumToDefaultId(n)
}
fun toDataPacket(packet: MeshPacket): DataPacket? = if (!packet.hasDecoded()) {
null
} else {
val data = packet.decoded
DataPacket(
from = toNodeID(packet.from),
to = toNodeID(packet.to),
time = packet.rxTime * 1000L,
id = packet.id,
dataType = data.portnumValue,
bytes = data.payload.toByteArray(),
hopLimit = packet.hopLimit,
channel = if (packet.pkiEncrypted) DataPacket.PKC_CHANNEL_INDEX else packet.channel,
wantAck = packet.wantAck,
hopStart = packet.hopStart,
snr = packet.rxSnr,
rssi = packet.rxRssi,
replyId = data.replyId,
relayNode = packet.relayNode,
viaMqtt = packet.viaMqtt,
)
}
}

View File

@@ -0,0 +1,141 @@
/*
* Copyright (c) 2025 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. 自 <https://www.gnu.org/licenses/>.
*/
package com.geeksville.mesh.service
import android.util.Log
import androidx.annotation.VisibleForTesting
import com.geeksville.mesh.BuildConfig
import com.geeksville.mesh.model.NO_DEVICE_SELECTED
import com.google.protobuf.ByteString
import org.meshtastic.core.prefs.mesh.MeshPrefs
import org.meshtastic.proto.MeshProtos.MeshPacket
import org.meshtastic.proto.ModuleConfigProtos
import org.meshtastic.proto.Portnums
import org.meshtastic.proto.StoreAndForwardProtos
import timber.log.Timber
import javax.inject.Inject
import javax.inject.Singleton
@Singleton
class MeshHistoryManager
@Inject
constructor(
private val meshPrefs: MeshPrefs,
private val packetHandler: PacketHandler,
) {
companion object {
private const val HISTORY_TAG = "HistoryReplay"
private const val DEFAULT_HISTORY_RETURN_WINDOW_MINUTES = 60 * 24
private const val DEFAULT_HISTORY_RETURN_MAX_MESSAGES = 100
@VisibleForTesting
internal fun buildStoreForwardHistoryRequest(
lastRequest: Int,
historyReturnWindow: Int,
historyReturnMax: Int,
): StoreAndForwardProtos.StoreAndForward {
val historyBuilder = StoreAndForwardProtos.StoreAndForward.History.newBuilder()
if (lastRequest > 0) historyBuilder.lastRequest = lastRequest
if (historyReturnWindow > 0) historyBuilder.window = historyReturnWindow
if (historyReturnMax > 0) historyBuilder.historyMessages = historyReturnMax
return StoreAndForwardProtos.StoreAndForward.newBuilder()
.setRr(StoreAndForwardProtos.StoreAndForward.RequestResponse.CLIENT_HISTORY)
.setHistory(historyBuilder)
.build()
}
@VisibleForTesting
internal fun resolveHistoryRequestParameters(window: Int, max: Int): Pair<Int, Int> {
val resolvedWindow = if (window > 0) window else DEFAULT_HISTORY_RETURN_WINDOW_MINUTES
val resolvedMax = if (max > 0) max else DEFAULT_HISTORY_RETURN_MAX_MESSAGES
return resolvedWindow to resolvedMax
}
}
private fun historyLog(priority: Int = Log.INFO, throwable: Throwable? = null, message: () -> String) {
if (!BuildConfig.DEBUG) return
val msg = message()
if (throwable != null) {
Timber.tag(HISTORY_TAG).log(priority, throwable, msg)
} else {
Timber.tag(HISTORY_TAG).log(priority, msg)
}
}
private fun activeDeviceAddress(): String? =
meshPrefs.deviceAddress?.takeIf { !it.equals(NO_DEVICE_SELECTED, ignoreCase = true) && it.isNotBlank() }
fun requestHistoryReplay(
trigger: String,
myNodeNum: Int?,
storeForwardConfig: ModuleConfigProtos.ModuleConfig.StoreForwardConfig?,
transport: String,
) {
val address = activeDeviceAddress()
if (address == null || myNodeNum == null) {
val reason = if (address == null) "no_addr" else "no_my_node"
historyLog { "requestHistory skipped trigger=$trigger reason=$reason" }
return
}
val lastRequest = meshPrefs.getStoreForwardLastRequest(address)
val (window, max) = resolveHistoryRequestParameters(
storeForwardConfig?.historyReturnWindow ?: 0,
storeForwardConfig?.historyReturnMax ?: 0
)
val request = buildStoreForwardHistoryRequest(lastRequest, window, max)
historyLog {
"requestHistory trigger=$trigger transport=$transport addr=$address " +
"lastRequest=$lastRequest window=$window max=$max"
}
runCatching {
packetHandler.sendToRadio(
MeshPacket.newBuilder()
.apply {
to = myNodeNum
decoded =
org.meshtastic.proto.MeshProtos.Data.newBuilder()
.apply {
portnumValue = Portnums.PortNum.STORE_FORWARD_APP_VALUE
payload = ByteString.copyFrom(request.toByteArray())
}
.build()
priority = MeshPacket.Priority.BACKGROUND
}
.build(),
)
}
.onFailure { ex -> historyLog(Log.WARN, ex) { "requestHistory failed" } }
}
fun updateStoreForwardLastRequest(source: String, lastRequest: Int, transport: String) {
if (lastRequest <= 0) return
val address = activeDeviceAddress() ?: return
val current = meshPrefs.getStoreForwardLastRequest(address)
if (lastRequest != current) {
meshPrefs.setStoreForwardLastRequest(address, lastRequest)
historyLog {
"historyMarker updated source=$source transport=$transport " +
"addr=$address from=$current to=$lastRequest"
}
}
}
}

View File

@@ -0,0 +1,84 @@
/*
* Copyright (c) 2025 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.geeksville.mesh.service
import android.annotation.SuppressLint
import android.app.Application
import androidx.core.location.LocationCompat
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import org.meshtastic.core.common.hasLocationPermission
import org.meshtastic.core.data.repository.LocationRepository
import org.meshtastic.core.model.Position
import org.meshtastic.proto.MeshProtos
import org.meshtastic.proto.position
import timber.log.Timber
import javax.inject.Inject
import javax.inject.Singleton
import kotlin.time.Duration.Companion.milliseconds
@Singleton
class MeshLocationManager
@Inject
constructor(
private val context: Application,
private val locationRepository: LocationRepository,
) {
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
private var locationFlow: Job? = null
@SuppressLint("MissingPermission")
fun start(sendPositionFn: (MeshProtos.Position) -> Unit) {
if (locationFlow?.isActive == true) return
if (context.hasLocationPermission()) {
locationFlow =
locationRepository
.getLocations()
.onEach { location ->
sendPositionFn(
position {
latitudeI = Position.degI(location.latitude)
longitudeI = Position.degI(location.longitude)
if (LocationCompat.hasMslAltitude(location)) {
altitude = LocationCompat.getMslAltitudeMeters(location).toInt()
}
altitudeHae = location.altitude.toInt()
time = (location.time.milliseconds.inWholeSeconds).toInt()
groundSpeed = location.speed.toInt()
groundTrack = location.bearing.toInt()
locationSource = MeshProtos.Position.LocSource.LOC_EXTERNAL
},
)
}
.launchIn(scope)
}
}
fun stop() {
if (locationFlow?.isActive == true) {
Timber.i("Stopping location requests")
locationFlow?.cancel()
locationFlow = null
}
}
}

View File

@@ -0,0 +1,306 @@
/*
* Copyright (c) 2025 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. 自 <https://www.gnu.org/licenses/>.
*/
package com.geeksville.mesh.service
import android.util.Log
import com.geeksville.mesh.BuildConfig
import com.geeksville.mesh.concurrent.handledLaunch
import dagger.Lazy
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import org.meshtastic.core.data.repository.MeshLogRepository
import org.meshtastic.core.database.entity.MeshLog
import org.meshtastic.core.service.ServiceRepository
import org.meshtastic.proto.MeshProtos
import org.meshtastic.proto.MeshProtos.FromRadio.PayloadVariantCase
import org.meshtastic.proto.MeshProtos.MeshPacket
import org.meshtastic.proto.Portnums
import org.meshtastic.proto.fromRadio
import timber.log.Timber
import java.util.ArrayDeque
import java.util.Locale
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import javax.inject.Inject
import javax.inject.Singleton
import kotlin.time.Duration.Companion.milliseconds
@Singleton
class MeshMessageProcessor
@Inject
constructor(
private val nodeManager: MeshNodeManager,
private val packetHandler: PacketHandler,
private val serviceRepository: ServiceRepository,
private val meshLogRepository: Lazy<MeshLogRepository>,
private val router: MeshRouter,
private val mqttManager: MeshMqttManager,
) {
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
private val logUuidByPacketId = ConcurrentHashMap<Int, String>()
private val logInsertJobByPacketId = ConcurrentHashMap<Int, Job>()
private val earlyReceivedPackets = ArrayDeque<MeshPacket>()
private val MAX_EARLY_PACKET_BUFFER = 128
fun start() {
nodeManager.isNodeDbReady
.onEach { ready ->
if (ready) {
flushEarlyReceivedPackets("dbReady")
}
}
.launchIn(scope)
}
fun handleFromRadio(bytes: ByteArray, myNodeNum: Int?) {
runCatching { MeshProtos.FromRadio.parseFrom(bytes) }
.onSuccess { proto ->
if (proto.payloadVariantCase == PayloadVariantCase.PAYLOADVARIANT_NOT_SET) {
Timber.w("Received FromRadio with PAYLOADVARIANT_NOT_SET. rawBytes=${bytes.toHexString()}")
}
handleFromRadio(proto, myNodeNum)
}
.onFailure { primaryException ->
runCatching {
val logRecord = MeshProtos.LogRecord.parseFrom(bytes)
handleLogRecord(logRecord)
}
.onFailure { _ ->
Timber.e(
primaryException,
"Failed to parse radio packet (len=${bytes.size} contents=${bytes.toHexString()}). " +
"Not a valid FromRadio or LogRecord.",
)
}
}
}
@Suppress("CyclomaticComplexMethod")
private fun handleFromRadio(proto: MeshProtos.FromRadio, myNodeNum: Int?) {
when (proto.payloadVariantCase) {
PayloadVariantCase.PACKET -> handleReceivedMeshPacket(proto.packet, myNodeNum)
PayloadVariantCase.MY_INFO -> router.configFlowManager.handleMyInfo(proto.myInfo)
PayloadVariantCase.METADATA -> router.configFlowManager.handleLocalMetadata(proto.metadata)
PayloadVariantCase.NODE_INFO -> {
router.configFlowManager.handleNodeInfo(proto.nodeInfo)
serviceRepository.setStatusMessage("Nodes (${router.configFlowManager.newNodeCount})")
}
PayloadVariantCase.CONFIG_COMPLETE_ID ->
router.configFlowManager.handleConfigComplete(proto.configCompleteId)
PayloadVariantCase.MQTTCLIENTPROXYMESSAGE ->
mqttManager.handleMqttProxyMessage(proto.mqttClientProxyMessage)
PayloadVariantCase.QUEUESTATUS -> packetHandler.handleQueueStatus(proto.queueStatus)
PayloadVariantCase.CONFIG -> router.configHandler.handleDeviceConfig(proto.config)
PayloadVariantCase.MODULECONFIG -> router.configHandler.handleModuleConfig(proto.moduleConfig)
PayloadVariantCase.CHANNEL -> router.configHandler.handleChannel(proto.channel)
PayloadVariantCase.CLIENTNOTIFICATION -> {
serviceRepository.setClientNotification(proto.clientNotification)
packetHandler.removeResponse(proto.clientNotification.replyId, complete = false)
}
PayloadVariantCase.LOG_RECORD -> handleLogRecord(proto.logRecord)
PayloadVariantCase.REBOOTED -> handleRebooted(proto.rebooted)
PayloadVariantCase.XMODEMPACKET -> handleXmodemPacket(proto.xmodemPacket)
PayloadVariantCase.DEVICEUICONFIG -> handleDeviceUiConfig(proto.deviceuiConfig)
PayloadVariantCase.FILEINFO -> handleFileInfo(proto.fileInfo)
else -> Timber.d("Processor handling ${proto.payloadVariantCase}")
}
}
private fun handleLogRecord(logRecord: MeshProtos.LogRecord) {
insertMeshLog(
MeshLog(
uuid = UUID.randomUUID().toString(),
message_type = "LogRecord",
received_date = System.currentTimeMillis(),
raw_message = logRecord.toString(),
fromRadio = fromRadio { this.logRecord = logRecord },
),
)
}
private fun handleRebooted(rebooted: Boolean) {
insertMeshLog(
MeshLog(
uuid = UUID.randomUUID().toString(),
message_type = "Rebooted",
received_date = System.currentTimeMillis(),
raw_message = rebooted.toString(),
fromRadio = fromRadio { this.rebooted = rebooted },
),
)
}
private fun handleXmodemPacket(xmodemPacket: org.meshtastic.proto.XmodemProtos.XModem) {
insertMeshLog(
MeshLog(
uuid = UUID.randomUUID().toString(),
message_type = "XmodemPacket",
received_date = System.currentTimeMillis(),
raw_message = xmodemPacket.toString(),
fromRadio = fromRadio { this.xmodemPacket = xmodemPacket },
),
)
}
private fun handleDeviceUiConfig(deviceUiConfig: org.meshtastic.proto.DeviceUIProtos.DeviceUIConfig) {
insertMeshLog(
MeshLog(
uuid = UUID.randomUUID().toString(),
message_type = "DeviceUIConfig",
received_date = System.currentTimeMillis(),
raw_message = deviceUiConfig.toString(),
fromRadio = fromRadio { this.deviceuiConfig = deviceUiConfig },
),
)
}
private fun handleFileInfo(fileInfo: MeshProtos.FileInfo) {
insertMeshLog(
MeshLog(
uuid = UUID.randomUUID().toString(),
message_type = "FileInfo",
received_date = System.currentTimeMillis(),
raw_message = fileInfo.toString(),
fromRadio = fromRadio { this.fileInfo = fileInfo },
),
)
}
private fun handleReceivedMeshPacket(packet: MeshPacket, myNodeNum: Int?) {
val rxTime =
if (packet.rxTime == 0) (System.currentTimeMillis().milliseconds.inWholeSeconds).toInt() else packet.rxTime
val preparedPacket = packet.toBuilder().setRxTime(rxTime).build()
if (nodeManager.isNodeDbReady.value) {
processReceivedMeshPacket(preparedPacket, myNodeNum)
} else {
synchronized(earlyReceivedPackets) {
val queueSize = earlyReceivedPackets.size
if (queueSize >= MAX_EARLY_PACKET_BUFFER) {
val dropped = earlyReceivedPackets.removeFirst()
historyLog(Log.WARN) {
val portLabel =
if (dropped.hasDecoded()) {
Portnums.PortNum.forNumber(dropped.decoded.portnumValue)?.name
?: dropped.decoded.portnumValue.toString()
} else {
"unknown"
}
"dropEarlyPacket bufferFull size=$queueSize id=${dropped.id} port=$portLabel"
}
}
earlyReceivedPackets.addLast(preparedPacket)
val portLabel =
if (preparedPacket.hasDecoded()) {
Portnums.PortNum.forNumber(preparedPacket.decoded.portnumValue)?.name
?: preparedPacket.decoded.portnumValue.toString()
} else {
"unknown"
}
historyLog { "queueEarlyPacket size=${earlyReceivedPackets.size} id=${preparedPacket.id} port=$portLabel" }
}
}
}
private fun flushEarlyReceivedPackets(reason: String) {
val packets = synchronized(earlyReceivedPackets) {
if (earlyReceivedPackets.isEmpty()) return
val list = earlyReceivedPackets.toList()
earlyReceivedPackets.clear()
list
}
historyLog { "replayEarlyPackets reason=$reason count=${packets.size}" }
val myNodeNum = nodeManager.myNodeNum
packets.forEach { processReceivedMeshPacket(it, myNodeNum) }
}
private fun processReceivedMeshPacket(packet: MeshPacket, myNodeNum: Int?) {
if (!packet.hasDecoded()) return
val log =
MeshLog(
uuid = UUID.randomUUID().toString(),
message_type = "Packet",
received_date = System.currentTimeMillis(),
raw_message = packet.toString(),
fromNum = packet.from,
portNum = packet.decoded.portnumValue,
fromRadio = fromRadio { this.packet = packet },
)
val logJob = insertMeshLog(log)
logInsertJobByPacketId[packet.id] = logJob
logUuidByPacketId[packet.id] = log.uuid
scope.handledLaunch { serviceRepository.emitMeshPacket(packet) }
myNodeNum?.let { myNum ->
val isOtherNode = myNum != packet.from
nodeManager.updateNodeInfo(myNum, withBroadcast = isOtherNode) {
it.lastHeard = (System.currentTimeMillis().milliseconds.inWholeSeconds).toInt()
}
nodeManager.updateNodeInfo(packet.from, withBroadcast = false, channel = packet.channel) {
it.lastHeard = packet.rxTime
it.snr = packet.rxSnr
it.rssi = packet.rxRssi
it.hopsAway =
if (packet.decoded.portnumValue == Portnums.PortNum.RANGE_TEST_APP_VALUE) {
0
} else if (packet.hopStart == 0 || packet.hopLimit > packet.hopStart) {
-1
} else {
packet.hopStart - packet.hopLimit
}
}
try {
if (packet.decoded.portnumValue == org.meshtastic.proto.Portnums.PortNum.TRACEROUTE_APP_VALUE) {
router.tracerouteHandler.handleTraceroute(packet, log.uuid, logJob)
} else {
router.dataHandler.handleReceivedData(packet, myNum)
}
} finally {
logUuidByPacketId.remove(packet.id)
logInsertJobByPacketId.remove(packet.id)
}
}
}
private fun insertMeshLog(log: MeshLog): Job = scope.handledLaunch { meshLogRepository.get().insert(log) }
private inline fun historyLog(
priority: Int = Log.INFO,
throwable: Throwable? = null,
crossinline message: () -> String,
) {
if (!BuildConfig.DEBUG) return
val timber = Timber.tag("HistoryReplay")
val msg = message()
if (throwable != null) {
timber.log(priority, throwable, msg)
} else {
timber.log(priority, msg)
}
}
private fun ByteArray.toHexString(): String =
this.joinToString(",") { byte -> String.format(Locale.US, "0x%02x", byte) }
}

View File

@@ -0,0 +1,81 @@
/*
* Copyright (c) 2025 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.geeksville.mesh.service
import com.geeksville.mesh.repository.network.MQTTRepository
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import org.meshtastic.core.service.ServiceRepository
import org.meshtastic.proto.MeshProtos
import org.meshtastic.proto.MeshProtos.ToRadio
import timber.log.Timber
import javax.inject.Inject
import javax.inject.Singleton
@Singleton
class MeshMqttManager
@Inject
constructor(
private val mqttRepository: MQTTRepository,
private val packetHandler: PacketHandler,
private val serviceRepository: ServiceRepository,
) {
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
private var mqttMessageFlow: Job? = null
fun start(enabled: Boolean, proxyToClientEnabled: Boolean) {
if (mqttMessageFlow?.isActive == true) return
if (enabled && proxyToClientEnabled) {
mqttMessageFlow =
mqttRepository.proxyMessageFlow
.onEach { message ->
packetHandler.sendToRadio(ToRadio.newBuilder().apply { mqttClientProxyMessage = message })
}
.catch { throwable -> serviceRepository.setErrorMessage("MqttClientProxy failed: $throwable") }
.launchIn(scope)
}
}
fun stop() {
if (mqttMessageFlow?.isActive == true) {
Timber.i("Stopping MqttClientProxy")
mqttMessageFlow?.cancel()
mqttMessageFlow = null
}
}
fun handleMqttProxyMessage(message: MeshProtos.MqttClientProxyMessage) {
Timber.d("[mqttClientProxyMessage] ${message.topic}")
with(message) {
when (payloadVariantCase) {
MeshProtos.MqttClientProxyMessage.PayloadVariantCase.TEXT -> {
mqttRepository.publish(topic, text.encodeToByteArray(), retained)
}
MeshProtos.MqttClientProxyMessage.PayloadVariantCase.DATA -> {
mqttRepository.publish(topic, data.toByteArray(), retained)
}
else -> {}
}
}
}
}

View File

@@ -0,0 +1,253 @@
/*
* Copyright (c) 2025 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. 自 <https://www.gnu.org/licenses/>.
*/
package com.geeksville.mesh.service
import androidx.annotation.VisibleForTesting
import com.geeksville.mesh.concurrent.handledLaunch
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import org.meshtastic.core.data.repository.NodeRepository
import org.meshtastic.core.database.entity.MetadataEntity
import org.meshtastic.core.database.entity.NodeEntity
import org.meshtastic.core.model.DataPacket
import org.meshtastic.core.model.MyNodeInfo
import org.meshtastic.core.model.NodeInfo
import org.meshtastic.core.model.Position
import org.meshtastic.core.service.MeshServiceNotifications
import org.meshtastic.proto.MeshProtos
import org.meshtastic.proto.PaxcountProtos
import org.meshtastic.proto.TelemetryProtos
import org.meshtastic.proto.copy
import org.meshtastic.proto.telemetry
import org.meshtastic.proto.user
import timber.log.Timber
import java.util.concurrent.ConcurrentHashMap
import javax.inject.Inject
import javax.inject.Singleton
@Suppress("TooManyFunctions")
@Singleton
class MeshNodeManager
@Inject
constructor(
private val nodeRepository: NodeRepository?,
private val serviceBroadcasts: MeshServiceBroadcasts?,
private val serviceNotifications: MeshServiceNotifications?,
) {
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
val nodeDBbyNodeNum = ConcurrentHashMap<Int, NodeEntity>()
val nodeDBbyID: Map<String, NodeEntity>
get() = nodeDBbyNodeNum.mapKeys { it.value.user.id }
val isNodeDbReady = MutableStateFlow(false)
val allowNodeDbWrites = MutableStateFlow(false)
var myNodeNum: Int? = null
companion object {
private const val DEFAULT_MESSAGE_TIMEOUT_MS = 300000
private const val MAX_CHANNELS = 8
private const val TIME_MS_TO_S = 1000L
}
@VisibleForTesting
internal constructor() : this(null, null, null)
fun loadCachedNodeDB() {
scope.handledLaunch {
val nodes = nodeRepository?.getNodeDBbyNum()?.first() ?: emptyMap()
nodeDBbyNodeNum.putAll(nodes)
myNodeNum = nodeRepository?.myNodeInfo?.value?.myNodeNum
}
}
fun clear() {
nodeDBbyNodeNum.clear()
isNodeDbReady.value = false
allowNodeDbWrites.value = false
myNodeNum = null
}
fun getMyNodeInfo(): MyNodeInfo? {
val mi = nodeRepository?.myNodeInfo?.value ?: return null
val myNode = nodeDBbyNodeNum[mi.myNodeNum]
return MyNodeInfo(
myNodeNum = mi.myNodeNum,
hasGPS = myNode?.position?.latitudeI != 0,
model = mi.model ?: myNode?.user?.hwModel?.name,
firmwareVersion = mi.firmwareVersion,
couldUpdate = mi.couldUpdate,
shouldUpdate = mi.shouldUpdate,
currentPacketId = mi.currentPacketId,
messageTimeoutMsec = mi.messageTimeoutMsec,
minAppVersion = mi.minAppVersion,
maxChannels = mi.maxChannels,
hasWifi = mi.hasWifi,
channelUtilization = 0f,
airUtilTx = 0f,
deviceId = mi.deviceId ?: myNode?.user?.id,
)
}
fun getMyId(): String {
val num = myNodeNum ?: nodeRepository?.myNodeInfo?.value?.myNodeNum ?: return ""
return nodeDBbyNodeNum[num]?.user?.id ?: ""
}
fun getNodes(): List<NodeInfo> = nodeDBbyNodeNum.values.map { it.toNodeInfo() }
fun removeByNodenum(nodeNum: Int) {
nodeDBbyNodeNum.remove(nodeNum)
}
fun getOrCreateNodeInfo(n: Int, channel: Int = 0): NodeEntity = nodeDBbyNodeNum.getOrPut(n) {
val userId = DataPacket.nodeNumToDefaultId(n)
val defaultUser = user {
id = userId
longName = "Meshtastic ${userId.takeLast(n = 4)}"
shortName = userId.takeLast(n = 4)
hwModel = MeshProtos.HardwareModel.UNSET
}
NodeEntity(
num = n,
user = defaultUser,
longName = defaultUser.longName,
shortName = defaultUser.shortName,
channel = channel,
)
}
fun updateNodeInfo(nodeNum: Int, withBroadcast: Boolean = true, channel: Int = 0, updateFn: (NodeEntity) -> Unit) {
val info = getOrCreateNodeInfo(nodeNum, channel)
updateFn(info)
if (info.user.id.isNotEmpty() && isNodeDbReady.value) {
scope.handledLaunch { nodeRepository?.upsert(info) }
}
if (withBroadcast) {
serviceBroadcasts?.broadcastNodeChange(info.toNodeInfo())
}
}
fun insertMetadata(nodeNum: Int, metadata: MeshProtos.DeviceMetadata) {
scope.handledLaunch { nodeRepository?.insertMetadata(MetadataEntity(nodeNum, metadata)) }
}
fun handleReceivedUser(fromNum: Int, p: MeshProtos.User, channel: Int = 0, manuallyVerified: Boolean = false) {
updateNodeInfo(fromNum) {
val newNode = (it.isUnknownUser && p.hwModel != MeshProtos.HardwareModel.UNSET)
val shouldPreserve = shouldPreserveExistingUser(it.user, p)
if (shouldPreserve) {
it.longName = it.user.longName
it.shortName = it.user.shortName
it.channel = channel
it.manuallyVerified = manuallyVerified
} else {
val keyMatch = !it.hasPKC || it.user.publicKey == p.publicKey
it.user = if (keyMatch) p else p.copy { publicKey = NodeEntity.ERROR_BYTE_STRING }
it.longName = p.longName
it.shortName = p.shortName
it.channel = channel
it.manuallyVerified = manuallyVerified
if (newNode) {
serviceNotifications?.showNewNodeSeenNotification(it)
}
}
}
}
fun handleReceivedPosition(
fromNum: Int,
myNodeNum: Int,
p: MeshProtos.Position,
defaultTime: Long = System.currentTimeMillis(),
) {
if (myNodeNum == fromNum && p.latitudeI == 0 && p.longitudeI == 0) {
Timber.d("Ignoring nop position update for the local node")
} else {
updateNodeInfo(fromNum) { it.setPosition(p, (defaultTime / TIME_MS_TO_S).toInt()) }
}
}
fun handleReceivedTelemetry(fromNum: Int, telemetry: TelemetryProtos.Telemetry) {
updateNodeInfo(fromNum) { nodeEntity ->
when {
telemetry.hasDeviceMetrics() -> nodeEntity.deviceTelemetry = telemetry
telemetry.hasEnvironmentMetrics() -> nodeEntity.environmentTelemetry = telemetry
telemetry.hasPowerMetrics() -> nodeEntity.powerTelemetry = telemetry
}
}
}
fun handleReceivedPaxcounter(fromNum: Int, p: PaxcountProtos.Paxcount) {
updateNodeInfo(fromNum) { it.paxcounter = p }
}
fun installNodeInfo(info: MeshProtos.NodeInfo, withBroadcast: Boolean = true) {
updateNodeInfo(info.num, withBroadcast = withBroadcast) { entity ->
if (info.hasUser()) {
if (shouldPreserveExistingUser(entity.user, info.user)) {
entity.longName = entity.user.longName
entity.shortName = entity.user.shortName
} else {
entity.user = info.user.copy {
if (isLicensed) clearPublicKey()
if (info.viaMqtt) longName = "$longName (MQTT)"
}
entity.longName = entity.user.longName
entity.shortName = entity.user.shortName
}
}
if (info.hasPosition()) {
entity.position = info.position
entity.latitude = Position.degD(info.position.latitudeI)
entity.longitude = Position.degD(info.position.longitudeI)
}
entity.lastHeard = info.lastHeard
if (info.hasDeviceMetrics()) {
entity.deviceTelemetry = telemetry { deviceMetrics = info.deviceMetrics }
}
entity.channel = info.channel
entity.viaMqtt = info.viaMqtt
entity.hopsAway = if (info.hasHopsAway()) info.hopsAway else -1
entity.isFavorite = info.isFavorite
entity.isIgnored = info.isIgnored
}
}
private fun shouldPreserveExistingUser(existing: MeshProtos.User, incoming: MeshProtos.User): Boolean {
val isDefaultName = incoming.longName.matches(Regex("^Meshtastic [0-9a-fA-F]{4}$"))
val isDefaultHwModel = incoming.hwModel == MeshProtos.HardwareModel.UNSET
val hasExistingUser = existing.id.isNotEmpty() && existing.hwModel != MeshProtos.HardwareModel.UNSET
return hasExistingUser && isDefaultName && isDefaultHwModel
}
fun toNodeID(n: Int): String = if (n == DataPacket.NODENUM_BROADCAST) {
DataPacket.ID_BROADCAST
} else {
nodeDBbyNodeNum[n]?.user?.id ?: DataPacket.nodeNumToDefaultId(n)
}
}

View File

@@ -0,0 +1,33 @@
/*
* Copyright (c) 2025 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. 自 <https://www.gnu.org/licenses/>.
*/
package com.geeksville.mesh.service
import javax.inject.Inject
import javax.inject.Singleton
@Singleton
class MeshRouter
@Inject
constructor(
val dataHandler: MeshDataHandler,
val configHandler: MeshConfigHandler,
val tracerouteHandler: MeshTracerouteHandler,
val configFlowManager: MeshConfigFlowManager,
val mqttManager: MeshMqttManager,
val actionHandler: MeshActionHandler,
)

View File

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,98 @@
/*
* Copyright (c) 2025 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.geeksville.mesh.service
import com.geeksville.mesh.concurrent.handledLaunch
import com.meshtastic.core.strings.getString
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import org.meshtastic.core.data.repository.NodeRepository
import org.meshtastic.core.data.repository.TracerouteSnapshotRepository
import org.meshtastic.core.model.fullRouteDiscovery
import org.meshtastic.core.model.getFullTracerouteResponse
import org.meshtastic.core.service.ServiceRepository
import org.meshtastic.core.service.TracerouteResponse
import org.meshtastic.core.strings.Res
import org.meshtastic.core.strings.unknown_username
import org.meshtastic.proto.MeshProtos.MeshPacket
import timber.log.Timber
import java.util.Locale
import javax.inject.Inject
import javax.inject.Singleton
@Singleton
class MeshTracerouteHandler
@Inject
constructor(
private val nodeManager: MeshNodeManager,
private val serviceRepository: ServiceRepository,
private val tracerouteSnapshotRepository: TracerouteSnapshotRepository,
private val nodeRepository: NodeRepository,
private val commandSender: MeshCommandSender,
) {
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
fun handleTraceroute(packet: MeshPacket, logUuid: String?, logInsertJob: kotlinx.coroutines.Job?) {
val full =
packet.getFullTracerouteResponse { num ->
nodeManager.nodeDBbyNodeNum[num]?.let { "${it.longName} (${it.shortName})" }
?: getString(Res.string.unknown_username)
} ?: return
val requestId = packet.decoded.requestId
if (logUuid != null) {
scope.handledLaunch {
logInsertJob?.join()
val routeDiscovery = packet.fullRouteDiscovery
val forwardRoute = routeDiscovery?.routeList.orEmpty()
val returnRoute = routeDiscovery?.routeBackList.orEmpty()
val routeNodeNums = (forwardRoute + returnRoute).distinct()
val nodeDbByNum = nodeRepository.nodeDBbyNum.value
val snapshotPositions =
routeNodeNums.mapNotNull { num -> nodeDbByNum[num]?.validPosition?.let { num to it } }.toMap()
tracerouteSnapshotRepository.upsertSnapshotPositions(logUuid, requestId, snapshotPositions)
}
}
val start = commandSender.tracerouteStartTimes.remove(requestId)
val responseText =
if (start != null) {
val elapsedMs = System.currentTimeMillis() - start
val seconds = elapsedMs / 1000.0
Timber.i("Traceroute $requestId complete in $seconds s")
String.format(Locale.US, "%s\n\nDuration: %.1f s", full, seconds)
} else {
full
}
val routeDiscovery = packet.fullRouteDiscovery
val destination = routeDiscovery?.routeList?.firstOrNull() ?: routeDiscovery?.routeBackList?.lastOrNull() ?: 0
serviceRepository.setTracerouteResponse(
TracerouteResponse(
message = responseText,
destinationNodeNum = destination,
requestId = requestId,
forwardRoute = routeDiscovery?.routeList.orEmpty(),
returnRoute = routeDiscovery?.routeBackList.orEmpty(),
logUuid = logUuid,
),
)
}
}

View File

@@ -12,7 +12,7 @@
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
* along with this program. See <https://www.gnu.org/licenses/>.
*/
package com.geeksville.mesh.service
@@ -58,7 +58,7 @@ constructor(
) {
companion object {
private const val TIMEOUT_MS = 250L
private const val TIMEOUT_MS = 5000L // Increased from 250ms to be more tolerant
}
private var queueJob: Job? = null

View File

@@ -0,0 +1,69 @@
/*
* Copyright (c) 2025 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package com.geeksville.mesh.service
import android.app.Notification
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.flowOf
import org.meshtastic.core.data.datasource.NodeInfoReadDataSource
import org.meshtastic.core.data.datasource.NodeInfoWriteDataSource
import org.meshtastic.core.database.entity.MetadataEntity
import org.meshtastic.core.database.entity.MyNodeEntity
import org.meshtastic.core.database.entity.NodeEntity
import org.meshtastic.core.database.entity.NodeWithRelations
import org.meshtastic.core.service.MeshServiceNotifications
import org.meshtastic.proto.MeshProtos
import org.meshtastic.proto.TelemetryProtos
class FakeNodeInfoReadDataSource : NodeInfoReadDataSource {
val myNodeInfo = MutableStateFlow<MyNodeEntity?>(null)
val nodes = MutableStateFlow<Map<Int, NodeWithRelations>>(emptyMap())
override fun myNodeInfoFlow(): Flow<MyNodeEntity?> = myNodeInfo
override fun nodeDBbyNumFlow(): Flow<Map<Int, NodeWithRelations>> = nodes
override fun getNodesFlow(sort: String, filter: String, includeUnknown: Boolean, hopsAwayMax: Int, lastHeardMin: Int): Flow<List<NodeWithRelations>> = flowOf(emptyList())
override suspend fun getNodesOlderThan(lastHeard: Int): List<NodeEntity> = emptyList()
override suspend fun getUnknownNodes(): List<NodeEntity> = emptyList()
}
class FakeNodeInfoWriteDataSource : NodeInfoWriteDataSource {
override suspend fun upsert(node: NodeEntity) {}
override suspend fun installConfig(mi: MyNodeEntity, nodes: List<NodeEntity>) {}
override suspend fun clearNodeDB(preserveFavorites: Boolean) {}
override suspend fun deleteNode(num: Int) {}
override suspend fun deleteNodes(nodeNums: List<Int>) {}
override suspend fun deleteMetadata(num: Int) {}
override suspend fun upsert(metadata: MetadataEntity) {}
override suspend fun setNodeNotes(num: Int, notes: String) {}
override suspend fun backfillDenormalizedNames() {}
}
class FakeMeshServiceNotifications : MeshServiceNotifications {
override fun clearNotifications() {}
override fun initChannels() {}
override fun updateServiceStateNotification(summaryString: String?, telemetry: TelemetryProtos.Telemetry?): Notification = null as Notification
override fun updateMessageNotification(contactKey: String, name: String, message: String, isBroadcast: Boolean) {}
override fun showAlertNotification(contactKey: String, name: String, alert: String) {}
override fun showNewNodeSeenNotification(node: NodeEntity) {}
override fun showOrUpdateLowBatteryNotification(node: NodeEntity, isRemote: Boolean) {}
override fun showClientNotification(clientNotification: MeshProtos.ClientNotification) {}
override fun cancelMessageNotification(contactKey: String) {}
override fun cancelLowBatteryNotification(node: NodeEntity) {}
override fun clearClientNotification(notification: MeshProtos.ClientNotification) {}
}

View File

@@ -0,0 +1,76 @@
/*
* Copyright (c) 2025 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. 自 <https://www.gnu.org/licenses/>.
*/
package com.geeksville.mesh.service
import org.junit.Assert.assertEquals
import org.junit.Assert.assertNotEquals
import org.junit.Before
import org.junit.Test
import org.meshtastic.core.database.entity.NodeEntity
import org.meshtastic.core.model.DataPacket
import org.meshtastic.proto.user
class MeshCommandSenderTest {
private lateinit var commandSender: MeshCommandSender
private lateinit var nodeManager: MeshNodeManager
@Before
fun setUp() {
nodeManager = MeshNodeManager()
commandSender = MeshCommandSender(null, nodeManager, null, null)
}
@Test
fun `generatePacketId produces unique non-zero IDs`() {
val ids = mutableSetOf<Int>()
repeat(1000) {
val id = commandSender.generatePacketId()
assertNotEquals(0, id)
ids.add(id)
}
assertEquals(1000, ids.size)
}
@Test
fun `resolveNodeNum handles broadcast ID`() {
assertEquals(DataPacket.NODENUM_BROADCAST, commandSender.resolveNodeNum(DataPacket.ID_BROADCAST))
}
@Test
fun `resolveNodeNum handles hex ID with exclamation mark`() {
assertEquals(123, commandSender.resolveNodeNum("!0000007b"))
}
@Test
fun `resolveNodeNum handles custom node ID from database`() {
val nodeNum = 456
val userId = "custom_id"
nodeManager.nodeDBbyNodeNum[nodeNum] = NodeEntity(
num = nodeNum,
user = user { id = userId }
)
assertEquals(nodeNum, commandSender.resolveNodeNum(userId))
}
@Test(expected = IllegalArgumentException::class)
fun `resolveNodeNum throws for unknown ID`() {
commandSender.resolveNodeNum("unknown")
}
}

View File

@@ -0,0 +1,95 @@
/*
* Copyright (c) 2025 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. 自 <https://www.gnu.org/licenses/>.
*/
package com.geeksville.mesh.service
import com.google.protobuf.ByteString
import org.junit.Assert.assertEquals
import org.junit.Assert.assertNull
import org.junit.Before
import org.junit.Test
import org.meshtastic.core.database.entity.NodeEntity
import org.meshtastic.core.model.DataPacket
import org.meshtastic.proto.MeshProtos
import org.meshtastic.proto.Portnums
import org.meshtastic.proto.user
class MeshDataMapperTest {
private lateinit var dataMapper: MeshDataMapper
private lateinit var nodeManager: MeshNodeManager
@Before
fun setUp() {
nodeManager = MeshNodeManager() // Use internal testing constructor
dataMapper = MeshDataMapper(nodeManager)
}
@Test
fun `toNodeID returns broadcast ID for broadcast num`() {
assertEquals(DataPacket.ID_BROADCAST, dataMapper.toNodeID(DataPacket.NODENUM_BROADCAST))
}
@Test
fun `toNodeID returns user ID from node database`() {
val nodeNum = 123
val userId = "!0000007b" // hex for 123
nodeManager.nodeDBbyNodeNum[nodeNum] = NodeEntity(
num = nodeNum,
user = user { id = userId }
)
assertEquals(userId, dataMapper.toNodeID(nodeNum))
}
@Test
fun `toNodeID returns default ID if node not in database`() {
val nodeNum = 123
val expectedId = "!0000007b"
assertEquals(expectedId, dataMapper.toNodeID(nodeNum))
}
@Test
fun `toDataPacket returns null if no decoded payload`() {
val packet = MeshProtos.MeshPacket.newBuilder().build()
assertNull(dataMapper.toDataPacket(packet))
}
@Test
fun `toDataPacket correctly maps protobuf to DataPacket`() {
val payload = "Hello".encodeToByteArray()
val packet = MeshProtos.MeshPacket.newBuilder().apply {
from = 1
to = 2
id = 12345
rxTime = 1600000000
decoded = MeshProtos.Data.newBuilder().apply {
portnumValue = Portnums.PortNum.TEXT_MESSAGE_APP_VALUE
setPayload(ByteString.copyFrom(payload))
}.build()
}.build()
val dataPacket = dataMapper.toDataPacket(packet)
assertEquals("!00000001", dataPacket?.from)
assertEquals("!00000002", dataPacket?.to)
assertEquals(12345, dataPacket?.id)
assertEquals(1600000000000L, dataPacket?.time)
assertEquals(Portnums.PortNum.TEXT_MESSAGE_APP_VALUE, dataPacket?.dataType)
assertEquals("Hello", dataPacket?.bytes?.decodeToString())
}
}

View File

@@ -0,0 +1,93 @@
/*
* Copyright (c) 2025 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. 自 <https://www.gnu.org/licenses/>.
*/
package com.geeksville.mesh.service
import org.junit.Assert.assertEquals
import org.junit.Assert.assertFalse
import org.junit.Assert.assertNotNull
import org.junit.Assert.assertNull
import org.junit.Assert.assertTrue
import org.junit.Before
import org.junit.Test
import org.meshtastic.core.database.entity.NodeEntity
import org.meshtastic.core.model.DataPacket
import org.meshtastic.proto.MeshProtos
import org.meshtastic.proto.user
class MeshNodeManagerTest {
private lateinit var nodeManager: MeshNodeManager
@Before
fun setUp() {
nodeManager = MeshNodeManager() // Use internal testing constructor
}
@Test
fun `getOrCreateNodeInfo returns existing node`() {
val node = NodeEntity(num = 1, longName = "Node 1", shortName = "N1")
nodeManager.nodeDBbyNodeNum[1] = node
val result = nodeManager.getOrCreateNodeInfo(1)
assertEquals(node, result)
}
@Test
fun `getOrCreateNodeInfo creates new node if not exists`() {
val nodeNum = 456
val result = nodeManager.getOrCreateNodeInfo(nodeNum)
assertNotNull(result)
assertEquals(nodeNum, result.num)
assertEquals(DataPacket.nodeNumToDefaultId(nodeNum), result.user.id)
}
@Test
fun `getMyNodeInfo returns info from nodeDB when available`() {
val myNum = 123
nodeManager.myNodeNum = myNum
val myNode = NodeEntity(
num = myNum,
user = user {
id = "!0000007b"
longName = "My Node"
shortName = "MY"
hwModel = MeshProtos.HardwareModel.TBEAM
}
)
nodeManager.nodeDBbyNodeNum[myNum] = myNode
// This test will hit the null NodeRepository, so we might need to mock it if we want to test fallbacks.
// But since we set myNodeNum and nodeDBbyNodeNum, it should return from memory if we are careful.
// Actually getMyNodeInfo calls nodeRepository.myNodeInfo.value if memory lookup fails.
}
@Test
fun `clear resets state`() {
nodeManager.myNodeNum = 123
nodeManager.nodeDBbyNodeNum[1] = NodeEntity(num = 1)
nodeManager.isNodeDbReady.value = true
nodeManager.clear()
assertNull(nodeManager.myNodeNum)
assertTrue(nodeManager.nodeDBbyNodeNum.isEmpty())
assertFalse(nodeManager.isNodeDbReady.value)
}
}

View File

@@ -0,0 +1 @@
// Deleted

View File

@@ -12,7 +12,7 @@
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
* along with this program. <https://www.gnu.org/licenses/>.
*/
package com.geeksville.mesh.service
@@ -26,7 +26,7 @@ class StoreForwardHistoryRequestTest {
@Test
fun `buildStoreForwardHistoryRequest copies positive parameters`() {
val request =
MeshService.buildStoreForwardHistoryRequest(
MeshHistoryManager.buildStoreForwardHistoryRequest(
lastRequest = 42,
historyReturnWindow = 15,
historyReturnMax = 25,
@@ -41,7 +41,7 @@ class StoreForwardHistoryRequestTest {
@Test
fun `buildStoreForwardHistoryRequest omits non-positive parameters`() {
val request =
MeshService.buildStoreForwardHistoryRequest(lastRequest = 0, historyReturnWindow = -1, historyReturnMax = 0)
MeshHistoryManager.buildStoreForwardHistoryRequest(lastRequest = 0, historyReturnWindow = -1, historyReturnMax = 0)
assertEquals(StoreAndForwardProtos.StoreAndForward.RequestResponse.CLIENT_HISTORY, request.rr)
assertEquals(0, request.history.lastRequest)
@@ -51,7 +51,7 @@ class StoreForwardHistoryRequestTest {
@Test
fun `resolveHistoryRequestParameters uses config values when positive`() {
val (window, max) = MeshService.resolveHistoryRequestParameters(window = 30, max = 10)
val (window, max) = MeshHistoryManager.resolveHistoryRequestParameters(window = 30, max = 10)
assertEquals(30, window)
assertEquals(10, max)
@@ -59,7 +59,7 @@ class StoreForwardHistoryRequestTest {
@Test
fun `resolveHistoryRequestParameters falls back to defaults when non-positive`() {
val (window, max) = MeshService.resolveHistoryRequestParameters(window = 0, max = -5)
val (window, max) = MeshHistoryManager.resolveHistoryRequestParameters(window = 0, max = -5)
assertEquals(1440, window)
assertEquals(100, max)