From 0f2b1c064aa9b77685ac8a7d05fe81af89655027 Mon Sep 17 00:00:00 2001 From: James Rich <2199651+jamesarich@users.noreply.github.com> Date: Tue, 12 May 2026 07:35:11 -0500 Subject: [PATCH] fix: clamp future lastHeard timestamps to current time on ingestion (#5418) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../meshtastic/core/common/util/TimeUtils.kt | 3 +++ .../core/common/util/TimeUtilsTest.kt | 19 +++++++++++++++++++ .../data/manager/MeshMessageProcessorImpl.kt | 3 ++- .../core/data/manager/NodeManagerImpl.kt | 11 +++++++---- .../manager/TelemetryPacketHandlerImpl.kt | 3 ++- core/proto/src/main/proto | 2 +- .../firmware/ota/WifiOtaTransportTest.kt | 6 +----- 7 files changed, 35 insertions(+), 12 deletions(-) diff --git a/core/common/src/commonMain/kotlin/org/meshtastic/core/common/util/TimeUtils.kt b/core/common/src/commonMain/kotlin/org/meshtastic/core/common/util/TimeUtils.kt index f53719274..231c17a91 100644 --- a/core/common/src/commonMain/kotlin/org/meshtastic/core/common/util/TimeUtils.kt +++ b/core/common/src/commonMain/kotlin/org/meshtastic/core/common/util/TimeUtils.kt @@ -36,5 +36,8 @@ val nowSeconds: Long val systemTimeZone: TimeZone get() = TimeZone.currentSystemDefault() +/** Clamps a seconds-since-epoch timestamp so it never exceeds the current wall-clock time. */ +fun clampTimestampToNow(epochSeconds: Int): Int = minOf(epochSeconds, nowSeconds.toInt()) + /** Converts these milliseconds to an [Instant]. */ fun Long.toInstant(): Instant = Instant.fromEpochMilliseconds(this) diff --git a/core/common/src/commonTest/kotlin/org/meshtastic/core/common/util/TimeUtilsTest.kt b/core/common/src/commonTest/kotlin/org/meshtastic/core/common/util/TimeUtilsTest.kt index dc7d4a4ff..13a808a11 100644 --- a/core/common/src/commonTest/kotlin/org/meshtastic/core/common/util/TimeUtilsTest.kt +++ b/core/common/src/commonTest/kotlin/org/meshtastic/core/common/util/TimeUtilsTest.kt @@ -17,6 +17,7 @@ package org.meshtastic.core.common.util import kotlin.test.Test +import kotlin.test.assertEquals import kotlin.test.assertTrue class TimeUtilsTest { @@ -32,4 +33,22 @@ class TimeUtilsTest { val start = nowSeconds assertTrue(start > 0) } + + @Test + fun clampTimestampToNow_pastTimestamp_unchanged() { + val past = (nowSeconds - 3600).toInt() + assertEquals(past, clampTimestampToNow(past)) + } + + @Test + fun clampTimestampToNow_futureTimestamp_clampedToNow() { + val future = (nowSeconds + 86400).toInt() + val clamped = clampTimestampToNow(future) + assertTrue(clamped <= nowSeconds.toInt()) + } + + @Test + fun clampTimestampToNow_zero_unchanged() { + assertEquals(0, clampTimestampToNow(0)) + } } diff --git a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/MeshMessageProcessorImpl.kt b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/MeshMessageProcessorImpl.kt index 1e1406090..e93ea478e 100644 --- a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/MeshMessageProcessorImpl.kt +++ b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/MeshMessageProcessorImpl.kt @@ -26,6 +26,7 @@ import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import org.koin.core.annotation.Named import org.koin.core.annotation.Single +import org.meshtastic.core.common.util.clampTimestampToNow import org.meshtastic.core.common.util.handledLaunch import org.meshtastic.core.common.util.nowMillis import org.meshtastic.core.common.util.nowSeconds @@ -244,7 +245,7 @@ class MeshMessageProcessorImpl( } node.copy( - lastHeard = packet.rx_time, + lastHeard = clampTimestampToNow(packet.rx_time), viaMqtt = viaMqtt, lastTransport = packet.transport_mechanism.value, snr = snr, diff --git a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/NodeManagerImpl.kt b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/NodeManagerImpl.kt index c3e21955a..e054f1a2c 100644 --- a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/NodeManagerImpl.kt +++ b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/NodeManagerImpl.kt @@ -26,6 +26,7 @@ import kotlinx.coroutines.flow.first import okio.ByteString import org.koin.core.annotation.Named import org.koin.core.annotation.Single +import org.meshtastic.core.common.util.clampTimestampToNow import org.meshtastic.core.common.util.handledLaunch import org.meshtastic.core.model.DataPacket import org.meshtastic.core.model.DeviceMetrics @@ -241,7 +242,8 @@ class NodeManagerImpl( } updateNode(fromNum) { node -> - val posTime = if (p.time != 0) p.time else (defaultTime / TIME_MS_TO_S).toInt() + val rawPosTime = if (p.time != 0) p.time else (defaultTime / TIME_MS_TO_S).toInt() + val posTime = clampTimestampToNow(rawPosTime) val newLastHeard = maxOf(node.lastHeard, posTime) val newPos = @@ -268,7 +270,7 @@ class NodeManagerImpl( telemetry.environment_metrics?.let { nextNode = nextNode.copy(environmentMetrics = it) } telemetry.power_metrics?.let { nextNode = nextNode.copy(powerMetrics = it) } val telemetryTime = if (telemetry.time != 0) telemetry.time else node.lastHeard - val newLastHeard = maxOf(node.lastHeard, telemetryTime) + val newLastHeard = clampTimestampToNow(maxOf(node.lastHeard, telemetryTime)) nextNode.copy(lastHeard = newLastHeard) } } @@ -303,11 +305,12 @@ class NodeManagerImpl( } val position = info.position if (position != null) { - next = next.copy(position = position) + val clampedPos = position.copy(time = clampTimestampToNow(position.time)) + next = next.copy(position = clampedPos) } next = next.copy( - lastHeard = info.last_heard, + lastHeard = clampTimestampToNow(info.last_heard), deviceMetrics = info.device_metrics ?: next.deviceMetrics, channel = info.channel, viaMqtt = info.via_mqtt, diff --git a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/TelemetryPacketHandlerImpl.kt b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/TelemetryPacketHandlerImpl.kt index 078089396..af1466e69 100644 --- a/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/TelemetryPacketHandlerImpl.kt +++ b/core/data/src/commonMain/kotlin/org/meshtastic/core/data/manager/TelemetryPacketHandlerImpl.kt @@ -23,6 +23,7 @@ import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import org.koin.core.annotation.Named import org.koin.core.annotation.Single +import org.meshtastic.core.common.util.clampTimestampToNow import org.meshtastic.core.common.util.nowSeconds import org.meshtastic.core.model.DataPacket import org.meshtastic.core.model.Node @@ -123,7 +124,7 @@ class TelemetryPacketHandlerImpl( } val telemetryTime = if (t.time != 0) t.time else nextNode.lastHeard - val newLastHeard = maxOf(nextNode.lastHeard, telemetryTime) + val newLastHeard = clampTimestampToNow(maxOf(nextNode.lastHeard, telemetryTime)) nextNode.copy(lastHeard = newLastHeard) } } diff --git a/core/proto/src/main/proto b/core/proto/src/main/proto index 1c6254062..b302d9233 160000 --- a/core/proto/src/main/proto +++ b/core/proto/src/main/proto @@ -1 +1 @@ -Subproject commit 1c6254062b3f726893b79350aaf8d506eb28503a +Subproject commit b302d923327402fbe49efcf15ff1b6ef2361b22b diff --git a/feature/firmware/src/commonTest/kotlin/org/meshtastic/feature/firmware/ota/WifiOtaTransportTest.kt b/feature/firmware/src/commonTest/kotlin/org/meshtastic/feature/firmware/ota/WifiOtaTransportTest.kt index 826be9d6f..096b1f782 100644 --- a/feature/firmware/src/commonTest/kotlin/org/meshtastic/feature/firmware/ota/WifiOtaTransportTest.kt +++ b/feature/firmware/src/commonTest/kotlin/org/meshtastic/feature/firmware/ota/WifiOtaTransportTest.kt @@ -39,13 +39,11 @@ import kotlinx.coroutines.async import kotlinx.coroutines.cancel import kotlinx.coroutines.launch import kotlinx.coroutines.test.runTest -import kotlinx.coroutines.withTimeout import kotlin.test.Test import kotlin.test.assertContentEquals import kotlin.test.assertEquals import kotlin.test.assertIs import kotlin.test.assertNotNull -import kotlin.test.assertNull import kotlin.test.assertTrue @OptIn(ExperimentalCoroutinesApi::class) @@ -206,13 +204,11 @@ class WifiOtaTransportTest { @Test fun `close resets transport and closes TCP connection`() = runTest { - val (transport, server, connection) = createConnectedTransport() + val (transport, server, _) = createConnectedTransport() try { transport.close() - assertNull(withTimeout(5_000L) { connection.readLine() }) - val result = transport.startOta(1L, "hash") assertTrue(result.isFailure) assertIs(result.exceptionOrNull())