fix: clamp future lastHeard timestamps to current time on ingestion (#5418)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
James Rich
2026-05-12 07:35:11 -05:00
committed by GitHub
parent 6f42f07887
commit 0f2b1c064a
7 changed files with 35 additions and 12 deletions

View File

@@ -36,5 +36,8 @@ val nowSeconds: Long
val systemTimeZone: TimeZone
get() = TimeZone.currentSystemDefault()
/** Clamps a seconds-since-epoch timestamp so it never exceeds the current wall-clock time. */
fun clampTimestampToNow(epochSeconds: Int): Int = minOf(epochSeconds, nowSeconds.toInt())
/** Converts these milliseconds to an [Instant]. */
fun Long.toInstant(): Instant = Instant.fromEpochMilliseconds(this)

View File

@@ -17,6 +17,7 @@
package org.meshtastic.core.common.util
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertTrue
class TimeUtilsTest {
@@ -32,4 +33,22 @@ class TimeUtilsTest {
val start = nowSeconds
assertTrue(start > 0)
}
@Test
fun clampTimestampToNow_pastTimestamp_unchanged() {
val past = (nowSeconds - 3600).toInt()
assertEquals(past, clampTimestampToNow(past))
}
@Test
fun clampTimestampToNow_futureTimestamp_clampedToNow() {
val future = (nowSeconds + 86400).toInt()
val clamped = clampTimestampToNow(future)
assertTrue(clamped <= nowSeconds.toInt())
}
@Test
fun clampTimestampToNow_zero_unchanged() {
assertEquals(0, clampTimestampToNow(0))
}
}

View File

@@ -26,6 +26,7 @@ import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.koin.core.annotation.Named
import org.koin.core.annotation.Single
import org.meshtastic.core.common.util.clampTimestampToNow
import org.meshtastic.core.common.util.handledLaunch
import org.meshtastic.core.common.util.nowMillis
import org.meshtastic.core.common.util.nowSeconds
@@ -244,7 +245,7 @@ class MeshMessageProcessorImpl(
}
node.copy(
lastHeard = packet.rx_time,
lastHeard = clampTimestampToNow(packet.rx_time),
viaMqtt = viaMqtt,
lastTransport = packet.transport_mechanism.value,
snr = snr,

View File

@@ -26,6 +26,7 @@ import kotlinx.coroutines.flow.first
import okio.ByteString
import org.koin.core.annotation.Named
import org.koin.core.annotation.Single
import org.meshtastic.core.common.util.clampTimestampToNow
import org.meshtastic.core.common.util.handledLaunch
import org.meshtastic.core.model.DataPacket
import org.meshtastic.core.model.DeviceMetrics
@@ -241,7 +242,8 @@ class NodeManagerImpl(
}
updateNode(fromNum) { node ->
val posTime = if (p.time != 0) p.time else (defaultTime / TIME_MS_TO_S).toInt()
val rawPosTime = if (p.time != 0) p.time else (defaultTime / TIME_MS_TO_S).toInt()
val posTime = clampTimestampToNow(rawPosTime)
val newLastHeard = maxOf(node.lastHeard, posTime)
val newPos =
@@ -268,7 +270,7 @@ class NodeManagerImpl(
telemetry.environment_metrics?.let { nextNode = nextNode.copy(environmentMetrics = it) }
telemetry.power_metrics?.let { nextNode = nextNode.copy(powerMetrics = it) }
val telemetryTime = if (telemetry.time != 0) telemetry.time else node.lastHeard
val newLastHeard = maxOf(node.lastHeard, telemetryTime)
val newLastHeard = clampTimestampToNow(maxOf(node.lastHeard, telemetryTime))
nextNode.copy(lastHeard = newLastHeard)
}
}
@@ -303,11 +305,12 @@ class NodeManagerImpl(
}
val position = info.position
if (position != null) {
next = next.copy(position = position)
val clampedPos = position.copy(time = clampTimestampToNow(position.time))
next = next.copy(position = clampedPos)
}
next =
next.copy(
lastHeard = info.last_heard,
lastHeard = clampTimestampToNow(info.last_heard),
deviceMetrics = info.device_metrics ?: next.deviceMetrics,
channel = info.channel,
viaMqtt = info.via_mqtt,

View File

@@ -23,6 +23,7 @@ import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import org.koin.core.annotation.Named
import org.koin.core.annotation.Single
import org.meshtastic.core.common.util.clampTimestampToNow
import org.meshtastic.core.common.util.nowSeconds
import org.meshtastic.core.model.DataPacket
import org.meshtastic.core.model.Node
@@ -123,7 +124,7 @@ class TelemetryPacketHandlerImpl(
}
val telemetryTime = if (t.time != 0) t.time else nextNode.lastHeard
val newLastHeard = maxOf(nextNode.lastHeard, telemetryTime)
val newLastHeard = clampTimestampToNow(maxOf(nextNode.lastHeard, telemetryTime))
nextNode.copy(lastHeard = newLastHeard)
}
}

View File

@@ -39,13 +39,11 @@ import kotlinx.coroutines.async
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withTimeout
import kotlin.test.Test
import kotlin.test.assertContentEquals
import kotlin.test.assertEquals
import kotlin.test.assertIs
import kotlin.test.assertNotNull
import kotlin.test.assertNull
import kotlin.test.assertTrue
@OptIn(ExperimentalCoroutinesApi::class)
@@ -206,13 +204,11 @@ class WifiOtaTransportTest {
@Test
fun `close resets transport and closes TCP connection`() = runTest {
val (transport, server, connection) = createConnectedTransport()
val (transport, server, _) = createConnectedTransport()
try {
transport.close()
assertNull(withTimeout(5_000L) { connection.readLine() })
val result = transport.startOta(1L, "hash")
assertTrue(result.isFailure)
assertIs<OtaProtocolException.ConnectionFailed>(result.exceptionOrNull())