feat(network): migrate TcpTransport to ktor-network (commonMain) (#5995)

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
James Rich
2026-06-28 10:19:36 -05:00
committed by GitHub
parent a048ea1c94
commit 1d44fd15f8
5 changed files with 334 additions and 72 deletions

View File

@@ -47,6 +47,7 @@ kotlin {
implementation(libs.kotlinx.serialization.json)
implementation(libs.kotlinx.atomicfu)
implementation(libs.ktor.client.core)
implementation(libs.ktor.network) // raw TCP sockets for TcpTransport (KMP-common)
implementation(libs.ktor.client.content.negotiation)
implementation(libs.ktor.client.logging)
implementation(libs.ktor.serialization.kotlinx.json)

View File

@@ -114,7 +114,11 @@ class StreamFrameCodec(
*
* Thread-safe via an internal mutex — multiple callers can call this concurrently.
*/
suspend fun frameAndSend(payload: ByteArray, sendBytes: (ByteArray) -> Unit, flush: () -> Unit = {}) {
suspend fun frameAndSend(
payload: ByteArray,
sendBytes: suspend (ByteArray) -> Unit,
flush: suspend () -> Unit = {},
) {
writeMutex.withLock {
val header = ByteArray(HEADER_SIZE)
header[0] = START1

View File

@@ -17,23 +17,31 @@
package org.meshtastic.core.network.transport
import co.touchlab.kermit.Logger
import io.ktor.network.selector.SelectorManager
import io.ktor.network.sockets.InetSocketAddress
import io.ktor.network.sockets.Socket
import io.ktor.network.sockets.aSocket
import io.ktor.network.sockets.openReadChannel
import io.ktor.network.sockets.openWriteChannel
import io.ktor.utils.io.ByteReadChannel
import io.ktor.utils.io.ByteWriteChannel
import io.ktor.utils.io.readAvailable
import io.ktor.utils.io.writeFully
import kotlinx.atomicfu.atomic
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.delay
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeout
import kotlinx.coroutines.withTimeoutOrNull
import kotlinx.io.IOException
import org.meshtastic.core.common.util.handledLaunch
import org.meshtastic.core.common.util.nowMillis
import org.meshtastic.core.di.CoroutineDispatchers
import org.meshtastic.proto.ToRadio
import java.io.BufferedInputStream
import java.io.BufferedOutputStream
import java.io.IOException
import java.io.OutputStream
import java.net.InetAddress
import java.net.Socket
import java.net.SocketTimeoutException
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import kotlin.concurrent.Volatile
/**
* Decides whether to reset the reconnect backoff based on session data and uptime.
@@ -46,13 +54,14 @@ internal fun shouldResetBackoff(hadData: Boolean, sessionUptimeMs: Long, thresho
hadData && sessionUptimeMs >= thresholdMs
/**
* Shared JVM TCP transport for Meshtastic radios.
* Shared TCP transport for Meshtastic radios.
*
* Manages the TCP socket lifecycle (connect, read loop, reconnect with backoff) and uses [StreamFrameCodec] for the
* START1/START2 stream framing protocol. [sendHeartbeat] sends a heartbeat with a monotonically-increasing nonce so the
* firmware's per-connection duplicate-write filter does not silently drop it.
*
* Used by Android and Desktop via the shared `SharedRadioInterfaceService`.
* Uses Ktor raw sockets (`ktor-network`) so the implementation is KMP-common shared by Android, Desktop, and (once
* wired) iOS via the shared `SharedRadioInterfaceService`.
*/
@Suppress("TooManyFunctions", "MagicNumber")
class TcpTransport(
@@ -82,9 +91,15 @@ class TcpTransport(
const val MAX_RECONNECT_RETRIES = Int.MAX_VALUE
const val MIN_BACKOFF_MILLIS = 1_000L
const val MAX_BACKOFF_MILLIS = 5 * 60 * 1_000L
const val SOCKET_TIMEOUT_MS = 5_000
/** Per-read inactivity timeout. Combined with [SOCKET_RETRIES] this gives the 90s idle-disconnect window. */
const val SOCKET_TIMEOUT_MS = 5_000L
const val SOCKET_RETRIES = 18 // 18 * 5s = 90s inactivity before disconnect
const val TIMEOUT_LOG_INTERVAL = 5
/** TCP connect timeout. A failed connect just feeds the reconnect/backoff loop, so it is not fatal. */
const val CONNECT_TIMEOUT_MS = 30_000L
private const val READ_BUFFER_SIZE = 1024
private const val MILLIS_PER_SECOND = 1_000L
/**
@@ -106,14 +121,18 @@ class TcpTransport(
)
// TCP socket state
@Volatile private var selectorManager: SelectorManager? = null
@Volatile private var socket: Socket? = null
@Volatile private var outStream: OutputStream? = null
@Volatile private var writeChannel: ByteWriteChannel? = null
@Volatile private var connectionJob: Job? = null
@Volatile private var currentAddress: String? = null
@Volatile private var connected: Boolean = false
// Metrics
@Volatile private var connectionStartTime: Long = 0
@@ -127,14 +146,11 @@ class TcpTransport(
@Volatile private var timeoutEvents: Int = 0
private val heartbeatNonce = AtomicInteger(0)
private val heartbeatNonce = atomic(0)
/** Whether the transport is currently connected. */
val isConnected: Boolean
get() {
val s = socket ?: return false
return s.isConnected && !s.isClosed
}
get() = connected && socket?.socketContext?.isActive == true
/**
* Start a TCP connection to the given address with automatic reconnect.
@@ -184,10 +200,18 @@ class TcpTransport(
val hadData =
try {
connectAndRead(address)
} catch (ex: IOException) {
Logger.w { "$logTag: [$address] TCP connection error" }
} catch (ex: TimeoutCancellationException) {
Logger.w(ex) { "$logTag: [$address] TCP connect timed out" }
disconnectSocket()
false
} catch (ex: IOException) {
Logger.w(ex) { "$logTag: [$address] TCP connection error" }
disconnectSocket()
false
} catch (ce: CancellationException) {
// Outer-scope cancellation (stop()) — tear down and let it propagate to end the loop.
disconnectSocket()
throw ce
} catch (@Suppress("TooGenericExceptionCaught") ex: Throwable) {
Logger.e(ex) { "$logTag: [$address] TCP exception" }
disconnectSocket()
@@ -203,8 +227,9 @@ class TcpTransport(
retryCount = 1
backoff = MIN_BACKOFF_MILLIS
} else if (hadData) {
val backoffSec = backoff / MILLIS_PER_SECOND
Logger.d {
"$logTag: [$address] Short session (${sessionUptime}ms) — keeping backoff at ${backoff / MILLIS_PER_SECOND}s"
"$logTag: [$address] Short session (${sessionUptime}ms) — keeping backoff at ${backoffSec}s"
}
}
@@ -229,12 +254,18 @@ class TcpTransport(
Logger.i { "$logTag: [$address] Connecting to $host:$port" }
val attemptStart = nowMillis
Socket(InetAddress.getByName(host), port).use { sock ->
sock.tcpNoDelay = true
sock.keepAlive = true
sock.soTimeout = SOCKET_TIMEOUT_MS
socket = sock
val selector = SelectorManager(dispatchers.io)
selectorManager = selector
val sock =
withTimeout(CONNECT_TIMEOUT_MS) {
aSocket(selector).tcp().connect(InetSocketAddress(host, port)) {
noDelay = true
keepAlive = true
}
}
socket = sock
try {
val connectTime = nowMillis - attemptStart
connectionStartTime = nowMillis
resetMetrics()
@@ -242,55 +273,70 @@ class TcpTransport(
Logger.i { "$logTag: [$address] Socket connected in ${connectTime}ms" }
BufferedOutputStream(sock.getOutputStream()).use { output ->
outStream = output
val output = sock.openWriteChannel(autoFlush = false)
writeChannel = output
val input: ByteReadChannel = sock.openReadChannel()
BufferedInputStream(sock.getInputStream()).use { input ->
// Send wake bytes and signal connected
sendBytesRaw(StreamFrameCodec.WAKE_BYTES)
listener.onConnected()
// Send wake bytes and signal connected
sendBytesRaw(StreamFrameCodec.WAKE_BYTES)
flushBytes()
connected = true
listener.onConnected()
// Read loop
var timeoutCount = 0
while (timeoutCount < SOCKET_RETRIES) {
try {
val c = input.read()
if (c == -1) {
Logger.i { "$logTag: [$address] EOF after $packetsReceived packets" }
break
}
timeoutCount = 0
bytesReceived++
codec.processInputByte(c.toByte())
} catch (_: SocketTimeoutException) {
timeoutCount++
timeoutEvents++
if (timeoutCount % TIMEOUT_LOG_INTERVAL == 0) {
Logger.d { "$logTag: [$address] Timeout $timeoutCount/$SOCKET_RETRIES" }
}
}
}
readLoop(address, input)
if (timeoutCount >= SOCKET_RETRIES) {
Logger.w { "$logTag: [$address] Closing after $SOCKET_RETRIES consecutive timeouts" }
}
}
}
val hadData = bytesReceived > 0
bytesReceived > 0
} finally {
disconnectSocket()
hadData
}
}
/**
* Read until EOF or [SOCKET_RETRIES] consecutive inactivity timeouts. [withTimeoutOrNull] gives a *resumable*
* inactivity timeout: cancelling a parked `readAvailable` leaves the channel usable for the next iteration
* (validated in `TcpTransportTest`).
*/
@Suppress("NestedBlockDepth")
private suspend fun readLoop(address: String, input: ByteReadChannel) {
val buf = ByteArray(READ_BUFFER_SIZE)
var timeoutCount = 0
while (timeoutCount < SOCKET_RETRIES) {
val read = withTimeoutOrNull(SOCKET_TIMEOUT_MS) { input.readAvailable(buf) }
when {
read == null -> {
timeoutCount++
timeoutEvents++
if (timeoutCount % TIMEOUT_LOG_INTERVAL == 0) {
Logger.d { "$logTag: [$address] Timeout $timeoutCount/$SOCKET_RETRIES" }
}
}
read == -1 -> {
Logger.i { "$logTag: [$address] EOF after $packetsReceived packets" }
return
}
else -> {
timeoutCount = 0
bytesReceived += read
for (i in 0 until read) {
codec.processInputByte(buf[i])
}
}
}
}
Logger.w { "$logTag: [$address] Closing after $SOCKET_RETRIES consecutive timeouts" }
}
// Guards against recursive disconnects triggered by listener callbacks.
private val isDisconnecting = AtomicBoolean(false)
private val isDisconnecting = atomic(false)
private fun disconnectSocket() {
if (!isDisconnecting.compareAndSet(false, true)) return
if (!isDisconnecting.compareAndSet(expect = false, update = true)) return
try {
val s = socket
val hadConnection = s != null || outStream != null
val hadConnection = s != null || writeChannel != null
if (s != null) {
val uptime = if (connectionStartTime > 0) nowMillis - connectionStartTime else 0
Logger.i {
@@ -300,19 +346,22 @@ class TcpTransport(
}
try {
s.close()
} catch (_: IOException) {
// Ignore close errors
} catch (ex: IOException) {
Logger.w(ex) { "$logTag: [$currentAddress] Error closing socket" }
}
}
selectorManager?.close()
socket = null
outStream = null
writeChannel = null
selectorManager = null
connected = false
if (hadConnection) {
listener.onDisconnected()
}
} finally {
isDisconnecting.set(false)
isDisconnecting.value = false
}
}
@@ -320,23 +369,23 @@ class TcpTransport(
// region Byte I/O
private fun sendBytesRaw(p: ByteArray) {
private suspend fun sendBytesRaw(p: ByteArray) {
val stream =
outStream
writeChannel
?: run {
Logger.w { "$logTag: [$currentAddress] Cannot send ${p.size} bytes: not connected" }
return
}
try {
stream.write(p)
stream.writeFully(p)
} catch (ex: IOException) {
Logger.w(ex) { "$logTag: [$currentAddress] TCP write error" }
disconnectSocket()
}
}
private fun flushBytes() {
val stream = outStream ?: return
private suspend fun flushBytes() {
val stream = writeChannel ?: return
try {
stream.flush()
} catch (ex: IOException) {

View File

@@ -0,0 +1,208 @@
/*
* Copyright (c) 2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
@file:Suppress("MagicNumber")
package org.meshtastic.core.network.transport
import io.ktor.network.selector.SelectorManager
import io.ktor.network.sockets.InetSocketAddress
import io.ktor.network.sockets.ServerSocket
import io.ktor.network.sockets.aSocket
import io.ktor.network.sockets.openReadChannel
import io.ktor.network.sockets.openWriteChannel
import io.ktor.network.sockets.port
import io.ktor.utils.io.ByteWriteChannel
import io.ktor.utils.io.readAvailable
import io.ktor.utils.io.writeFully
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
import kotlinx.coroutines.cancel
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withContext
import kotlinx.coroutines.withTimeout
import kotlinx.coroutines.withTimeoutOrNull
import org.meshtastic.core.di.CoroutineDispatchers
import kotlin.test.Test
import kotlin.test.assertContentEquals
import kotlin.test.assertEquals
import kotlin.test.assertNull
import kotlin.test.assertTrue
class TcpTransportTest {
/**
* THE SPIKE. The whole migration rests on this Ktor assumption: a `withTimeoutOrNull` that cancels a parked
* `readAvailable` must leave the channel usable for the next read — that is what reproduces the old
* `Socket.soTimeout` resumable inactivity timeout. If this fails, switch [TcpTransport]'s read loop to the watchdog
* fallback (see plan).
*/
@Test
fun `read channel survives a withTimeoutOrNull read timeout and resumes`() = runTest {
withContext(Dispatchers.Default) {
val selector = SelectorManager(Dispatchers.Default)
val server = aSocket(selector).tcp().bind(hostname = LOCALHOST, port = 0)
val port = server.localAddress.port()
try {
val acceptJob = async { server.accept() }
val client = aSocket(selector).tcp().connect(InetSocketAddress(LOCALHOST, port))
val serverConn = acceptJob.await()
val clientRead = client.openReadChannel()
val serverWrite = serverConn.openWriteChannel(autoFlush = true)
val buf = ByteArray(64)
// 1st read: server is silent, so this must time out (null), cancelling the parked read.
val firstRead = withTimeoutOrNull(200) { clientRead.readAvailable(buf) }
assertNull(firstRead, "expected the idle read to time out")
// 2nd read on the SAME channel: server now sends a byte — the channel must still deliver it.
serverWrite.writeFully(byteArrayOf(0x42))
val secondRead = withTimeout(2_000) { clientRead.readAvailable(buf) }
assertEquals(1, secondRead, "channel was torn down by the previous read-timeout cancellation")
assertEquals(0x42.toByte(), buf[0])
client.close()
serverConn.close()
} finally {
server.close()
selector.close()
}
}
}
/** End-to-end: connect, receive a framed packet from the peer, decode it through [StreamFrameCodec]. */
@Test
fun `transport decodes a framed packet sent by the peer`() = runTest {
withContext(Dispatchers.Default) {
val server = TestTcpServer.start()
val connected = CompletableDeferred<Unit>()
val received = CompletableDeferred<ByteArray>()
val transport =
TcpTransport(
dispatchers = testDispatchers(),
scope = CoroutineScope(SupervisorJob() + Dispatchers.Default),
listener =
object : TcpTransport.Listener {
override fun onConnected() {
connected.complete(Unit)
}
override fun onDisconnected() = Unit
override fun onPacketReceived(bytes: ByteArray) {
received.complete(bytes)
}
},
)
try {
transport.start("$LOCALHOST:${server.port}")
val conn = withTimeout(5_000) { server.awaitConnection() }
withTimeout(5_000) { connected.await() }
// The transport sends 4 wake bytes (0x94) on connect; drain them so they do not pollute asserts.
conn.drain(4)
val payload = byteArrayOf(0x10, 0x20, 0x30)
conn.writeFramed(payload)
val decoded = withTimeout(5_000) { received.await() }
assertContentEquals(payload, decoded)
assertTrue(transport.isConnected)
} finally {
transport.stop()
server.close()
}
}
}
private fun testDispatchers() =
CoroutineDispatchers(io = Dispatchers.Default, main = Dispatchers.Default, default = Dispatchers.Default)
private class TestTcpServer
private constructor(
private val selector: SelectorManager,
private val socket: ServerSocket,
) {
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
private val accepted = CompletableDeferred<TestTcpConnection>()
val port: Int = socket.localAddress.port()
init {
scope.launch {
runCatching {
val s = socket.accept()
accepted.complete(TestTcpConnection(s.openReadChannel(), s.openWriteChannel(autoFlush = true)))
}
.onFailure { accepted.completeExceptionally(it) }
}
}
suspend fun awaitConnection(): TestTcpConnection = accepted.await()
fun close() {
runCatching { socket.close() }
runCatching { selector.close() }
scope.cancel()
}
companion object {
suspend fun start(): TestTcpServer {
val selector = SelectorManager(Dispatchers.Default)
return TestTcpServer(selector, aSocket(selector).tcp().bind(hostname = LOCALHOST, port = 0))
}
}
}
private class TestTcpConnection(
private val read: io.ktor.utils.io.ByteReadChannel,
private val write: ByteWriteChannel,
) {
/** Reads and discards exactly [count] bytes. */
suspend fun drain(count: Int) {
val buf = ByteArray(count)
var off = 0
while (off < count) {
read.awaitContent()
val n = read.readAvailable(buf, off, count - off)
if (n == -1) break
off += n
}
}
/** Writes a Meshtastic stream frame: [START1][START2][len MSB][len LSB][payload]. */
suspend fun writeFramed(payload: ByteArray) {
val frame =
byteArrayOf(
StreamFrameCodec.START1,
StreamFrameCodec.START2,
(payload.size shr 8).toByte(),
(payload.size and 0xff).toByte(),
) + payload
write.writeFully(frame)
write.flush()
}
}
private companion object {
const val LOCALHOST = "127.0.0.1"
}
}