refactor(firmware): dedupe BLE/DFU OTA transport + handler boilerplate (#5918)

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
James Rich
2026-06-23 13:58:36 -05:00
committed by GitHub
parent 61c8a3f479
commit 56f522cd6c
8 changed files with 337 additions and 186 deletions

View File

@@ -27,7 +27,6 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.withTimeout
import org.meshtastic.core.ble.BleCharacteristic
import org.meshtastic.core.ble.BleConnectionFactory
import org.meshtastic.core.ble.BleConnectionState
@@ -113,16 +112,16 @@ class BleOtaTransport(
val maxLen = bleConnection.maximumWriteValueLength(BleWriteType.WITHOUT_RESPONSE)
Logger.i { "BLE OTA: Service ready. Max write value length: $maxLen bytes" }
// Enable notifications and collect responses
// Collect responses. onSubscription fires when the CCCD write completes — a precise readiness
// signal; the settle below is a conservative cushion.
val subscribed = CompletableDeferred<Unit>()
service
.observe(txChar)
.observe(txChar) {
Logger.d { "BLE OTA: TX characteristic subscribed" }
subscribed.complete(Unit)
}
.onEach { notifyBytes ->
try {
if (!subscribed.isCompleted) {
Logger.d { "BLE OTA: TX characteristic subscribed" }
subscribed.complete(Unit)
}
val response = notifyBytes.decodeToString()
Logger.d { "BLE OTA: Received response: $response" }
responseChannel.trySend(response)
@@ -136,11 +135,9 @@ class BleOtaTransport(
}
.launchIn(this)
// Allow time for the BLE subscription to be established before proceeding.
delay(SUBSCRIPTION_SETTLE)
if (!subscribed.isCompleted) subscribed.complete(Unit)
subscribed.await()
// Conservative settle after CCCD confirmation before issuing commands.
delay(SUBSCRIPTION_SETTLE)
Logger.i { "BLE OTA: Service discovered and ready" }
}
}
@@ -300,10 +297,8 @@ class BleOtaTransport(
return packetsSent
}
private suspend fun waitForResponse(timeout: Duration): String = try {
withTimeout(timeout) { responseChannel.receive() }
} catch (@Suppress("SwallowedException") e: kotlinx.coroutines.TimeoutCancellationException) {
throw OtaProtocolException.Timeout("Timeout waiting for response after $timeout")
private suspend fun waitForResponse(timeout: Duration): String = responseChannel.receiveWithin(timeout) {
OtaProtocolException.Timeout("Timeout waiting for response after $timeout")
}
companion object {

View File

@@ -17,9 +17,17 @@
package org.meshtastic.feature.firmware.ota
import co.touchlab.kermit.Logger
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.firstOrNull
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.withTimeout
import org.meshtastic.core.ble.BleConnection
import org.meshtastic.core.ble.BleConnectionState
import org.meshtastic.core.ble.BleDevice
import org.meshtastic.core.ble.BleScanner
import kotlin.time.Duration
@@ -84,3 +92,39 @@ internal suspend fun scanForBleDevice(
}
return null
}
/**
* Receives a single element from this channel within [timeout], translating a [withTimeout] timeout into the
* transport-specific exception supplied by [onTimeout] (e.g. `OtaProtocolException.Timeout` vs `DfuException.Timeout`).
*
* Shared by the response-wait paths of [BleOtaTransport] and
* [SecureDfuTransport][org.meshtastic.feature.firmware.ota.dfu.SecureDfuTransport]. The Legacy DFU transport keeps its
* own drain-and-filter loops (a single timeout bounds the whole drain), so it deliberately does not use this helper.
*/
internal suspend fun <T> Channel<T>.receiveWithin(timeout: Duration, onTimeout: () -> Throwable): T = try {
withTimeout(timeout) { receive() }
} catch (@Suppress("SwallowedException") e: TimeoutCancellationException) {
throw onTimeout()
}
/**
* Runs [block] while a watcher waits for the BLE link to drop. If [connectionState][BleConnection.connectionState]
* reaches [Disconnected][BleConnectionState.Disconnected] mid-[block], the watcher throws the result of [onDrop],
* cancelling [block] and surfacing the drop immediately instead of blocking on a write that will never complete.
*
* Shared by the firmware-streaming paths of the Secure and Legacy DFU transports.
*/
internal suspend fun <T> BleConnection.withDisconnectTripwire(
onDrop: (BleConnectionState) -> Throwable,
block: suspend () -> T,
): T = coroutineScope {
val watcher = launch {
val state = connectionState.first { it is BleConnectionState.Disconnected }
throw onDrop(state)
}
try {
block()
} finally {
watcher.cancel()
}
}

View File

@@ -24,7 +24,6 @@ import org.koin.core.annotation.Single
import org.meshtastic.core.ble.BleConnectionFactory
import org.meshtastic.core.ble.BleScanner
import org.meshtastic.core.common.util.CommonUri
import org.meshtastic.core.common.util.NumberFormatter
import org.meshtastic.core.common.util.ioDispatcher
import org.meshtastic.core.database.entity.FirmwareRelease
import org.meshtastic.core.di.CoroutineDispatchers
@@ -54,7 +53,6 @@ import org.meshtastic.feature.firmware.stripFormatArgs
private const val RETRY_DELAY = 2000L
private const val PERCENT_MAX = 100
private const val KIB_DIVISOR = 1024f
// Time to wait for OTA reboot packet to be sent before disconnecting mesh service
private const val PACKET_SEND_DELAY_MS = 2000L
@@ -267,20 +265,20 @@ class Esp32OtaUpdateHandler(
FirmwareUpdateState.Processing(ProgressState(UiText.Resource(Res.string.firmware_update_waiting_reboot))),
)
for (i in 1..attempts) {
try {
retryWithDelay(
attempts = attempts,
retryDelayMillis = RETRY_DELAY,
onAttempt = { i ->
updateState(
FirmwareUpdateState.Processing(
ProgressState(UiText.Resource(Res.string.firmware_update_connecting_attempt, i, attempts)),
),
)
transport.connect().getOrThrow()
return
} catch (@Suppress("TooGenericExceptionCaught") e: Exception) {
if (i == attempts) throw e
delay(RETRY_DELAY)
}
},
) {
transport.connect()
}
.getOrThrow()
}
@Suppress("LongMethod")
@@ -328,28 +326,11 @@ class Esp32OtaUpdateHandler(
onProgress = { progress ->
val bytesSent = (progress * firmwareData.size).toLong()
throughputTracker.record(bytesSent)
val percent = (progress * PERCENT_MAX).toInt()
val bytesPerSecond = throughputTracker.bytesPerSecond()
val speedText =
if (bytesPerSecond > 0) {
val kibPerSecond = bytesPerSecond.toFloat() / KIB_DIVISOR
val remainingBytes = firmwareData.size - bytesSent
val etaSeconds = remainingBytes.toFloat() / bytesPerSecond
"${NumberFormatter.format(kibPerSecond, 1)} KiB/s, ETA: ${etaSeconds.toInt()}s"
} else {
""
}
val details =
formatTransferProgress(progress, firmwareData.size, throughputTracker.bytesPerSecond())
updateState(
FirmwareUpdateState.Updating(
ProgressState(
message = uploadingMsg,
progress = progress,
details = "$percent% ($speedText)",
),
ProgressState(message = uploadingMsg, progress = progress, details = details),
),
)
},

View File

@@ -0,0 +1,61 @@
/*
* Copyright (c) 2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.meshtastic.feature.firmware.ota
import kotlinx.coroutines.delay
import org.meshtastic.core.common.util.NumberFormatter
private const val PERCENT_MAX = 100
private const val KIB_DIVISOR = 1024f
/**
* Formats firmware-transfer progress as a human-readable detail string, e.g. `"42% (12.3 KiB/s, ETA: 5s)"`.
*
* When [bytesPerSecond] is non-positive (no throughput sample yet) only the percentage is returned — no empty
* parentheses. Shared by the ESP32 OTA and Nordic DFU update handlers, which differ only in how they obtain the inputs.
*/
internal fun formatTransferProgress(progress: Float, totalBytes: Int, bytesPerSecond: Long): String {
val percent = (progress * PERCENT_MAX).toInt()
if (bytesPerSecond <= 0L) return "$percent%"
val kibPerSecond = bytesPerSecond.toFloat() / KIB_DIVISOR
val bytesSent = (progress * totalBytes).toLong()
val etaSeconds = ((totalBytes - bytesSent).toFloat() / bytesPerSecond).toInt()
return "$percent% (${NumberFormatter.format(kibPerSecond, 1)} KiB/s, ETA: ${etaSeconds}s)"
}
/**
* Runs [block] up to [attempts] times, returning the first successful [Result]. [onAttempt] fires before each attempt
* (1-based) for progress reporting, and [retryDelayMillis] is waited between tries (never after the last). If every
* attempt fails, returns the last failure so the caller can surface it however it likes (rethrow as-is vs. wrap in a
* domain exception).
*/
internal suspend fun <T> retryWithDelay(
attempts: Int,
retryDelayMillis: Long,
onAttempt: (attempt: Int) -> Unit,
block: suspend (attempt: Int) -> Result<T>,
): Result<T> {
var lastError: Throwable? = null
for (attempt in 1..attempts) {
onAttempt(attempt)
val result = block(attempt)
if (result.isSuccess) return result
lastError = result.exceptionOrNull()
if (attempt < attempts) delay(retryDelayMillis)
}
return Result.failure(lastError ?: IllegalStateException("retryWithDelay: all $attempts attempts failed"))
}

View File

@@ -34,13 +34,10 @@ import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.withTimeout
import org.meshtastic.core.ble.BleConnectionFactory
import org.meshtastic.core.ble.BleConnectionState
@@ -50,6 +47,7 @@ import org.meshtastic.core.ble.BleWriteType
import org.meshtastic.core.common.util.safeCatching
import org.meshtastic.feature.firmware.ota.calculateMacPlusOne
import org.meshtastic.feature.firmware.ota.scanForBleDevice
import org.meshtastic.feature.firmware.ota.withDisconnectTripwire
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.minutes
@@ -119,12 +117,11 @@ class LegacyDfuTransport(
val subscribed = CompletableDeferred<Unit>()
service
.observe(controlChar)
.observe(controlChar) {
Logger.d { "Legacy DFU: Control Point subscribed" }
subscribed.complete(Unit)
}
.onEach { bytes ->
if (!subscribed.isCompleted) {
Logger.d { "Legacy DFU: Control Point subscribed" }
subscribed.complete(Unit)
}
val parsed = LegacyDfuResponse.parse(bytes)
Logger.d { "Legacy DFU: Notification → $parsed" }
notificationChannel.trySend(parsed)
@@ -135,9 +132,9 @@ class LegacyDfuTransport(
}
.launchIn(this)
delay(SUBSCRIPTION_SETTLE)
if (!subscribed.isCompleted) subscribed.complete(Unit)
subscribed.await()
// Conservative settle after CCCD confirmation before issuing commands.
delay(SUBSCRIPTION_SETTLE)
// Best-effort DFU Version read — gate out unsupported old bootloaders (SDK ≤ 6).
val versionChar = service.characteristic(LEGACY_DFU_VERSION_UUID)
@@ -306,60 +303,54 @@ class LegacyDfuTransport(
"(advertised='${dfuAdvertisedName ?: "?"}')"
}
coroutineScope {
// Trip-wire: cancels the streaming coroutine the moment Kable observes a disconnect.
val watcher = launch {
val state = bleConnection.connectionState.first { it is BleConnectionState.Disconnected }
bleConnection.withDisconnectTripwire(
onDrop = { state ->
Logger.w { "Legacy DFU: Link dropped mid-stream at offset $offset/${firmware.size} (state=$state)" }
throw DfuException.ConnectionFailed("BLE link dropped mid-upload at byte $offset/${firmware.size}")
}
try {
var packetsSincePrn = 0
var bytesAtLastPrn = 0L
bleConnection.profile(LegacyDfuUuids.SERVICE, timeout = STREAM_TIMEOUT) { service ->
val packetChar = service.characteristic(LEGACY_DFU_PACKET_UUID)
while (offset < firmware.size) {
val end = minOf(offset + mtu, firmware.size)
try {
service.write(packetChar, firmware.copyOfRange(offset, end), BleWriteType.WITHOUT_RESPONSE)
} catch (e: CancellationException) {
Logger.w(e) {
"Legacy DFU: Write CANCELLED at offset $offset/${firmware.size} cause=${e.cause}"
}
throw e
} catch (@Suppress("TooGenericExceptionCaught") e: Throwable) {
Logger.w(e) { "Legacy DFU: Write FAILED at offset $offset/${firmware.size}: ${e.message}" }
throw e
DfuException.ConnectionFailed("BLE link dropped mid-upload at byte $offset/${firmware.size}")
},
) {
var packetsSincePrn = 0
var bytesAtLastPrn = 0L
bleConnection.profile(LegacyDfuUuids.SERVICE, timeout = STREAM_TIMEOUT) { service ->
val packetChar = service.characteristic(LEGACY_DFU_PACKET_UUID)
while (offset < firmware.size) {
val end = minOf(offset + mtu, firmware.size)
try {
service.write(packetChar, firmware.copyOfRange(offset, end), BleWriteType.WITHOUT_RESPONSE)
} catch (e: CancellationException) {
Logger.w(e) {
"Legacy DFU: Write CANCELLED at offset $offset/${firmware.size} cause=${e.cause}"
}
offset = end
packetsSincePrn++
throw e
} catch (@Suppress("TooGenericExceptionCaught") e: Throwable) {
Logger.w(e) { "Legacy DFU: Write FAILED at offset $offset/${firmware.size}: ${e.message}" }
throw e
}
offset = end
packetsSincePrn++
if (packetsSincePrn >= PRN_INTERVAL_PACKETS && offset < firmware.size) {
Logger.d { "Legacy DFU: Awaiting PRN at offset $offset" }
val receipt =
try {
awaitPacketReceipt()
} catch (e: CancellationException) {
Logger.w(e) {
"Legacy DFU: awaitPacketReceipt CANCELLED at offset $offset cause=${e.cause}"
}
throw e
if (packetsSincePrn >= PRN_INTERVAL_PACKETS && offset < firmware.size) {
Logger.d { "Legacy DFU: Awaiting PRN at offset $offset" }
val receipt =
try {
awaitPacketReceipt()
} catch (e: CancellationException) {
Logger.w(e) {
"Legacy DFU: awaitPacketReceipt CANCELLED at offset $offset cause=${e.cause}"
}
val expected = offset.toLong()
if (receipt.bytesReceived != expected) {
throw LegacyDfuException.PacketReceiptMismatch(expected, receipt.bytesReceived)
throw e
}
bytesAtLastPrn = receipt.bytesReceived
packetsSincePrn = 0
onProgress(offset.toFloat() / firmware.size)
val expected = offset.toLong()
if (receipt.bytesReceived != expected) {
throw LegacyDfuException.PacketReceiptMismatch(expected, receipt.bytesReceived)
}
bytesAtLastPrn = receipt.bytesReceived
packetsSincePrn = 0
onProgress(offset.toFloat() / firmware.size)
}
}
Logger.d { "Legacy DFU: Streamed $offset/${firmware.size} bytes (lastPRN=$bytesAtLastPrn)" }
} finally {
watcher.cancel()
}
Logger.d { "Legacy DFU: Streamed $offset/${firmware.size} bytes (lastPRN=$bytesAtLastPrn)" }
}
}

View File

@@ -25,7 +25,6 @@ import org.koin.core.annotation.Single
import org.meshtastic.core.ble.BleConnectionFactory
import org.meshtastic.core.ble.BleScanner
import org.meshtastic.core.common.util.CommonUri
import org.meshtastic.core.common.util.NumberFormatter
import org.meshtastic.core.common.util.ioDispatcher
import org.meshtastic.core.database.entity.FirmwareRelease
import org.meshtastic.core.di.CoroutineDispatchers
@@ -52,6 +51,8 @@ import org.meshtastic.feature.firmware.FirmwareUpdateState
import org.meshtastic.feature.firmware.ProgressState
import org.meshtastic.feature.firmware.ota.ThroughputTracker
import org.meshtastic.feature.firmware.ota.calculateMacPlusOne
import org.meshtastic.feature.firmware.ota.formatTransferProgress
import org.meshtastic.feature.firmware.ota.retryWithDelay
import org.meshtastic.feature.firmware.ota.scanForBleDevice
import org.meshtastic.feature.firmware.stripFormatArgs
import kotlin.time.Duration.Companion.seconds
@@ -61,7 +62,6 @@ private const val GATT_RELEASE_DELAY_MS = 1_500L
private const val DFU_REBOOT_WAIT_MS = 3_000L
private const val RETRY_DELAY_MS = 2_000L
private const val CONNECT_ATTEMPTS = 4
private const val KIB_DIVISOR = 1024f
/**
* KMP [FirmwareUpdateHandler] for nRF52 devices.
@@ -173,25 +173,10 @@ class SecureDfuHandler(
transport
.transferFirmware(pkg.firmware) { progress ->
val pct = (progress * PERCENT_MAX).toInt()
val bytesSent = (progress * firmwareSize).toLong()
throughputTracker.record(bytesSent)
val bytesPerSecond = throughputTracker.bytesPerSecond()
val speedKib = bytesPerSecond.toFloat() / KIB_DIVISOR
val details = buildString {
append("$pct%")
if (speedKib > 0f) {
val remainingBytes = firmwareSize - bytesSent
val etaSeconds = remainingBytes.toFloat() / bytesPerSecond
append(
" (${NumberFormatter.format(speedKib, 1)} " +
"KiB/s, ETA: ${etaSeconds.toInt()}s)",
)
}
}
val details =
formatTransferProgress(progress, firmwareSize, throughputTracker.bytesPerSecond())
updateState(
FirmwareUpdateState.Updating(
ProgressState(uploadMsg, progress, details, hint = slowHint),
@@ -269,27 +254,34 @@ class SecureDfuHandler(
updateState(
FirmwareUpdateState.Processing(ProgressState(UiText.Resource(Res.string.firmware_update_waiting_reboot))),
)
var lastError: Throwable? = null
for (attempt in 1..CONNECT_ATTEMPTS) {
updateState(
FirmwareUpdateState.Processing(
ProgressState(
UiText.Resource(Res.string.firmware_update_connecting_attempt, attempt, CONNECT_ATTEMPTS),
retryWithDelay(
attempts = CONNECT_ATTEMPTS,
retryDelayMillis = RETRY_DELAY_MS,
onAttempt = { attempt ->
updateState(
FirmwareUpdateState.Processing(
ProgressState(
UiText.Resource(
Res.string.firmware_update_connecting_attempt,
attempt,
CONNECT_ATTEMPTS,
),
),
),
),
)
val result = transport.connectToDfuMode()
if (result.isSuccess) {
return
}
lastError = result.exceptionOrNull()
Logger.w { "DFU: Connect attempt $attempt/$CONNECT_ATTEMPTS failed: ${lastError?.message}" }
if (attempt < CONNECT_ATTEMPTS) delay(RETRY_DELAY_MS)
}
throw DfuException.ConnectionFailed(
"Failed to connect to DFU device after $CONNECT_ATTEMPTS attempts",
lastError,
)
},
block = { attempt ->
transport.connectToDfuMode().onFailure {
Logger.w { "DFU: Connect attempt $attempt/$CONNECT_ATTEMPTS failed: ${it.message}" }
}
},
)
.getOrElse {
throw DfuException.ConnectionFailed(
"Failed to connect to DFU device after $CONNECT_ATTEMPTS attempts",
it,
)
}
}
private suspend fun obtainZipFile(

View File

@@ -34,13 +34,10 @@ import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
import kotlinx.coroutines.withTimeout
import org.meshtastic.core.ble.BleConnectionFactory
import org.meshtastic.core.ble.BleConnectionState
@@ -51,7 +48,9 @@ import org.meshtastic.core.ble.DEFAULT_BLE_WRITE_VALUE_LENGTH
import org.meshtastic.core.ble.MeshtasticBleDevice
import org.meshtastic.core.common.util.safeCatching
import org.meshtastic.feature.firmware.ota.calculateMacPlusOne
import org.meshtastic.feature.firmware.ota.receiveWithin
import org.meshtastic.feature.firmware.ota.scanForBleDevice
import org.meshtastic.feature.firmware.ota.withDisconnectTripwire
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.minutes
@@ -253,27 +252,24 @@ class SecureDfuTransport(
bleConnection.profile(SecureDfuUuids.SERVICE) { service ->
val controlChar = service.characteristic(SecureDfuUuids.CONTROL_POINT)
// Subscribe to Control Point notifications before issuing any commands.
// launchIn(this) uses connectionScope so the subscription persists beyond this block.
// Subscribe to Control Point notifications before issuing any commands. onSubscription fires once the CCCD
// write completes; launchIn(this) uses connectionScope so the subscription persists beyond this block.
val subscribed = CompletableDeferred<Unit>()
service
.observe(controlChar)
.onEach { bytes ->
if (!subscribed.isCompleted) {
Logger.d { "DFU: Control Point subscribed" }
subscribed.complete(Unit)
}
notificationChannel.trySend(bytes)
.observe(controlChar) {
Logger.d { "DFU: Control Point subscribed" }
subscribed.complete(Unit)
}
.onEach { bytes -> notificationChannel.trySend(bytes) }
.catch { e ->
if (!subscribed.isCompleted) subscribed.completeExceptionally(e)
Logger.e(e) { "DFU: Control Point notification error" }
}
.launchIn(this)
delay(SUBSCRIPTION_SETTLE)
if (!subscribed.isCompleted) subscribed.complete(Unit)
subscribed.await()
// Conservative settle after CCCD confirmation before issuing commands.
delay(SUBSCRIPTION_SETTLE)
Logger.i { "DFU: Connected and ready (${device.address})" }
}
@@ -515,39 +511,33 @@ class SecureDfuTransport(
val mtu = bleConnection.maximumWriteValueLength(BleWriteType.WITHOUT_RESPONSE) ?: DEFAULT_BLE_WRITE_VALUE_LENGTH
var pos = from
coroutineScope {
// Trip-wire: cancels the streaming coroutine the moment Kable observes a disconnect.
val watcher = launch {
val state = bleConnection.connectionState.first { it is BleConnectionState.Disconnected }
bleConnection.withDisconnectTripwire(
onDrop = { state ->
Logger.w { "Secure DFU: Link dropped mid-stream at offset $pos/$until (state=$state)" }
throw DfuException.ConnectionFailed("BLE link dropped mid-upload at byte $pos/$until")
}
DfuException.ConnectionFailed("BLE link dropped mid-upload at byte $pos/$until")
},
) {
var packetsSincePrn = 0
bleConnection.profile(SecureDfuUuids.SERVICE, timeout = STREAM_TIMEOUT) { service ->
val packetChar = service.characteristic(SecureDfuUuids.PACKET)
while (pos < until) {
val chunkEnd = minOf(pos + mtu, until)
service.write(packetChar, data.copyOfRange(pos, chunkEnd), BleWriteType.WITHOUT_RESPONSE)
pos = chunkEnd
packetsSincePrn++
try {
var packetsSincePrn = 0
bleConnection.profile(SecureDfuUuids.SERVICE, timeout = STREAM_TIMEOUT) { service ->
val packetChar = service.characteristic(SecureDfuUuids.PACKET)
while (pos < until) {
val chunkEnd = minOf(pos + mtu, until)
service.write(packetChar, data.copyOfRange(pos, chunkEnd), BleWriteType.WITHOUT_RESPONSE)
pos = chunkEnd
packetsSincePrn++
if (prnInterval > 0 && packetsSincePrn >= prnInterval && pos < until) {
val response = awaitNotification(COMMAND_TIMEOUT)
if (response is DfuResponse.ChecksumResult) {
val expectedCrc = DfuCrc32.calculate(data, length = pos)
if (response.offset != pos || response.crc32 != expectedCrc) {
throw DfuException.ChecksumMismatch(expected = expectedCrc, actual = response.crc32)
}
Logger.d { "DFU: PRN checksum OK at offset $pos" }
if (prnInterval > 0 && packetsSincePrn >= prnInterval && pos < until) {
val response = awaitNotification(COMMAND_TIMEOUT)
if (response is DfuResponse.ChecksumResult) {
val expectedCrc = DfuCrc32.calculate(data, length = pos)
if (response.offset != pos || response.crc32 != expectedCrc) {
throw DfuException.ChecksumMismatch(expected = expectedCrc, actual = response.crc32)
}
packetsSincePrn = 0
Logger.d { "DFU: PRN checksum OK at offset $pos" }
}
packetsSincePrn = 0
}
}
} finally {
watcher.cancel()
}
}
}
@@ -608,13 +598,12 @@ class SecureDfuTransport(
Logger.d { "DFU: Object executed." }
}
private suspend fun awaitNotification(timeout: Duration): DfuResponse = try {
withTimeout(timeout) {
val bytes = notificationChannel.receive()
DfuResponse.parse(bytes).also { Logger.d { "DFU: Notification → $it" } }
}
} catch (_: TimeoutCancellationException) {
throw DfuException.Timeout("No response from Control Point after $timeout")
private suspend fun awaitNotification(timeout: Duration): DfuResponse {
val bytes =
notificationChannel.receiveWithin(timeout) {
DfuException.Timeout("No response from Control Point after $timeout")
}
return DfuResponse.parse(bytes).also { Logger.d { "DFU: Notification → $it" } }
}
private fun DfuResponse.requireSuccess(expectedOpcode: Byte) {

View File

@@ -0,0 +1,98 @@
/*
* Copyright (c) 2026 Meshtastic LLC
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
@file:Suppress("MagicNumber")
package org.meshtastic.feature.firmware.ota
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertSame
import kotlin.test.assertTrue
@OptIn(ExperimentalCoroutinesApi::class)
class FirmwareUpdateHelpersTest {
// ----- formatTransferProgress -----
@Test
fun `formatTransferProgress omits speed when throughput is zero`() {
assertEquals("50%", formatTransferProgress(progress = 0.5f, totalBytes = 1000, bytesPerSecond = 0))
}
@Test
fun `formatTransferProgress omits speed when throughput is non-positive`() {
assertEquals("0%", formatTransferProgress(progress = 0f, totalBytes = 1000, bytesPerSecond = -5))
}
@Test
fun `formatTransferProgress includes KiB per second and ETA`() {
// 50% of 2048 bytes (1024 remaining) at 1024 B/s → 1.0 KiB/s, 1s ETA.
assertEquals(
"50% (1.0 KiB/s, ETA: 1s)",
formatTransferProgress(progress = 0.5f, totalBytes = 2048, bytesPerSecond = 1024),
)
}
// ----- retryWithDelay -----
@Test
fun `retryWithDelay returns first success without further attempts`() = runTest {
var calls = 0
val result =
retryWithDelay(attempts = 3, retryDelayMillis = 1000, onAttempt = {}) {
calls++
Result.success("ok")
}
assertTrue(result.isSuccess)
assertEquals("ok", result.getOrNull())
assertEquals(1, calls)
}
@Test
fun `retryWithDelay retries until success and reports each attempt`() = runTest {
var calls = 0
val attemptsSeen = mutableListOf<Int>()
val result =
retryWithDelay(attempts = 5, retryDelayMillis = 1000, onAttempt = { attemptsSeen += it }) {
calls++
if (calls < 3) Result.failure(RuntimeException("nope $calls")) else Result.success(calls)
}
assertEquals(3, result.getOrNull())
assertEquals(3, calls)
assertEquals(listOf(1, 2, 3), attemptsSeen)
}
@Test
fun `retryWithDelay returns the last failure after exhausting attempts`() = runTest {
var calls = 0
val lastError = RuntimeException("final")
val result =
retryWithDelay<Unit>(attempts = 3, retryDelayMillis = 1000, onAttempt = {}) {
calls++
if (calls < 3) {
Result.failure<Unit>(RuntimeException("attempt $calls"))
} else {
Result.failure<Unit>(lastError)
}
}
assertTrue(result.isFailure)
assertSame(lastError, result.exceptionOrNull())
assertEquals(3, calls)
}
}