From 975df024374f37214603f84fb577581be3f24ce0 Mon Sep 17 00:00:00 2001 From: James Rich <2199651+jamesarich@users.noreply.github.com> Date: Thu, 9 Apr 2026 11:24:50 -0500 Subject: [PATCH] fix(tak): resolve frequent TAK client disconnections (#5015) --- .../core/takserver/TAKClientConnection.kt | 29 +++++++++++++++++-- .../meshtastic/core/takserver/TAKDefaults.kt | 2 ++ .../core/takserver/TAKDefaultsTest.kt | 19 ++++++++++++ 3 files changed, 47 insertions(+), 3 deletions(-) diff --git a/core/takserver/src/commonMain/kotlin/org/meshtastic/core/takserver/TAKClientConnection.kt b/core/takserver/src/commonMain/kotlin/org/meshtastic/core/takserver/TAKClientConnection.kt index 9a24d6721..16e75481c 100644 --- a/core/takserver/src/commonMain/kotlin/org/meshtastic/core/takserver/TAKClientConnection.kt +++ b/core/takserver/src/commonMain/kotlin/org/meshtastic/core/takserver/TAKClientConnection.kt @@ -52,6 +52,9 @@ class TAKClientConnection( private val writeChannel: ByteWriteChannel = socket.openWriteChannel(autoFlush = true) private val writeMutex = Mutex() + /** Tracks the last time data was received from the client, used for idle timeout detection. */ + @Volatile private var lastDataReceived: Instant = Clock.System.now() + /** Guards against emitting [TAKConnectionEvent.Disconnected] more than once. */ @Volatile private var disconnectedEmitted = false @@ -86,6 +89,7 @@ class TAKClientConnection( readChannel.awaitContent() val bytesRead = readChannel.readAvailable(buffer) if (bytesRead > 0) { + lastDataReceived = Clock.System.now() processReceivedData(buffer.copyOfRange(0, bytesRead)) } else if (bytesRead == -1) { break // EOF @@ -102,16 +106,34 @@ class TAKClientConnection( } private suspend fun keepaliveLoop() { + val idleTimeoutMs = TAK_KEEPALIVE_INTERVAL_MS * TAK_READ_IDLE_TIMEOUT_MULTIPLIER while (scope.coroutineIsActive && !socket.isClosed) { kotlinx.coroutines.delay(TAK_KEEPALIVE_INTERVAL_MS) + + val idleMs = (Clock.System.now() - lastDataReceived).inWholeMilliseconds + if (idleMs > idleTimeoutMs) { + Logger.w { + "TAK client ${currentClientInfo.id} idle for ${idleMs}ms " + + "(threshold ${idleTimeoutMs}ms), closing connection" + } + close() + return + } + sendKeepalive() } } private fun sendKeepalive() { val now = Clock.System.now() - val stale = now + TAK_KEEPALIVE_INTERVAL_MS.milliseconds - sendXml(buildEventXml(uid = "takPong", type = "t-x-d-d", now = now, stale = stale, detail = "")) + val stale = now + (TAK_KEEPALIVE_INTERVAL_MS * TAK_KEEPALIVE_STALE_MULTIPLIER).milliseconds + sendXml(buildEventXml(uid = "takPong", type = "t-x-c-t", now = now, stale = stale, detail = "")) + } + + private fun sendPong() { + val now = Clock.System.now() + val stale = now + (TAK_KEEPALIVE_INTERVAL_MS * TAK_KEEPALIVE_STALE_MULTIPLIER).milliseconds + sendXml(buildEventXml(uid = "takPong", type = "t-x-c-t-r", now = now, stale = stale, detail = "")) } private fun processReceivedData(newData: ByteArray) { @@ -131,7 +153,7 @@ class TAKClientConnection( return } cotMessage.type == "t-x-c-t" || cotMessage.uid == "ping" -> { - // Keepalive / ping — discard silently + sendPong() return } else -> { @@ -201,6 +223,7 @@ class TAKClientConnection( throw e } catch (e: Exception) { Logger.w(e) { "TAK client send error: ${currentClientInfo.id}" } + close() } } } diff --git a/core/takserver/src/commonMain/kotlin/org/meshtastic/core/takserver/TAKDefaults.kt b/core/takserver/src/commonMain/kotlin/org/meshtastic/core/takserver/TAKDefaults.kt index eef798bf9..8dd76bd05 100644 --- a/core/takserver/src/commonMain/kotlin/org/meshtastic/core/takserver/TAKDefaults.kt +++ b/core/takserver/src/commonMain/kotlin/org/meshtastic/core/takserver/TAKDefaults.kt @@ -29,6 +29,8 @@ internal const val DEFAULT_TAK_STALE_MINUTES = 10 internal const val TAK_HEX_RADIX = 16 internal const val TAK_XML_READ_BUFFER_SIZE = 4_096 internal const val TAK_KEEPALIVE_INTERVAL_MS = 30_000L +internal const val TAK_KEEPALIVE_STALE_MULTIPLIER = 3 +internal const val TAK_READ_IDLE_TIMEOUT_MULTIPLIER = 5 internal const val TAK_ACCEPT_LOOP_DELAY_MS = 100L internal const val TAK_COORDINATE_SCALE = 1e7 internal const val TAK_UNKNOWN_POINT_VALUE = 9_999_999.0 diff --git a/core/takserver/src/commonTest/kotlin/org/meshtastic/core/takserver/TAKDefaultsTest.kt b/core/takserver/src/commonTest/kotlin/org/meshtastic/core/takserver/TAKDefaultsTest.kt index d490e2f73..679b5beed 100644 --- a/core/takserver/src/commonTest/kotlin/org/meshtastic/core/takserver/TAKDefaultsTest.kt +++ b/core/takserver/src/commonTest/kotlin/org/meshtastic/core/takserver/TAKDefaultsTest.kt @@ -21,6 +21,7 @@ import org.meshtastic.proto.Team import org.meshtastic.proto.User import kotlin.test.Test import kotlin.test.assertEquals +import kotlin.test.assertTrue class TAKDefaultsTest { @@ -104,4 +105,22 @@ class TAKDefaultsTest { val user = User(id = "!1234", long_name = "", short_name = "") assertEquals("!1234", user.toTakCallsign()) } + + // ── keepalive / idle timeout constants ───────────────────────────────────── + + @Test + fun `keepalive stale window is wider than keepalive interval`() { + val staleMs = TAK_KEEPALIVE_INTERVAL_MS * TAK_KEEPALIVE_STALE_MULTIPLIER + assertTrue( + staleMs > TAK_KEEPALIVE_INTERVAL_MS, + "Stale window ($staleMs ms) must exceed keepalive interval ($TAK_KEEPALIVE_INTERVAL_MS ms)", + ) + } + + @Test + fun `idle timeout exceeds keepalive stale window`() { + val idleTimeoutMs = TAK_KEEPALIVE_INTERVAL_MS * TAK_READ_IDLE_TIMEOUT_MULTIPLIER + val staleMs = TAK_KEEPALIVE_INTERVAL_MS * TAK_KEEPALIVE_STALE_MULTIPLIER + assertTrue(idleTimeoutMs > staleMs, "Idle timeout ($idleTimeoutMs ms) must exceed stale window ($staleMs ms)") + } }