diff --git a/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/BleOtaTransport.kt b/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/BleOtaTransport.kt index 8e0bd6d28..cd9d7b781 100644 --- a/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/BleOtaTransport.kt +++ b/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/BleOtaTransport.kt @@ -27,7 +27,6 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.withTimeout import org.meshtastic.core.ble.BleCharacteristic import org.meshtastic.core.ble.BleConnectionFactory import org.meshtastic.core.ble.BleConnectionState @@ -113,16 +112,16 @@ class BleOtaTransport( val maxLen = bleConnection.maximumWriteValueLength(BleWriteType.WITHOUT_RESPONSE) Logger.i { "BLE OTA: Service ready. Max write value length: $maxLen bytes" } - // Enable notifications and collect responses + // Collect responses. onSubscription fires when the CCCD write completes — a precise readiness + // signal; the settle below is a conservative cushion. val subscribed = CompletableDeferred() service - .observe(txChar) + .observe(txChar) { + Logger.d { "BLE OTA: TX characteristic subscribed" } + subscribed.complete(Unit) + } .onEach { notifyBytes -> try { - if (!subscribed.isCompleted) { - Logger.d { "BLE OTA: TX characteristic subscribed" } - subscribed.complete(Unit) - } val response = notifyBytes.decodeToString() Logger.d { "BLE OTA: Received response: $response" } responseChannel.trySend(response) @@ -136,11 +135,9 @@ class BleOtaTransport( } .launchIn(this) - // Allow time for the BLE subscription to be established before proceeding. - delay(SUBSCRIPTION_SETTLE) - if (!subscribed.isCompleted) subscribed.complete(Unit) - subscribed.await() + // Conservative settle after CCCD confirmation before issuing commands. + delay(SUBSCRIPTION_SETTLE) Logger.i { "BLE OTA: Service discovered and ready" } } } @@ -300,10 +297,8 @@ class BleOtaTransport( return packetsSent } - private suspend fun waitForResponse(timeout: Duration): String = try { - withTimeout(timeout) { responseChannel.receive() } - } catch (@Suppress("SwallowedException") e: kotlinx.coroutines.TimeoutCancellationException) { - throw OtaProtocolException.Timeout("Timeout waiting for response after $timeout") + private suspend fun waitForResponse(timeout: Duration): String = responseChannel.receiveWithin(timeout) { + OtaProtocolException.Timeout("Timeout waiting for response after $timeout") } companion object { diff --git a/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/BleScanSupport.kt b/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/BleScanSupport.kt index db852ffac..6fe8fd8b9 100644 --- a/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/BleScanSupport.kt +++ b/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/BleScanSupport.kt @@ -17,9 +17,17 @@ package org.meshtastic.feature.firmware.ota import co.touchlab.kermit.Logger +import kotlinx.coroutines.TimeoutCancellationException +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.firstOrNull import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.launch +import kotlinx.coroutines.withTimeout +import org.meshtastic.core.ble.BleConnection +import org.meshtastic.core.ble.BleConnectionState import org.meshtastic.core.ble.BleDevice import org.meshtastic.core.ble.BleScanner import kotlin.time.Duration @@ -84,3 +92,39 @@ internal suspend fun scanForBleDevice( } return null } + +/** + * Receives a single element from this channel within [timeout], translating a [withTimeout] timeout into the + * transport-specific exception supplied by [onTimeout] (e.g. `OtaProtocolException.Timeout` vs `DfuException.Timeout`). + * + * Shared by the response-wait paths of [BleOtaTransport] and + * [SecureDfuTransport][org.meshtastic.feature.firmware.ota.dfu.SecureDfuTransport]. The Legacy DFU transport keeps its + * own drain-and-filter loops (a single timeout bounds the whole drain), so it deliberately does not use this helper. + */ +internal suspend fun Channel.receiveWithin(timeout: Duration, onTimeout: () -> Throwable): T = try { + withTimeout(timeout) { receive() } +} catch (@Suppress("SwallowedException") e: TimeoutCancellationException) { + throw onTimeout() +} + +/** + * Runs [block] while a watcher waits for the BLE link to drop. If [connectionState][BleConnection.connectionState] + * reaches [Disconnected][BleConnectionState.Disconnected] mid-[block], the watcher throws the result of [onDrop], + * cancelling [block] and surfacing the drop immediately instead of blocking on a write that will never complete. + * + * Shared by the firmware-streaming paths of the Secure and Legacy DFU transports. + */ +internal suspend fun BleConnection.withDisconnectTripwire( + onDrop: (BleConnectionState) -> Throwable, + block: suspend () -> T, +): T = coroutineScope { + val watcher = launch { + val state = connectionState.first { it is BleConnectionState.Disconnected } + throw onDrop(state) + } + try { + block() + } finally { + watcher.cancel() + } +} diff --git a/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/Esp32OtaUpdateHandler.kt b/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/Esp32OtaUpdateHandler.kt index 530dfc54c..84281b060 100644 --- a/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/Esp32OtaUpdateHandler.kt +++ b/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/Esp32OtaUpdateHandler.kt @@ -24,7 +24,6 @@ import org.koin.core.annotation.Single import org.meshtastic.core.ble.BleConnectionFactory import org.meshtastic.core.ble.BleScanner import org.meshtastic.core.common.util.CommonUri -import org.meshtastic.core.common.util.NumberFormatter import org.meshtastic.core.common.util.ioDispatcher import org.meshtastic.core.database.entity.FirmwareRelease import org.meshtastic.core.di.CoroutineDispatchers @@ -54,7 +53,6 @@ import org.meshtastic.feature.firmware.stripFormatArgs private const val RETRY_DELAY = 2000L private const val PERCENT_MAX = 100 -private const val KIB_DIVISOR = 1024f // Time to wait for OTA reboot packet to be sent before disconnecting mesh service private const val PACKET_SEND_DELAY_MS = 2000L @@ -267,20 +265,20 @@ class Esp32OtaUpdateHandler( FirmwareUpdateState.Processing(ProgressState(UiText.Resource(Res.string.firmware_update_waiting_reboot))), ) - for (i in 1..attempts) { - try { + retryWithDelay( + attempts = attempts, + retryDelayMillis = RETRY_DELAY, + onAttempt = { i -> updateState( FirmwareUpdateState.Processing( ProgressState(UiText.Resource(Res.string.firmware_update_connecting_attempt, i, attempts)), ), ) - transport.connect().getOrThrow() - return - } catch (@Suppress("TooGenericExceptionCaught") e: Exception) { - if (i == attempts) throw e - delay(RETRY_DELAY) - } + }, + ) { + transport.connect() } + .getOrThrow() } @Suppress("LongMethod") @@ -328,28 +326,11 @@ class Esp32OtaUpdateHandler( onProgress = { progress -> val bytesSent = (progress * firmwareData.size).toLong() throughputTracker.record(bytesSent) - - val percent = (progress * PERCENT_MAX).toInt() - val bytesPerSecond = throughputTracker.bytesPerSecond() - - val speedText = - if (bytesPerSecond > 0) { - val kibPerSecond = bytesPerSecond.toFloat() / KIB_DIVISOR - val remainingBytes = firmwareData.size - bytesSent - val etaSeconds = remainingBytes.toFloat() / bytesPerSecond - - "${NumberFormatter.format(kibPerSecond, 1)} KiB/s, ETA: ${etaSeconds.toInt()}s" - } else { - "" - } - + val details = + formatTransferProgress(progress, firmwareData.size, throughputTracker.bytesPerSecond()) updateState( FirmwareUpdateState.Updating( - ProgressState( - message = uploadingMsg, - progress = progress, - details = "$percent% ($speedText)", - ), + ProgressState(message = uploadingMsg, progress = progress, details = details), ), ) }, diff --git a/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/FirmwareUpdateHelpers.kt b/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/FirmwareUpdateHelpers.kt new file mode 100644 index 000000000..4f2574a40 --- /dev/null +++ b/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/FirmwareUpdateHelpers.kt @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2026 Meshtastic LLC + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +package org.meshtastic.feature.firmware.ota + +import kotlinx.coroutines.delay +import org.meshtastic.core.common.util.NumberFormatter + +private const val PERCENT_MAX = 100 +private const val KIB_DIVISOR = 1024f + +/** + * Formats firmware-transfer progress as a human-readable detail string, e.g. `"42% (12.3 KiB/s, ETA: 5s)"`. + * + * When [bytesPerSecond] is non-positive (no throughput sample yet) only the percentage is returned — no empty + * parentheses. Shared by the ESP32 OTA and Nordic DFU update handlers, which differ only in how they obtain the inputs. + */ +internal fun formatTransferProgress(progress: Float, totalBytes: Int, bytesPerSecond: Long): String { + val percent = (progress * PERCENT_MAX).toInt() + if (bytesPerSecond <= 0L) return "$percent%" + val kibPerSecond = bytesPerSecond.toFloat() / KIB_DIVISOR + val bytesSent = (progress * totalBytes).toLong() + val etaSeconds = ((totalBytes - bytesSent).toFloat() / bytesPerSecond).toInt() + return "$percent% (${NumberFormatter.format(kibPerSecond, 1)} KiB/s, ETA: ${etaSeconds}s)" +} + +/** + * Runs [block] up to [attempts] times, returning the first successful [Result]. [onAttempt] fires before each attempt + * (1-based) for progress reporting, and [retryDelayMillis] is waited between tries (never after the last). If every + * attempt fails, returns the last failure so the caller can surface it however it likes (rethrow as-is vs. wrap in a + * domain exception). + */ +internal suspend fun retryWithDelay( + attempts: Int, + retryDelayMillis: Long, + onAttempt: (attempt: Int) -> Unit, + block: suspend (attempt: Int) -> Result, +): Result { + var lastError: Throwable? = null + for (attempt in 1..attempts) { + onAttempt(attempt) + val result = block(attempt) + if (result.isSuccess) return result + lastError = result.exceptionOrNull() + if (attempt < attempts) delay(retryDelayMillis) + } + return Result.failure(lastError ?: IllegalStateException("retryWithDelay: all $attempts attempts failed")) +} diff --git a/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/dfu/LegacyDfuTransport.kt b/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/dfu/LegacyDfuTransport.kt index 0444a57d7..01d9fbad4 100644 --- a/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/dfu/LegacyDfuTransport.kt +++ b/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/dfu/LegacyDfuTransport.kt @@ -34,13 +34,10 @@ import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.TimeoutCancellationException import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.flow.catch -import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.launch import kotlinx.coroutines.withTimeout import org.meshtastic.core.ble.BleConnectionFactory import org.meshtastic.core.ble.BleConnectionState @@ -50,6 +47,7 @@ import org.meshtastic.core.ble.BleWriteType import org.meshtastic.core.common.util.safeCatching import org.meshtastic.feature.firmware.ota.calculateMacPlusOne import org.meshtastic.feature.firmware.ota.scanForBleDevice +import org.meshtastic.feature.firmware.ota.withDisconnectTripwire import kotlin.time.Duration import kotlin.time.Duration.Companion.milliseconds import kotlin.time.Duration.Companion.minutes @@ -119,12 +117,11 @@ class LegacyDfuTransport( val subscribed = CompletableDeferred() service - .observe(controlChar) + .observe(controlChar) { + Logger.d { "Legacy DFU: Control Point subscribed" } + subscribed.complete(Unit) + } .onEach { bytes -> - if (!subscribed.isCompleted) { - Logger.d { "Legacy DFU: Control Point subscribed" } - subscribed.complete(Unit) - } val parsed = LegacyDfuResponse.parse(bytes) Logger.d { "Legacy DFU: Notification → $parsed" } notificationChannel.trySend(parsed) @@ -135,9 +132,9 @@ class LegacyDfuTransport( } .launchIn(this) - delay(SUBSCRIPTION_SETTLE) - if (!subscribed.isCompleted) subscribed.complete(Unit) subscribed.await() + // Conservative settle after CCCD confirmation before issuing commands. + delay(SUBSCRIPTION_SETTLE) // Best-effort DFU Version read — gate out unsupported old bootloaders (SDK ≤ 6). val versionChar = service.characteristic(LEGACY_DFU_VERSION_UUID) @@ -306,60 +303,54 @@ class LegacyDfuTransport( "(advertised='${dfuAdvertisedName ?: "?"}')" } - coroutineScope { - // Trip-wire: cancels the streaming coroutine the moment Kable observes a disconnect. - val watcher = launch { - val state = bleConnection.connectionState.first { it is BleConnectionState.Disconnected } + bleConnection.withDisconnectTripwire( + onDrop = { state -> Logger.w { "Legacy DFU: Link dropped mid-stream at offset $offset/${firmware.size} (state=$state)" } - throw DfuException.ConnectionFailed("BLE link dropped mid-upload at byte $offset/${firmware.size}") - } - - try { - var packetsSincePrn = 0 - var bytesAtLastPrn = 0L - bleConnection.profile(LegacyDfuUuids.SERVICE, timeout = STREAM_TIMEOUT) { service -> - val packetChar = service.characteristic(LEGACY_DFU_PACKET_UUID) - while (offset < firmware.size) { - val end = minOf(offset + mtu, firmware.size) - try { - service.write(packetChar, firmware.copyOfRange(offset, end), BleWriteType.WITHOUT_RESPONSE) - } catch (e: CancellationException) { - Logger.w(e) { - "Legacy DFU: Write CANCELLED at offset $offset/${firmware.size} cause=${e.cause}" - } - throw e - } catch (@Suppress("TooGenericExceptionCaught") e: Throwable) { - Logger.w(e) { "Legacy DFU: Write FAILED at offset $offset/${firmware.size}: ${e.message}" } - throw e + DfuException.ConnectionFailed("BLE link dropped mid-upload at byte $offset/${firmware.size}") + }, + ) { + var packetsSincePrn = 0 + var bytesAtLastPrn = 0L + bleConnection.profile(LegacyDfuUuids.SERVICE, timeout = STREAM_TIMEOUT) { service -> + val packetChar = service.characteristic(LEGACY_DFU_PACKET_UUID) + while (offset < firmware.size) { + val end = minOf(offset + mtu, firmware.size) + try { + service.write(packetChar, firmware.copyOfRange(offset, end), BleWriteType.WITHOUT_RESPONSE) + } catch (e: CancellationException) { + Logger.w(e) { + "Legacy DFU: Write CANCELLED at offset $offset/${firmware.size} cause=${e.cause}" } - offset = end - packetsSincePrn++ + throw e + } catch (@Suppress("TooGenericExceptionCaught") e: Throwable) { + Logger.w(e) { "Legacy DFU: Write FAILED at offset $offset/${firmware.size}: ${e.message}" } + throw e + } + offset = end + packetsSincePrn++ - if (packetsSincePrn >= PRN_INTERVAL_PACKETS && offset < firmware.size) { - Logger.d { "Legacy DFU: Awaiting PRN at offset $offset" } - val receipt = - try { - awaitPacketReceipt() - } catch (e: CancellationException) { - Logger.w(e) { - "Legacy DFU: awaitPacketReceipt CANCELLED at offset $offset cause=${e.cause}" - } - throw e + if (packetsSincePrn >= PRN_INTERVAL_PACKETS && offset < firmware.size) { + Logger.d { "Legacy DFU: Awaiting PRN at offset $offset" } + val receipt = + try { + awaitPacketReceipt() + } catch (e: CancellationException) { + Logger.w(e) { + "Legacy DFU: awaitPacketReceipt CANCELLED at offset $offset cause=${e.cause}" } - val expected = offset.toLong() - if (receipt.bytesReceived != expected) { - throw LegacyDfuException.PacketReceiptMismatch(expected, receipt.bytesReceived) + throw e } - bytesAtLastPrn = receipt.bytesReceived - packetsSincePrn = 0 - onProgress(offset.toFloat() / firmware.size) + val expected = offset.toLong() + if (receipt.bytesReceived != expected) { + throw LegacyDfuException.PacketReceiptMismatch(expected, receipt.bytesReceived) } + bytesAtLastPrn = receipt.bytesReceived + packetsSincePrn = 0 + onProgress(offset.toFloat() / firmware.size) } } - Logger.d { "Legacy DFU: Streamed $offset/${firmware.size} bytes (lastPRN=$bytesAtLastPrn)" } - } finally { - watcher.cancel() } + Logger.d { "Legacy DFU: Streamed $offset/${firmware.size} bytes (lastPRN=$bytesAtLastPrn)" } } } diff --git a/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/dfu/SecureDfuHandler.kt b/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/dfu/SecureDfuHandler.kt index 12592e57c..f85ee430b 100644 --- a/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/dfu/SecureDfuHandler.kt +++ b/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/dfu/SecureDfuHandler.kt @@ -25,7 +25,6 @@ import org.koin.core.annotation.Single import org.meshtastic.core.ble.BleConnectionFactory import org.meshtastic.core.ble.BleScanner import org.meshtastic.core.common.util.CommonUri -import org.meshtastic.core.common.util.NumberFormatter import org.meshtastic.core.common.util.ioDispatcher import org.meshtastic.core.database.entity.FirmwareRelease import org.meshtastic.core.di.CoroutineDispatchers @@ -52,6 +51,8 @@ import org.meshtastic.feature.firmware.FirmwareUpdateState import org.meshtastic.feature.firmware.ProgressState import org.meshtastic.feature.firmware.ota.ThroughputTracker import org.meshtastic.feature.firmware.ota.calculateMacPlusOne +import org.meshtastic.feature.firmware.ota.formatTransferProgress +import org.meshtastic.feature.firmware.ota.retryWithDelay import org.meshtastic.feature.firmware.ota.scanForBleDevice import org.meshtastic.feature.firmware.stripFormatArgs import kotlin.time.Duration.Companion.seconds @@ -61,7 +62,6 @@ private const val GATT_RELEASE_DELAY_MS = 1_500L private const val DFU_REBOOT_WAIT_MS = 3_000L private const val RETRY_DELAY_MS = 2_000L private const val CONNECT_ATTEMPTS = 4 -private const val KIB_DIVISOR = 1024f /** * KMP [FirmwareUpdateHandler] for nRF52 devices. @@ -173,25 +173,10 @@ class SecureDfuHandler( transport .transferFirmware(pkg.firmware) { progress -> - val pct = (progress * PERCENT_MAX).toInt() val bytesSent = (progress * firmwareSize).toLong() throughputTracker.record(bytesSent) - - val bytesPerSecond = throughputTracker.bytesPerSecond() - val speedKib = bytesPerSecond.toFloat() / KIB_DIVISOR - - val details = buildString { - append("$pct%") - if (speedKib > 0f) { - val remainingBytes = firmwareSize - bytesSent - val etaSeconds = remainingBytes.toFloat() / bytesPerSecond - append( - " (${NumberFormatter.format(speedKib, 1)} " + - "KiB/s, ETA: ${etaSeconds.toInt()}s)", - ) - } - } - + val details = + formatTransferProgress(progress, firmwareSize, throughputTracker.bytesPerSecond()) updateState( FirmwareUpdateState.Updating( ProgressState(uploadMsg, progress, details, hint = slowHint), @@ -269,27 +254,34 @@ class SecureDfuHandler( updateState( FirmwareUpdateState.Processing(ProgressState(UiText.Resource(Res.string.firmware_update_waiting_reboot))), ) - var lastError: Throwable? = null - for (attempt in 1..CONNECT_ATTEMPTS) { - updateState( - FirmwareUpdateState.Processing( - ProgressState( - UiText.Resource(Res.string.firmware_update_connecting_attempt, attempt, CONNECT_ATTEMPTS), + retryWithDelay( + attempts = CONNECT_ATTEMPTS, + retryDelayMillis = RETRY_DELAY_MS, + onAttempt = { attempt -> + updateState( + FirmwareUpdateState.Processing( + ProgressState( + UiText.Resource( + Res.string.firmware_update_connecting_attempt, + attempt, + CONNECT_ATTEMPTS, + ), + ), ), - ), - ) - val result = transport.connectToDfuMode() - if (result.isSuccess) { - return - } - lastError = result.exceptionOrNull() - Logger.w { "DFU: Connect attempt $attempt/$CONNECT_ATTEMPTS failed: ${lastError?.message}" } - if (attempt < CONNECT_ATTEMPTS) delay(RETRY_DELAY_MS) - } - throw DfuException.ConnectionFailed( - "Failed to connect to DFU device after $CONNECT_ATTEMPTS attempts", - lastError, + ) + }, + block = { attempt -> + transport.connectToDfuMode().onFailure { + Logger.w { "DFU: Connect attempt $attempt/$CONNECT_ATTEMPTS failed: ${it.message}" } + } + }, ) + .getOrElse { + throw DfuException.ConnectionFailed( + "Failed to connect to DFU device after $CONNECT_ATTEMPTS attempts", + it, + ) + } } private suspend fun obtainZipFile( diff --git a/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/dfu/SecureDfuTransport.kt b/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/dfu/SecureDfuTransport.kt index b662c0088..73966139f 100644 --- a/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/dfu/SecureDfuTransport.kt +++ b/feature/firmware/src/commonMain/kotlin/org/meshtastic/feature/firmware/ota/dfu/SecureDfuTransport.kt @@ -34,13 +34,10 @@ import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.TimeoutCancellationException import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.flow.catch -import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.launch import kotlinx.coroutines.withTimeout import org.meshtastic.core.ble.BleConnectionFactory import org.meshtastic.core.ble.BleConnectionState @@ -51,7 +48,9 @@ import org.meshtastic.core.ble.DEFAULT_BLE_WRITE_VALUE_LENGTH import org.meshtastic.core.ble.MeshtasticBleDevice import org.meshtastic.core.common.util.safeCatching import org.meshtastic.feature.firmware.ota.calculateMacPlusOne +import org.meshtastic.feature.firmware.ota.receiveWithin import org.meshtastic.feature.firmware.ota.scanForBleDevice +import org.meshtastic.feature.firmware.ota.withDisconnectTripwire import kotlin.time.Duration import kotlin.time.Duration.Companion.milliseconds import kotlin.time.Duration.Companion.minutes @@ -253,27 +252,24 @@ class SecureDfuTransport( bleConnection.profile(SecureDfuUuids.SERVICE) { service -> val controlChar = service.characteristic(SecureDfuUuids.CONTROL_POINT) - // Subscribe to Control Point notifications before issuing any commands. - // launchIn(this) uses connectionScope so the subscription persists beyond this block. + // Subscribe to Control Point notifications before issuing any commands. onSubscription fires once the CCCD + // write completes; launchIn(this) uses connectionScope so the subscription persists beyond this block. val subscribed = CompletableDeferred() service - .observe(controlChar) - .onEach { bytes -> - if (!subscribed.isCompleted) { - Logger.d { "DFU: Control Point subscribed" } - subscribed.complete(Unit) - } - notificationChannel.trySend(bytes) + .observe(controlChar) { + Logger.d { "DFU: Control Point subscribed" } + subscribed.complete(Unit) } + .onEach { bytes -> notificationChannel.trySend(bytes) } .catch { e -> if (!subscribed.isCompleted) subscribed.completeExceptionally(e) Logger.e(e) { "DFU: Control Point notification error" } } .launchIn(this) - delay(SUBSCRIPTION_SETTLE) - if (!subscribed.isCompleted) subscribed.complete(Unit) subscribed.await() + // Conservative settle after CCCD confirmation before issuing commands. + delay(SUBSCRIPTION_SETTLE) Logger.i { "DFU: Connected and ready (${device.address})" } } @@ -515,39 +511,33 @@ class SecureDfuTransport( val mtu = bleConnection.maximumWriteValueLength(BleWriteType.WITHOUT_RESPONSE) ?: DEFAULT_BLE_WRITE_VALUE_LENGTH var pos = from - coroutineScope { - // Trip-wire: cancels the streaming coroutine the moment Kable observes a disconnect. - val watcher = launch { - val state = bleConnection.connectionState.first { it is BleConnectionState.Disconnected } + bleConnection.withDisconnectTripwire( + onDrop = { state -> Logger.w { "Secure DFU: Link dropped mid-stream at offset $pos/$until (state=$state)" } - throw DfuException.ConnectionFailed("BLE link dropped mid-upload at byte $pos/$until") - } + DfuException.ConnectionFailed("BLE link dropped mid-upload at byte $pos/$until") + }, + ) { + var packetsSincePrn = 0 + bleConnection.profile(SecureDfuUuids.SERVICE, timeout = STREAM_TIMEOUT) { service -> + val packetChar = service.characteristic(SecureDfuUuids.PACKET) + while (pos < until) { + val chunkEnd = minOf(pos + mtu, until) + service.write(packetChar, data.copyOfRange(pos, chunkEnd), BleWriteType.WITHOUT_RESPONSE) + pos = chunkEnd + packetsSincePrn++ - try { - var packetsSincePrn = 0 - bleConnection.profile(SecureDfuUuids.SERVICE, timeout = STREAM_TIMEOUT) { service -> - val packetChar = service.characteristic(SecureDfuUuids.PACKET) - while (pos < until) { - val chunkEnd = minOf(pos + mtu, until) - service.write(packetChar, data.copyOfRange(pos, chunkEnd), BleWriteType.WITHOUT_RESPONSE) - pos = chunkEnd - packetsSincePrn++ - - if (prnInterval > 0 && packetsSincePrn >= prnInterval && pos < until) { - val response = awaitNotification(COMMAND_TIMEOUT) - if (response is DfuResponse.ChecksumResult) { - val expectedCrc = DfuCrc32.calculate(data, length = pos) - if (response.offset != pos || response.crc32 != expectedCrc) { - throw DfuException.ChecksumMismatch(expected = expectedCrc, actual = response.crc32) - } - Logger.d { "DFU: PRN checksum OK at offset $pos" } + if (prnInterval > 0 && packetsSincePrn >= prnInterval && pos < until) { + val response = awaitNotification(COMMAND_TIMEOUT) + if (response is DfuResponse.ChecksumResult) { + val expectedCrc = DfuCrc32.calculate(data, length = pos) + if (response.offset != pos || response.crc32 != expectedCrc) { + throw DfuException.ChecksumMismatch(expected = expectedCrc, actual = response.crc32) } - packetsSincePrn = 0 + Logger.d { "DFU: PRN checksum OK at offset $pos" } } + packetsSincePrn = 0 } } - } finally { - watcher.cancel() } } } @@ -608,13 +598,12 @@ class SecureDfuTransport( Logger.d { "DFU: Object executed." } } - private suspend fun awaitNotification(timeout: Duration): DfuResponse = try { - withTimeout(timeout) { - val bytes = notificationChannel.receive() - DfuResponse.parse(bytes).also { Logger.d { "DFU: Notification → $it" } } - } - } catch (_: TimeoutCancellationException) { - throw DfuException.Timeout("No response from Control Point after $timeout") + private suspend fun awaitNotification(timeout: Duration): DfuResponse { + val bytes = + notificationChannel.receiveWithin(timeout) { + DfuException.Timeout("No response from Control Point after $timeout") + } + return DfuResponse.parse(bytes).also { Logger.d { "DFU: Notification → $it" } } } private fun DfuResponse.requireSuccess(expectedOpcode: Byte) { diff --git a/feature/firmware/src/commonTest/kotlin/org/meshtastic/feature/firmware/ota/FirmwareUpdateHelpersTest.kt b/feature/firmware/src/commonTest/kotlin/org/meshtastic/feature/firmware/ota/FirmwareUpdateHelpersTest.kt new file mode 100644 index 000000000..0ac1650e2 --- /dev/null +++ b/feature/firmware/src/commonTest/kotlin/org/meshtastic/feature/firmware/ota/FirmwareUpdateHelpersTest.kt @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2026 Meshtastic LLC + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +@file:Suppress("MagicNumber") + +package org.meshtastic.feature.firmware.ota + +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.test.runTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertSame +import kotlin.test.assertTrue + +@OptIn(ExperimentalCoroutinesApi::class) +class FirmwareUpdateHelpersTest { + + // ----- formatTransferProgress ----- + + @Test + fun `formatTransferProgress omits speed when throughput is zero`() { + assertEquals("50%", formatTransferProgress(progress = 0.5f, totalBytes = 1000, bytesPerSecond = 0)) + } + + @Test + fun `formatTransferProgress omits speed when throughput is non-positive`() { + assertEquals("0%", formatTransferProgress(progress = 0f, totalBytes = 1000, bytesPerSecond = -5)) + } + + @Test + fun `formatTransferProgress includes KiB per second and ETA`() { + // 50% of 2048 bytes (1024 remaining) at 1024 B/s → 1.0 KiB/s, 1s ETA. + assertEquals( + "50% (1.0 KiB/s, ETA: 1s)", + formatTransferProgress(progress = 0.5f, totalBytes = 2048, bytesPerSecond = 1024), + ) + } + + // ----- retryWithDelay ----- + + @Test + fun `retryWithDelay returns first success without further attempts`() = runTest { + var calls = 0 + val result = + retryWithDelay(attempts = 3, retryDelayMillis = 1000, onAttempt = {}) { + calls++ + Result.success("ok") + } + assertTrue(result.isSuccess) + assertEquals("ok", result.getOrNull()) + assertEquals(1, calls) + } + + @Test + fun `retryWithDelay retries until success and reports each attempt`() = runTest { + var calls = 0 + val attemptsSeen = mutableListOf() + val result = + retryWithDelay(attempts = 5, retryDelayMillis = 1000, onAttempt = { attemptsSeen += it }) { + calls++ + if (calls < 3) Result.failure(RuntimeException("nope $calls")) else Result.success(calls) + } + assertEquals(3, result.getOrNull()) + assertEquals(3, calls) + assertEquals(listOf(1, 2, 3), attemptsSeen) + } + + @Test + fun `retryWithDelay returns the last failure after exhausting attempts`() = runTest { + var calls = 0 + val lastError = RuntimeException("final") + val result = + retryWithDelay(attempts = 3, retryDelayMillis = 1000, onAttempt = {}) { + calls++ + if (calls < 3) { + Result.failure(RuntimeException("attempt $calls")) + } else { + Result.failure(lastError) + } + } + assertTrue(result.isFailure) + assertSame(lastError, result.exceptionOrNull()) + assertEquals(3, calls) + } +}