feat: Improve node handling and add client notifications (#4130)

Signed-off-by: James Rich <2199651+jamesarich@users.noreply.github.com>
This commit is contained in:
James Rich
2026-01-04 11:32:13 -06:00
committed by GitHub
parent 2c9f410baa
commit e5f78d101c
5 changed files with 59 additions and 22 deletions

View File

@@ -17,6 +17,7 @@
package com.geeksville.mesh.service
import co.touchlab.kermit.Logger
import org.meshtastic.core.service.MeshServiceNotifications
import org.meshtastic.core.service.ServiceRepository
import org.meshtastic.proto.MeshProtos
import javax.inject.Inject
@@ -34,6 +35,7 @@ constructor(
private val router: MeshRouter,
private val mqttManager: MeshMqttManager,
private val packetHandler: PacketHandler,
private val serviceNotifications: MeshServiceNotifications,
) {
@Suppress("CyclomaticComplexMethod")
fun handleFromRadio(proto: MeshProtos.FromRadio) {
@@ -56,6 +58,7 @@ constructor(
MeshProtos.FromRadio.PayloadVariantCase.CHANNEL -> router.configHandler.handleChannel(proto.channel)
MeshProtos.FromRadio.PayloadVariantCase.CLIENTNOTIFICATION -> {
serviceRepository.setClientNotification(proto.clientNotification)
serviceNotifications.showClientNotification(proto.clientNotification)
packetHandler.removeResponse(proto.clientNotification.replyId, complete = false)
}
// Logging-only variants are handled by MeshMessageProcessor before dispatching here

View File

@@ -56,6 +56,7 @@ constructor(
private val meshPrefs: MeshPrefs,
private val databaseManager: DatabaseManager,
private val serviceNotifications: MeshServiceNotifications,
private val messageProcessor: Lazy<MeshMessageProcessor>,
) {
private var scope: CoroutineScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
@@ -304,6 +305,7 @@ constructor(
meshPrefs.deviceAddress = deviceAddr
scope.handledLaunch {
nodeManager.clear()
messageProcessor.get().clearEarlyPackets()
databaseManager.switchActiveDatabase(deviceAddr)
serviceNotifications.clearNotifications()
nodeManager.loadCachedNodeDB()

View File

@@ -41,7 +41,6 @@ import org.meshtastic.proto.Portnums
import org.meshtastic.proto.TelemetryProtos
import org.meshtastic.proto.position
import org.meshtastic.proto.telemetry
import java.io.IOException
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.atomic.AtomicLong
@@ -122,6 +121,7 @@ constructor(
fun sendData(p: DataPacket) {
if (p.id == 0) p.id = generatePacketId()
val bytes = p.bytes ?: ByteArray(0)
require(p.dataType != 0) { "Port numbers must be non-zero!" }
if (bytes.size >= MeshProtos.Constants.DATA_PAYLOAD_LEN_VALUE) {
p.status = MessageStatus.ERROR
throw RemoteException("Message too long")
@@ -132,7 +132,7 @@ constructor(
if (connectionStateHolder?.connectionState?.value == ConnectionState.Connected) {
try {
sendNow(p)
} catch (ex: IOException) {
} catch (@Suppress("TooGenericExceptionCaught") ex: Exception) {
Logger.e(ex) { "Error sending message, so enqueueing" }
enqueueForSending(p)
}
@@ -170,7 +170,7 @@ constructor(
try {
sendNow(p)
sentPackets.add(p)
} catch (ex: IOException) {
} catch (@Suppress("TooGenericExceptionCaught") ex: Exception) {
Logger.e(ex) { "Error sending queued message:" }
}
}

View File

@@ -43,6 +43,7 @@ import javax.inject.Inject
import javax.inject.Singleton
import kotlin.time.Duration.Companion.milliseconds
@Suppress("TooManyFunctions")
@Singleton
class MeshMessageProcessor
@Inject
@@ -60,6 +61,10 @@ constructor(
private val earlyReceivedPackets = ArrayDeque<MeshPacket>()
private val maxEarlyPacketBuffer = 128
fun clearEarlyPackets() {
synchronized(earlyReceivedPackets) { earlyReceivedPackets.clear() }
}
fun start(scope: CoroutineScope) {
this.scope = scope
nodeManager.isNodeDbReady
@@ -112,8 +117,13 @@ constructor(
PayloadVariantCase.XMODEMPACKET -> "XmodemPacket" to proto.xmodemPacket.toString()
PayloadVariantCase.DEVICEUICONFIG -> "DeviceUIConfig" to proto.deviceuiConfig.toString()
PayloadVariantCase.FILEINFO -> "FileInfo" to proto.fileInfo.toString()
else -> return // Other variants (Config, NodeInfo, etc.) are handled by dispatcher but not necessarily
// logged as raw strings here
PayloadVariantCase.MY_INFO -> "MyInfo" to proto.myInfo.toString()
PayloadVariantCase.NODE_INFO -> "NodeInfo" to proto.nodeInfo.toString()
PayloadVariantCase.CONFIG -> "Config" to proto.config.toString()
PayloadVariantCase.MODULECONFIG -> "ModuleConfig" to proto.moduleConfig.toString()
PayloadVariantCase.CHANNEL -> "Channel" to proto.channel.toString()
PayloadVariantCase.CLIENTNOTIFICATION -> "ClientNotification" to proto.clientNotification.toString()
else -> return
}
insertMeshLog(
@@ -208,7 +218,9 @@ constructor(
it.hopsAway =
if (packet.decoded.portnumValue == Portnums.PortNum.RANGE_TEST_APP_VALUE) {
0
} else if (packet.hopStart == 0 || packet.hopLimit > packet.hopStart) {
} else if (packet.hopStart == 0 && !packet.decoded.hasBitfield()) {
-1
} else if (packet.hopLimit > packet.hopStart) {
-1
} else {
packet.hopStart - packet.hopLimit

View File

@@ -85,30 +85,50 @@ interface NodeInfoDao {
}
private fun handleExistingNodeUpsertValidation(existingNode: NodeEntity, incomingNode: NodeEntity): NodeEntity {
val isPlaceholder = incomingNode.user.hwModel == MeshProtos.HardwareModel.UNSET
val hasExistingUser = existingNode.user.hwModel != MeshProtos.HardwareModel.UNSET
val isDefaultName = incomingNode.user.longName.matches(Regex("^Meshtastic [0-9a-fA-F]{4}$"))
val shouldPreserve = hasExistingUser && isPlaceholder && isDefaultName
if (shouldPreserve) {
// Preserve existing name and user info, but update metadata like lastHeard, SNR, and position.
val resolvedNotes = if (incomingNode.notes.isBlank()) existingNode.notes else incomingNode.notes
return existingNode.copy(
lastHeard = incomingNode.lastHeard,
snr = incomingNode.snr,
rssi = incomingNode.rssi,
position = incomingNode.position,
hopsAway = incomingNode.hopsAway,
deviceTelemetry = incomingNode.deviceTelemetry,
environmentTelemetry = incomingNode.environmentTelemetry,
powerTelemetry = incomingNode.powerTelemetry,
paxcounter = incomingNode.paxcounter,
channel = incomingNode.channel,
viaMqtt = incomingNode.viaMqtt,
notes = resolvedNotes,
)
}
// A public key is considered matching if the incoming key equals the existing key,
// OR if the existing key is empty (allowing a new key to be set or an update to proceed).
val isPublicKeyMatchingOrExistingIsEmpty =
existingNode.user.publicKey == incomingNode.publicKey || existingNode.user.publicKey.isEmpty
val existingResolvedKey = existingNode.publicKey ?: existingNode.user.publicKey
val isPublicKeyMatchingOrExistingIsEmpty = existingResolvedKey == incomingNode.publicKey || !existingNode.hasPKC
val isPlaceholder = incomingNode.user.hwModel == MeshProtos.HardwareModel.UNSET
val resolvedNotes = if (incomingNode.notes.isBlank()) existingNode.notes else incomingNode.notes
return if (isPublicKeyMatchingOrExistingIsEmpty) {
// Keys match or existing key was empty: trust the incoming node data completely.
// This allows for legitimate updates to user info and other fields.
val resolvedNotes = if (incomingNode.notes.isBlank()) existingNode.notes else incomingNode.notes
val resolvedLongName = if (isPlaceholder) null else incomingNode.longName ?: existingNode.longName
val resolvedShortName = if (isPlaceholder) null else incomingNode.shortName ?: existingNode.shortName
incomingNode.copy(notes = resolvedNotes, longName = resolvedLongName, shortName = resolvedShortName)
incomingNode.copy(notes = resolvedNotes)
} else {
existingNode.copy(
lastHeard = incomingNode.lastHeard,
snr = incomingNode.snr,
position = incomingNode.position,
// Preserve the existing user object, but update its internal public key to EMPTY
// to reflect the conflict state.
user = existingNode.user.toBuilder().setPublicKey(ByteString.EMPTY).build(),
publicKey = ByteString.EMPTY,
notes = existingNode.notes,
// Public key mismatch: This could be a factory reset or a hardware ID collision.
// We allow the name and user info to update, but we clear the public key
// to indicate that this node is no longer "verified" against the previous key.
incomingNode.copy(
user = incomingNode.user.toBuilder().setPublicKey(NodeEntity.ERROR_BYTE_STRING).build(),
publicKey = NodeEntity.ERROR_BYTE_STRING,
notes = resolvedNotes,
)
}
}