talking stick

This commit is contained in:
Jonathan Bennett
2026-05-29 22:18:32 +01:00
parent 134ec6dc54
commit f25e3e893d
3 changed files with 407 additions and 19 deletions

View File

@@ -33,6 +33,17 @@ HEARTBEAT_IDLE_DELAY_SEC = 5.0
HEARTBEAT_REPEAT_SEC = 15.0
HEARTBEAT_POLL_INTERVAL_SEC = 0.25
# --- Half-duplex turn-taking ("talking stick"), carried in RemoteShell.flags ---
# On a 2-party LoRa link, Meshtastic's CSMA-CA collapses when both ends transmit at once
# (synchronized same-slot collisions that CAD can't prevent). These flags let exactly one side
# transmit at a time. The client is the master / idle-owner: it holds the token silently when
# idle and grants it to the server whenever the server may need to respond.
FLAG_GRANT = 0x01 # I am handing you the turn; you may transmit now
FLAG_MORE = 0x02 # I yielded under a budget but still have data queued (grant back promptly)
FLAG_RTS = 0x04 # (server->client) I have output but no turn; please grant me one
CLIENT_TURN_BUDGET = 4 # max frames the client sends per turn before yielding
TURN_RECLAIM_SEC = 8.0 # reclaim the token if a grant goes unanswered this long (anti-deadlock)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
@@ -62,6 +73,19 @@ def parse_args() -> argparse.Namespace:
parser.add_argument("--close-after", type=float, default=2.0, help="seconds to wait before closing in command mode")
parser.add_argument("--timeout", type=float, default=10.0, help="seconds to wait for API/session events")
parser.add_argument("--verbose", action="store_true", help="print extra protocol events")
parser.add_argument(
"--no-turn-taking",
dest="turn_taking",
action="store_false",
help="disable half-duplex turn-taking (revert to free-running send; needed for old peers)",
)
parser.add_argument(
"--turn-budget", type=int, default=CLIENT_TURN_BUDGET, help="max frames to send per turn before yielding"
)
parser.add_argument(
"--turn-reclaim", type=float, default=TURN_RECLAIM_SEC, help="seconds before reclaiming an unanswered turn grant"
)
parser.set_defaults(turn_taking=True)
return parser.parse_args()
@@ -281,6 +305,24 @@ class SentShellFrame:
last_rx_seq: int = 0
@dataclass
class TxIntent:
"""A frame queued for transmission; the sender thread allocates seq and adds turn flags."""
op: int
payload: bytes = b""
cols: int = 0
rows: int = 0
session_id: Optional[int] = None
ack_seq: Optional[int] = None
seq: Optional[int] = None
flags: int = 0
last_tx_seq: int = 0
last_rx_seq: int = 0
remember: bool = True
heartbeat: bool = False
@dataclass
class SessionState:
pb2: object # ProtoModules with mesh and portnums attributes
@@ -310,6 +352,38 @@ class SessionState:
last_transport_activity_time: float = field(default_factory=time.monotonic)
last_heartbeat_sent_time: float = 0.0
# --- Half-duplex turn-taking state ---
turn_taking: bool = True
turn_budget: int = CLIENT_TURN_BUDGET
turn_reclaim_sec: float = TURN_RECLAIM_SEC
has_token: bool = True # client is the master / idle-owner: starts holding the turn
grant_requested: bool = False # peer asked for a turn (FLAG_MORE / FLAG_RTS)
last_grant_time: float = 0.0 # when we last handed the turn to the peer
last_inbound_time: float = 0.0 # when we last heard anything (to detect an unanswered grant)
tx_intents: deque = field(default_factory=deque)
turn_cv: threading.Condition = field(default_factory=threading.Condition)
def enqueue_tx(self, intent: TxIntent) -> None:
with self.turn_cv:
self.tx_intents.append(intent)
self.turn_cv.notify_all()
def note_turn_flags(self, flags: int) -> None:
"""Apply channel-access flags from an inbound frame (called by the reader thread)."""
if not self.turn_taking:
return
with self.turn_cv:
self.last_inbound_time = time.monotonic()
if flags & FLAG_GRANT:
self.has_token = True
if flags & (FLAG_MORE | FLAG_RTS):
self.grant_requested = True
self.turn_cv.notify_all()
def wake_sender(self) -> None:
with self.turn_cv:
self.turn_cv.notify_all()
def alloc_seq(self) -> int:
with self.tx_lock:
value = self.next_seq
@@ -510,10 +584,40 @@ def send_shell_frame(
remember: bool = True,
heartbeat: bool = False,
) -> int:
"""Queue a frame for transmission. The sender thread allocates seq, adds turn flags, and sends.
(transport is unused here — the sender thread owns the transport — but kept for call-site
compatibility.)
"""
state.enqueue_tx(
TxIntent(
op=op,
payload=payload,
cols=cols,
rows=rows,
session_id=session_id,
ack_seq=ack_seq,
seq=seq,
flags=flags,
last_tx_seq=last_tx_seq,
last_rx_seq=last_rx_seq,
remember=remember,
heartbeat=heartbeat,
)
)
return seq if seq is not None else 0
def transmit_frame(transport, state: SessionState, intent: TxIntent) -> int:
"""Actually serialize and send one frame over the API socket. Only the sender thread calls this."""
op = intent.op
seq = intent.seq
if seq is None:
seq = 0 if op == state.pb2.mesh.RemoteShell.ACK else state.alloc_seq()
ack_seq = intent.ack_seq
if ack_seq is None:
ack_seq = state.current_ack_seq()
session_id = intent.session_id
if session_id is None:
session_id = state.session_id
@@ -522,34 +626,117 @@ def send_shell_frame(
shell.session_id = session_id
shell.seq = seq
shell.ack_seq = ack_seq
shell.cols = cols
shell.rows = rows
shell.flags = flags
shell.last_tx_seq = last_tx_seq
shell.last_rx_seq = last_rx_seq
if payload:
shell.payload = payload
shell.cols = intent.cols
shell.rows = intent.rows
shell.flags = intent.flags
shell.last_tx_seq = intent.last_tx_seq
shell.last_rx_seq = intent.last_rx_seq
if intent.payload:
shell.payload = intent.payload
with state.socket_lock:
send_toradio(transport, make_toradio_packet(state.pb2, state, shell))
if remember:
if intent.remember:
state.remember_sent_frame(
SentShellFrame(
op=op,
session_id=session_id,
seq=seq,
ack_seq=ack_seq,
payload=payload,
cols=cols,
rows=rows,
flags=flags,
last_tx_seq=last_tx_seq,
last_rx_seq=last_rx_seq,
payload=intent.payload,
cols=intent.cols,
rows=intent.rows,
flags=intent.flags,
last_tx_seq=intent.last_tx_seq,
last_rx_seq=intent.last_rx_seq,
)
)
state.note_outbound_packet(heartbeat=heartbeat)
state.note_outbound_packet(heartbeat=intent.heartbeat)
return seq
def sender_loop(transport, state: SessionState) -> None:
"""Single transmitter. Sends queued frames only while we hold the turn, then grants it away.
This is what makes the link half-duplex: with turn-taking on, only one side transmits at a
time, eliminating the synchronized collisions that wreck the CSMA-CA scheme on a 2-party link.
"""
while not state.stopped:
burst: list[TxIntent] = []
send_bare_grant = False
more_after = False
with state.turn_cv:
while not state.stopped:
if not state.turn_taking:
if state.tx_intents:
break
state.turn_cv.wait(timeout=1.0)
continue
if state.has_token:
if state.tx_intents or state.grant_requested:
break
# Hold the token and stay silent until we have work or the peer requests a turn.
state.turn_cv.wait(timeout=1.0)
else:
# We granted the turn away. Reclaim it only if we granted a while ago AND the
# link has gone quiet (no inbound) for that long — i.e. the peer isn't mid-turn,
# so either our grant or the peer's yield-grant was lost. Requiring quiet (not
# just grant age) avoids reclaiming mid-stream, which would cause double-talk.
now = time.monotonic()
if (
state.last_grant_time
and (now - state.last_grant_time) > state.turn_reclaim_sec
and (now - state.last_inbound_time) > state.turn_reclaim_sec
):
state.has_token = True
if state.verbose:
state.event_queue.put("turn: reclaimed unanswered grant")
continue
ref = max(state.last_grant_time, state.last_inbound_time)
timeout = max(0.05, state.turn_reclaim_sec - (now - ref)) if ref else state.turn_reclaim_sec
state.turn_cv.wait(timeout=timeout)
if state.stopped:
return
# We hold the turn (or turn-taking is off). Pull a burst of intents to send.
budget = None if not state.turn_taking else max(1, state.turn_budget)
while state.tx_intents and (budget is None or len(burst) < budget):
burst.append(state.tx_intents.popleft())
if state.turn_taking:
more_after = bool(state.tx_intents) # still queued beyond our budget
if burst:
# The burst's last frame grants the turn, satisfying any pending request.
state.grant_requested = False
state.has_token = False
state.last_grant_time = time.monotonic()
elif state.grant_requested:
send_bare_grant = True
state.grant_requested = False
state.has_token = False
state.last_grant_time = time.monotonic()
# Transmit outside the lock (socket I/O may block).
try:
if burst:
n = len(burst)
for i, intent in enumerate(burst):
if state.turn_taking and i == n - 1:
# Last frame of our turn: hand the token back to the peer.
intent.flags |= FLAG_GRANT
if more_after:
intent.flags |= FLAG_MORE
transmit_frame(transport, state, intent)
elif send_bare_grant:
transmit_frame(transport, state, TxIntent(op=state.pb2.mesh.RemoteShell.ACK, seq=0, remember=False, flags=FLAG_GRANT))
except Exception as exc:
if not state.stopped:
state.event_queue.put(f"sender error: {exc}")
state.closed_event.set()
return
def send_ack_frame(transport, state: SessionState, replay_from: Optional[int] = None) -> None:
send_shell_frame(
transport,
@@ -676,6 +863,9 @@ def reader_loop(transport, state: SessionState) -> None:
if not shell:
continue
state.note_inbound_packet()
# Honor channel-access flags (GRANT/MORE/RTS) on every inbound frame, regardless of
# payload ordering, so a granted turn is never lost to a gap.
state.note_turn_flags(shell.flags)
#state.prune_sent_frames(shell.ack_seq)
if shell.op == state.pb2.mesh.RemoteShell.ACK:
#state.event_queue.put("peer requested replay")
@@ -902,6 +1092,9 @@ def main() -> int:
target=parse_node_num(args.to),
channel=args.channel,
verbose=args.verbose,
turn_taking=args.turn_taking,
turn_budget=args.turn_budget,
turn_reclaim_sec=args.turn_reclaim,
)
cols, rows = resolve_initial_terminal_size(args.cols, args.rows)
@@ -913,6 +1106,11 @@ def main() -> int:
reader = threading.Thread(target=reader_loop, args=(transport, state), daemon=True)
reader.start()
# The sender thread owns all transmission and the turn-taking token. Start it before OPEN
# so the queued OPEN goes out (with a GRANT so the server can reply OPEN_OK).
sender = threading.Thread(target=sender_loop, args=(transport, state), daemon=True)
sender.start()
send_shell_frame(transport, state, pb2.mesh.RemoteShell.OPEN, cols=cols, rows=rows)
if not state.opened_event.wait(timeout=args.timeout):
raise SystemExit("timed out waiting for OPEN_OK from remote DMShell")
@@ -927,9 +1125,11 @@ def main() -> int:
run_interactive_mode(transport, state)
state.stopped = True
state.wake_sender()
drain_events(state)
reader.join(timeout=1.0)
heartbeat.join(timeout=1.0)
sender.join(timeout=1.0)
state.close_replay_log()
finally:
transport.close()

View File

@@ -13,6 +13,7 @@
#include "pb_encode.h"
#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <pty.h>
#include <signal.h>
#include <stdlib.h>
@@ -29,6 +30,16 @@ namespace
constexpr uint16_t PTY_COLS_DEFAULT = 120;
constexpr uint16_t PTY_ROWS_DEFAULT = 40;
constexpr size_t MAX_MESSAGE_SIZE = 200;
// --- Half-duplex turn-taking ("talking stick") protocol, carried in RemoteShell.flags ---
// On a 2-party LoRa link, Meshtastic's CSMA-CA breaks down when both ends transmit at once
// (synchronized same-slot collisions that CAD can't prevent). These flags let exactly one
// side transmit at a time, eliminating those collisions. The client is the master/idle-owner.
constexpr uint32_t TURN_FLAG_GRANT = 0x01; // I am handing you the turn; you may transmit now
constexpr uint32_t TURN_FLAG_MORE = 0x02; // I yielded under a budget but still have data queued
constexpr uint32_t TURN_FLAG_RTS = 0x04; // I have output but no turn; please grant me one
constexpr size_t TURN_BUDGET_FRAMES = 4; // max output frames per granted turn before yielding
constexpr uint32_t RTS_RETRY_MS = 1000; // min interval between request-to-send frames
} // namespace
DMShellModule::DMShellModule()
@@ -51,8 +62,14 @@ ProcessMessage DMShellModule::handleReceived(const meshtastic_MeshPacket &mp)
}
if (frame.op == meshtastic_RemoteShell_OpCode_ACK) {
if (session.active && frame.session_id == session.sessionId && getFrom(&mp) == session.peer && frame.last_rx_seq > 0) {
resendFramesFrom(frame.last_rx_seq + 1);
if (session.active && frame.session_id == session.sessionId && getFrom(&mp) == session.peer) {
applyTurnFlags(frame);
if (frame.last_rx_seq > 0) {
resendFramesFrom(frame.last_rx_seq + 1);
}
// A standalone grant (client re-granting for MORE, replying to our RTS, or a heartbeat
// poll) is our cue to flush any pending shell output during this turn.
serviceTurn();
}
return ProcessMessage::CONTINUE;
}
@@ -88,7 +105,14 @@ ProcessMessage DMShellModule::handleReceived(const meshtastic_MeshPacket &mp)
return ProcessMessage::STOP;
}
// Honor channel-access flags before ordering checks: a GRANT transfers the turn regardless of
// whether this frame's payload is in order.
applyTurnFlags(frame);
if (!shouldProcessIncomingFrame(frame)) {
// We won't process the payload (gap/duplicate), but we may now hold the turn, so flush
// output and/or hand it back rather than stalling the link.
serviceTurn();
return ProcessMessage::STOP;
}
@@ -98,7 +122,9 @@ ProcessMessage DMShellModule::handleReceived(const meshtastic_MeshPacket &mp)
case meshtastic_RemoteShell_OpCode_INPUT:
if (!writeSessionInput(frame)) {
sendError("input_write_failed");
} else {
} else if (!session.turnManaged) {
// Legacy peer (no turn-taking): echo immediately as before. In managed mode the
// serviceTurn() call at the end of handleReceived drains the echo and yields the turn.
uint8_t outBuf[MAX_MESSAGE_SIZE];
const ssize_t bytesRead = read(session.masterFd, outBuf, sizeof(outBuf));
if (bytesRead > 0) {
@@ -164,6 +190,9 @@ ProcessMessage DMShellModule::handleReceived(const meshtastic_MeshPacket &mp)
break;
}
// If the peer granted us the turn, flush pending shell output and hand the turn back.
serviceTurn();
return ProcessMessage::STOP;
}
@@ -189,6 +218,19 @@ int32_t DMShellModule::runOnce()
return 50;
}
if (session.turnManaged) {
if (session.hasToken) {
// We hold the turn: flush output and hand it back.
serviceTurn();
} else if (ptyHasOutput() && !Throttle::isWithinTimespanMs(session.lastRtsMs, RTS_RETRY_MS)) {
// Unsolicited shell output but no turn: ask the client to grant us one.
sendRts();
session.lastRtsMs = millis();
}
return 50;
}
// Legacy free-send path (peer is not using turn-taking).
uint8_t outBuf[MAX_MESSAGE_SIZE];
while (session.masterFd >= 0) {
const ssize_t bytesRead = read(session.masterFd, outBuf, sizeof(outBuf));
@@ -321,6 +363,12 @@ bool DMShellModule::openSession(const meshtastic_MeshPacket &mp, const meshtasti
session.nextExpectedRxSeq = frame.seq + 1;
session.highestSeenRxSeq = frame.seq;
session.lastActivityMs = millis();
session.turnManaged = false;
session.hasToken = false;
session.lastRtsMs = 0;
// Honor any GRANT the client put on OPEN (opts this session into turn-taking).
applyTurnFlags(frame);
meshtastic_RemoteShell newFrame = {
.op = meshtastic_RemoteShell_OpCode_OPEN_OK,
@@ -329,10 +377,14 @@ bool DMShellModule::openSession(const meshtastic_MeshPacket &mp, const meshtasti
.ack_seq = frame.seq,
.cols = ws.ws_col,
.rows = ws.ws_row,
.flags = 0,
.flags = session.turnManaged ? TURN_FLAG_GRANT : 0u,
};
newFrame.payload.size = 0;
sendFrameToPeer(session.peer, newFrame, true);
if (session.turnManaged) {
// OPEN_OK handed the turn back to the client; it is now the idle-owner.
session.hasToken = false;
}
LOG_INFO("DMShell: opened session=0x%x peer=0x%x pid=%d", session.sessionId, session.peer, session.childPid);
return true;
@@ -579,6 +631,130 @@ void DMShellModule::sendFrameToPeer(NodeNum peer, meshtastic_RemoteShell frame,
service->sendToMesh(packet);
}
void DMShellModule::applyTurnFlags(const meshtastic_RemoteShell &frame)
{
if (frame.flags & (TURN_FLAG_GRANT | TURN_FLAG_MORE | TURN_FLAG_RTS)) {
session.turnManaged = true; // peer speaks turn-taking; enable gating for this session
}
if (frame.flags & TURN_FLAG_GRANT) {
session.hasToken = true;
}
}
bool DMShellModule::ptyHasOutput()
{
if (session.masterFd < 0) {
return false;
}
struct pollfd pfd = {};
pfd.fd = session.masterFd;
pfd.events = POLLIN;
return poll(&pfd, 1, 0) > 0 && (pfd.revents & POLLIN);
}
void DMShellModule::sendOutputFrame(const uint8_t *data, size_t len, uint32_t extraFlags)
{
meshtastic_RemoteShell frame = {
.op = meshtastic_RemoteShell_OpCode_OUTPUT,
.session_id = session.sessionId,
.seq = session.nextTxSeq++,
.ack_seq = session.lastAckedRxSeq,
.cols = 0,
.rows = 0,
.flags = extraFlags,
};
assert(len <= sizeof(frame.payload.bytes));
memcpy(frame.payload.bytes, data, len);
frame.payload.size = len;
sendFrameToPeer(session.peer, frame, true);
}
void DMShellModule::sendTurnGrant(bool more)
{
meshtastic_RemoteShell frame = {
.op = meshtastic_RemoteShell_OpCode_ACK,
.session_id = session.sessionId,
.seq = 0,
.ack_seq = session.lastAckedRxSeq,
.cols = 0,
.rows = 0,
.flags = TURN_FLAG_GRANT | (more ? TURN_FLAG_MORE : 0u),
.last_rx_seq = 0,
};
frame.payload.size = 0;
sendFrameToPeer(session.peer, frame, false);
}
void DMShellModule::sendRts()
{
meshtastic_RemoteShell frame = {
.op = meshtastic_RemoteShell_OpCode_ACK,
.session_id = session.sessionId,
.seq = 0,
.ack_seq = session.lastAckedRxSeq,
.cols = 0,
.rows = 0,
.flags = TURN_FLAG_RTS,
.last_rx_seq = 0,
};
frame.payload.size = 0;
sendFrameToPeer(session.peer, frame, false);
}
// Called when we hold the turn: send up to TURN_BUDGET_FRAMES chunks of shell output, then hand
// the turn back to the client. The grant is piggybacked on the last output frame (or sent as a
// standalone ACK if there was nothing to send), so the token always settles back at the client.
void DMShellModule::serviceTurn()
{
if (!session.active || !session.turnManaged || !session.hasToken) {
return;
}
uint8_t chunks[TURN_BUDGET_FRAMES][MAX_MESSAGE_SIZE];
size_t chunkLen[TURN_BUDGET_FRAMES] = {0};
size_t nChunks = 0;
bool eof = false;
bool readError = false;
while (nChunks < TURN_BUDGET_FRAMES && session.masterFd >= 0) {
const ssize_t n = read(session.masterFd, chunks[nChunks], MAX_MESSAGE_SIZE);
if (n > 0) {
chunkLen[nChunks] = (size_t)n;
nChunks++;
} else if (n == 0) {
eof = true;
break;
} else {
if (errno != EAGAIN && errno != EWOULDBLOCK) {
readError = true;
}
break;
}
}
// If we filled the budget, there may still be more output waiting; ask for a prompt re-grant.
const bool more = (nChunks == TURN_BUDGET_FRAMES) && ptyHasOutput();
session.hasToken = false; // we are handing the turn back below
if (nChunks > 0) {
session.lastActivityMs = millis();
for (size_t i = 0; i < nChunks; i++) {
const uint32_t flags = (i + 1 == nChunks) ? (TURN_FLAG_GRANT | (more ? TURN_FLAG_MORE : 0u)) : 0u;
sendOutputFrame(chunks[i], chunkLen[i], flags);
}
} else {
// Nothing to send: hand the turn straight back so the link goes quiet at the client.
sendTurnGrant(false);
}
if (eof) {
closeSession("pty_eof", true);
} else if (readError) {
LOG_WARN("DMShell: PTY read error errno=%d", errno);
closeSession("pty_read_error", true);
}
}
void DMShellModule::sendError(const char *message, NodeNum peer)
{
const size_t len = strnlen(message, MAX_MESSAGE_SIZE);

View File

@@ -24,6 +24,10 @@ struct DMShellSession {
uint32_t nextExpectedRxSeq = 1;
uint32_t highestSeenRxSeq = 0;
uint32_t lastActivityMs = 0;
// --- Half-duplex turn-taking ("talking stick") state ---
bool turnManaged = false; // becomes true once the peer uses turn-taking flags; enables gating
bool hasToken = false; // do we currently hold the right to transmit?
uint32_t lastRtsMs = 0; // last time we asked the client for a turn (rate-limit)
struct SentFrame {
bool valid = false;
meshtastic_RemoteShell_OpCode op = meshtastic_RemoteShell_OpCode_ERROR;
@@ -70,6 +74,14 @@ class DMShellModule : private concurrency::OSThread, public SinglePortModule
void sendAck(uint32_t replayFromSeq = 0);
void sendFrameToPeer(NodeNum peer, meshtastic_RemoteShell frame, bool remember = true);
void sendError(const char *message, NodeNum peer = 0);
// --- Turn-taking helpers ---
void applyTurnFlags(const meshtastic_RemoteShell &frame);
bool ptyHasOutput();
void serviceTurn();
void sendOutputFrame(const uint8_t *data, size_t len, uint32_t extraFlags);
void sendTurnGrant(bool more);
void sendRts();
};
extern DMShellModule *dmShellModule;