diff --git a/src/configuration.h b/src/configuration.h index a5f2cd9a9..cc623455f 100644 --- a/src/configuration.h +++ b/src/configuration.h @@ -499,6 +499,7 @@ along with this program. If not, see . #define MESHTASTIC_EXCLUDE_REMOTEHARDWARE 1 #define MESHTASTIC_EXCLUDE_STOREFORWARD 1 #define MESHTASTIC_EXCLUDE_TEXTMESSAGE 1 +#define MESHTASTIC_EXCLUDE_TRAFFIC_MANAGEMENT 1 #define MESHTASTIC_EXCLUDE_ATAK 1 #define MESHTASTIC_EXCLUDE_CANNEDMESSAGES 1 #define MESHTASTIC_EXCLUDE_NEIGHBORINFO 1 diff --git a/src/mesh/Default.h b/src/mesh/Default.h index 4cfbdbcc8..f4633790e 100644 --- a/src/mesh/Default.h +++ b/src/mesh/Default.h @@ -30,6 +30,11 @@ #define min_node_info_broadcast_secs 60 * 60 // No regular broadcasts of more than once an hour #define min_neighbor_info_broadcast_secs 4 * 60 * 60 #define default_map_publish_interval_secs 60 * 60 + +// Traffic management defaults +#define default_traffic_mgmt_position_precision_bits 24 // ~10m grid cells +#define default_traffic_mgmt_position_min_interval_secs ONE_DAY // 1 day between identical positions + #ifdef USERPREFS_RINGTONE_NAG_SECS #define default_ringtone_nag_secs USERPREFS_RINGTONE_NAG_SECS #else diff --git a/src/mesh/NextHopRouter.cpp b/src/mesh/NextHopRouter.cpp index 5230e5b85..d686e9056 100644 --- a/src/mesh/NextHopRouter.cpp +++ b/src/mesh/NextHopRouter.cpp @@ -4,6 +4,9 @@ #if !MESHTASTIC_EXCLUDE_TRACEROUTE #include "modules/TraceRouteModule.h" #endif +#if HAS_TRAFFIC_MANAGEMENT +#include "modules/TrafficManagementModule.h" +#endif #include "NodeDB.h" NextHopRouter::NextHopRouter() {} @@ -126,15 +129,28 @@ void NextHopRouter::sniffReceived(const meshtastic_MeshPacket *p, const meshtast /* Check if we should be rebroadcasting this packet if so, do so. */ bool NextHopRouter::perhapsRebroadcast(const meshtastic_MeshPacket *p) { - if (!isToUs(p) && !isFromUs(p) && p->hop_limit > 0) { + // Check if traffic management wants to exhaust this packet's hops + bool exhaustHops = false; +#if HAS_TRAFFIC_MANAGEMENT + if (trafficManagementModule && trafficManagementModule->shouldExhaustHops(*p)) { + exhaustHops = true; + } +#endif + + // Allow rebroadcast if hop_limit > 0 OR if we're exhausting hops (which sets hop_limit = 0 but still needs one relay) + if (!isToUs(p) && !isFromUs(p) && (p->hop_limit > 0 || exhaustHops)) { if (p->id != 0) { if (isRebroadcaster()) { if (p->next_hop == NO_NEXT_HOP_PREFERENCE || p->next_hop == nodeDB->getLastByteOfNodeNum(getNodeNum())) { meshtastic_MeshPacket *tosend = packetPool.allocCopy(*p); // keep a copy because we will be sending it LOG_INFO("Rebroadcast received message coming from %x", p->relay_node); - // Use shared logic to determine if hop_limit should be decremented - if (shouldDecrementHopLimit(p)) { + // If exhausting hops, force hop_limit = 0 regardless of other logic + if (exhaustHops) { + tosend->hop_limit = 0; + LOG_INFO("Traffic management: exhausting hops for 0x%08x, setting hop_limit=0", getFrom(p)); + } else if (shouldDecrementHopLimit(p)) { + // Use shared logic to determine if hop_limit should be decremented tosend->hop_limit--; // bump down the hop count } else { LOG_INFO("favorite-ROUTER/CLIENT_BASE-to-ROUTER/CLIENT_BASE rebroadcast: preserving hop_limit"); diff --git a/src/mesh/PhoneAPI.cpp b/src/mesh/PhoneAPI.cpp index a02f96ac5..7df6720a2 100644 --- a/src/mesh/PhoneAPI.cpp +++ b/src/mesh/PhoneAPI.cpp @@ -449,6 +449,11 @@ size_t PhoneAPI::getFromRadio(uint8_t *buf) fromRadioScratch.moduleConfig.which_payload_variant = meshtastic_ModuleConfig_paxcounter_tag; fromRadioScratch.moduleConfig.payload_variant.paxcounter = moduleConfig.paxcounter; break; + case meshtastic_ModuleConfig_traffic_management_tag: + LOG_DEBUG("Send module config: traffic management"); + fromRadioScratch.moduleConfig.which_payload_variant = meshtastic_ModuleConfig_traffic_management_tag; + fromRadioScratch.moduleConfig.payload_variant.traffic_management = moduleConfig.traffic_management; + break; default: LOG_ERROR("Unknown module config type %d", config_state); } diff --git a/src/mesh/Router.cpp b/src/mesh/Router.cpp index 2e4c4d7d9..db4b88413 100644 --- a/src/mesh/Router.cpp +++ b/src/mesh/Router.cpp @@ -11,6 +11,9 @@ #include "mesh-pb-constants.h" #include "meshUtils.h" #include "modules/RoutingModule.h" +#if HAS_TRAFFIC_MANAGEMENT +#include "modules/TrafficManagementModule.h" +#endif #if !MESHTASTIC_EXCLUDE_MQTT #include "mqtt/MQTT.h" #endif @@ -95,6 +98,20 @@ bool Router::shouldDecrementHopLimit(const meshtastic_MeshPacket *p) return true; } +#if HAS_TRAFFIC_MANAGEMENT + // When router_preserve_hops is enabled, preserve hops for decoded packets that are not + // position or telemetry (those have their own exhaust_hop controls). + if (moduleConfig.has_traffic_management && moduleConfig.traffic_management.enabled && + moduleConfig.traffic_management.router_preserve_hops && p->which_payload_variant == meshtastic_MeshPacket_decoded_tag && + p->decoded.portnum != meshtastic_PortNum_POSITION_APP && p->decoded.portnum != meshtastic_PortNum_TELEMETRY_APP) { + LOG_DEBUG("Router hop preserved: port=%d from=0x%08x (traffic_management)", p->decoded.portnum, getFrom(p)); + if (trafficManagementModule) { + trafficManagementModule->recordRouterHopPreserved(); + } + return false; + } +#endif + // For subsequent hops, check if previous relay is a favorite router // Optimized search for favorite routers with matching last byte // Check ordering optimized for IoT devices (cheapest checks first) diff --git a/src/mesh/mesh-pb-constants.h b/src/mesh/mesh-pb-constants.h index e4f65aa28..eea7d4efc 100644 --- a/src/mesh/mesh-pb-constants.h +++ b/src/mesh/mesh-pb-constants.h @@ -69,6 +69,22 @@ static inline int get_max_num_nodes() /// Max number of channels allowed #define MAX_NUM_CHANNELS (member_size(meshtastic_ChannelFile, channels) / member_size(meshtastic_ChannelFile, channels[0])) +// Traffic Management module configuration +// Enable per-variant by defining HAS_TRAFFIC_MANAGEMENT=1 in variant.h +#ifndef HAS_TRAFFIC_MANAGEMENT +#define HAS_TRAFFIC_MANAGEMENT 0 +#endif + +// Cache size for traffic management (number of nodes to track) +// Can be overridden per-variant based on available memory +#ifndef TRAFFIC_MANAGEMENT_CACHE_SIZE +#if HAS_TRAFFIC_MANAGEMENT +#define TRAFFIC_MANAGEMENT_CACHE_SIZE 1000 +#else +#define TRAFFIC_MANAGEMENT_CACHE_SIZE 0 +#endif +#endif + /// helper function for encoding a record as a protobuf, any failures to encode are fatal and we will panic /// returns the encoded packet size size_t pb_encode_to_bytes(uint8_t *destbuf, size_t destbufsize, const pb_msgdesc_t *fields, const void *src_struct); @@ -90,4 +106,4 @@ bool writecb(pb_ostream_t *stream, const uint8_t *buf, size_t count); */ bool is_in_helper(uint32_t n, const uint32_t *array, pb_size_t count); -#define is_in_repeated(name, n) is_in_helper(n, name, name##_count) \ No newline at end of file +#define is_in_repeated(name, n) is_in_helper(n, name, name##_count) diff --git a/src/modules/AdminModule.cpp b/src/modules/AdminModule.cpp index 8f0296227..6ffde70db 100644 --- a/src/modules/AdminModule.cpp +++ b/src/modules/AdminModule.cpp @@ -1008,6 +1008,11 @@ bool AdminModule::handleSetModuleConfig(const meshtastic_ModuleConfig &c) moduleConfig.statusmessage = c.payload_variant.statusmessage; shouldReboot = false; break; + case meshtastic_ModuleConfig_traffic_management_tag: + LOG_INFO("Set module config: Traffic Management"); + moduleConfig.has_traffic_management = true; + moduleConfig.traffic_management = c.payload_variant.traffic_management; + break; } saveChanges(SEGMENT_MODULECONFIG, shouldReboot); return true; @@ -1193,6 +1198,11 @@ void AdminModule::handleGetModuleConfig(const meshtastic_MeshPacket &req, const res.get_module_config_response.which_payload_variant = meshtastic_ModuleConfig_statusmessage_tag; res.get_module_config_response.payload_variant.statusmessage = moduleConfig.statusmessage; break; + case meshtastic_AdminMessage_ModuleConfigType_TRAFFICMANAGEMENT_CONFIG: + LOG_INFO("Get module config: Traffic Management"); + res.get_module_config_response.which_payload_variant = meshtastic_ModuleConfig_traffic_management_tag; + res.get_module_config_response.payload_variant.traffic_management = moduleConfig.traffic_management; + break; } // NOTE: The phone app needs to know the ls_secsvalue so it can properly expect sleep behavior. diff --git a/src/modules/Modules.cpp b/src/modules/Modules.cpp index 64e90c9c2..d3ab9076d 100644 --- a/src/modules/Modules.cpp +++ b/src/modules/Modules.cpp @@ -38,6 +38,9 @@ #include "modules/PowerStressModule.h" #endif #include "modules/RoutingModule.h" +#if HAS_TRAFFIC_MANAGEMENT && !MESHTASTIC_EXCLUDE_TRAFFIC_MANAGEMENT +#include "modules/TrafficManagementModule.h" +#endif #include "modules/TextMessageModule.h" #if !MESHTASTIC_EXCLUDE_TRACEROUTE #include "modules/TraceRouteModule.h" @@ -120,6 +123,14 @@ void setupModules() #if !MESHTASTIC_EXCLUDE_REPLYBOT new ReplyBotModule(); #endif + +#if HAS_TRAFFIC_MANAGEMENT && !MESHTASTIC_EXCLUDE_TRAFFIC_MANAGEMENT + // Instantiate only when enabled to avoid extra memory use and background work. + if (moduleConfig.has_traffic_management && moduleConfig.traffic_management.enabled) { + trafficManagementModule = new TrafficManagementModule(); + } +#endif + #if !MESHTASTIC_EXCLUDE_ADMIN adminModule = new AdminModule(); #endif diff --git a/src/modules/TrafficManagementModule.cpp b/src/modules/TrafficManagementModule.cpp new file mode 100644 index 000000000..6936ef682 --- /dev/null +++ b/src/modules/TrafficManagementModule.cpp @@ -0,0 +1,1410 @@ +#include "TrafficManagementModule.h" + +#if HAS_TRAFFIC_MANAGEMENT + +#include "Default.h" +#include "MeshService.h" +#include "NodeDB.h" +#include "Router.h" +#include "TypeConversions.h" +#include "concurrency/LockGuard.h" +#include "configuration.h" +#include "mesh-pb-constants.h" +#include "meshUtils.h" +#include +#include + +#define TM_LOG_DEBUG(fmt, ...) LOG_DEBUG("[TM] " fmt, ##__VA_ARGS__) +#define TM_LOG_INFO(fmt, ...) LOG_INFO("[TM] " fmt, ##__VA_ARGS__) +#define TM_LOG_WARN(fmt, ...) LOG_WARN("[TM] " fmt, ##__VA_ARGS__) + +// ============================================================================= +// Anonymous Namespace - Internal Helpers +// ============================================================================= + +namespace +{ + +constexpr uint32_t kMaintenanceIntervalMs = 60 * 1000UL; // Cache cleanup interval +constexpr uint32_t kUnknownResetMs = 60 * 1000UL; // Unknown packet window +constexpr uint8_t kMaxCuckooKicks = 16; // Max displacement chain length + +// NodeInfo direct response: enforced maximum hops by device role +// Both use maxHops logic (respond when hopsAway <= threshold) +// Config value is clamped to these role-based limits +// Note: nodeinfo_direct_response must also be enabled for this to take effect +constexpr uint32_t kRouterDefaultMaxHops = 3; // Routers: max 3 hops (can set lower via config) +constexpr uint32_t kClientDefaultMaxHops = 0; // Clients: direct only (cannot increase) + +/** + * Convert seconds to milliseconds with overflow protection. + */ +uint32_t secsToMs(uint32_t secs) +{ + uint64_t ms = static_cast(secs) * 1000ULL; + if (ms > UINT32_MAX) + return UINT32_MAX; + return static_cast(ms); +} + +/** + * Clamp precision to a valid dedup range. + * Invalid values use the module default precision. + */ +uint8_t sanitizePositionPrecision(uint8_t precision) +{ + if (precision > 0 && precision <= 32) + return precision; + + const uint8_t defaultPrecision = static_cast(default_traffic_mgmt_position_precision_bits); + if (defaultPrecision > 0 && defaultPrecision <= 32) + return defaultPrecision; + + // Someone done messed up if we reach here + return 32; +} + +/** + * Check if a timestamp is within a time window. + * Handles wrap-around correctly using unsigned subtraction. + */ +bool isWithinWindow(uint32_t nowMs, uint32_t startMs, uint32_t intervalMs) +{ + if (intervalMs == 0 || startMs == 0) + return false; + return (nowMs - startMs) < intervalMs; +} + +/** + * Truncate lat/lon to specified precision for position deduplication. + * + * The truncation works by masking off lower bits and rounding to the center + * of the resulting grid cell. This creates a stable truncated value even + * when GPS jitter causes small coordinate changes. + * + * @param value Raw latitude_i or longitude_i from position + * @param precision Number of significant bits to keep (0-32) + * @return Truncated and centered coordinate value + */ +int32_t truncateLatLon(int32_t value, uint8_t precision) +{ + if (precision == 0 || precision >= 32) + return value; + + // Create mask to zero out lower bits + uint32_t mask = UINT32_MAX << (32 - precision); + uint32_t truncated = static_cast(value) & mask; + + // Add half the truncation step to center in the grid cell + truncated += (1u << (31 - precision)); + return static_cast(truncated); +} + +/** + * Saturating increment for uint8_t counters. + * Prevents overflow by capping at UINT8_MAX (255). + */ +inline void saturatingIncrement(uint8_t &counter) +{ + if (counter < UINT8_MAX) + counter++; +} + +/** + * Return a short human-readable name for common port numbers. + * Falls back to "port:" for unknown ports. + */ +const char *portName(int portnum) +{ + switch (portnum) { + case meshtastic_PortNum_TEXT_MESSAGE_APP: + return "text"; + case meshtastic_PortNum_POSITION_APP: + return "position"; + case meshtastic_PortNum_NODEINFO_APP: + return "nodeinfo"; + case meshtastic_PortNum_ROUTING_APP: + return "routing"; + case meshtastic_PortNum_ADMIN_APP: + return "admin"; + case meshtastic_PortNum_TELEMETRY_APP: + return "telemetry"; + case meshtastic_PortNum_TRACEROUTE_APP: + return "traceroute"; + case meshtastic_PortNum_NEIGHBORINFO_APP: + return "neighborinfo"; + case meshtastic_PortNum_STORE_FORWARD_APP: + return "store-forward"; + case meshtastic_PortNum_WAYPOINT_APP: + return "waypoint"; + default: + return nullptr; + } +} + +} // namespace + +// ============================================================================= +// Module Instance +// ============================================================================= + +TrafficManagementModule *trafficManagementModule; + +// ============================================================================= +// Constructor +// ============================================================================= + +TrafficManagementModule::TrafficManagementModule() : MeshModule("TrafficManagement"), concurrency::OSThread("TrafficManagement") +{ + // Module configuration + isPromiscuous = true; // See all packets, not just those addressed to us + encryptedOk = true; // Can process encrypted packets + stats = meshtastic_TrafficManagementStats_init_zero; + + // Initialize rolling epoch for relative timestamps + cacheEpochMs = millis(); + + // Calculate adaptive time resolutions from config (config changes require reboot) + // Resolution = max(60, min(339, interval/2)) for ~24 hour range with good precision + posTimeResolution = calcTimeResolution(Default::getConfiguredOrDefault( + moduleConfig.traffic_management.position_min_interval_secs, default_traffic_mgmt_position_min_interval_secs)); + rateTimeResolution = calcTimeResolution(moduleConfig.traffic_management.rate_limit_window_secs); + unknownTimeResolution = calcTimeResolution(kUnknownResetMs / 1000); // ~5 min default + + const auto &cfg = moduleConfig.traffic_management; + TM_LOG_INFO("Enabled: pos_dedup=%d nodeinfo_resp=%d rate_limit=%d drop_unknown=%d exhaust_telem=%d exhaust_pos=%d " + "preserve_hops=%d", + cfg.position_dedup_enabled, cfg.nodeinfo_direct_response, cfg.rate_limit_enabled, cfg.drop_unknown_enabled, + cfg.exhaust_hop_telemetry, cfg.exhaust_hop_position, cfg.router_preserve_hops); + TM_LOG_DEBUG("Time resolutions: pos=%us, rate=%us, unknown=%us", posTimeResolution, rateTimeResolution, + unknownTimeResolution); + +// Allocate unified cache (10 bytes/entry for all platforms) +#if TRAFFIC_MANAGEMENT_CACHE_SIZE > 0 + const uint16_t allocSize = cacheSize(); + TM_LOG_INFO("Allocating unified cache: %u entries (%u bytes)", allocSize, + static_cast(allocSize * sizeof(UnifiedCacheEntry))); + +#if defined(ARCH_ESP32) && defined(BOARD_HAS_PSRAM) + // ESP32 with PSRAM: prefer PSRAM for large allocations + cache = static_cast(ps_calloc(allocSize, sizeof(UnifiedCacheEntry))); + if (cache) { + cacheFromPsram = true; + } else { + TM_LOG_WARN("PSRAM allocation failed, falling back to heap"); + cache = new UnifiedCacheEntry[allocSize](); + } +#else + // All other platforms: heap allocation + cache = new UnifiedCacheEntry[allocSize](); +#endif + +#endif // TRAFFIC_MANAGEMENT_CACHE_SIZE > 0 + +#if defined(ARCH_ESP32) && defined(BOARD_HAS_PSRAM) + TM_LOG_INFO("Allocating NodeInfo cache: target=%u occupancy=%u%% payload=%u bytes (PSRAM) tags=%u bytes (%u-bit, %u slots, " + "%u buckets x %u)", + static_cast(nodeInfoTargetEntries()), static_cast(nodeInfoTargetOccupancyPercent()), + static_cast(nodeInfoTargetEntries() * sizeof(NodeInfoPayloadEntry)), + static_cast(nodeInfoIndexMetadataBudgetBytes()), static_cast(nodeInfoTagBits()), + static_cast(nodeInfoIndexSlots()), static_cast(nodeInfoBucketCount()), + static_cast(nodeInfoBucketSize())); + + nodeInfoIndex = static_cast(calloc(nodeInfoIndexMetadataBudgetBytes(), sizeof(uint8_t))); + if (!nodeInfoIndex) { + TM_LOG_WARN("NodeInfo index allocation failed; direct responses will fall back to NodeDB"); + } else { + nodeInfoPayload = static_cast(ps_calloc(nodeInfoTargetEntries(), sizeof(NodeInfoPayloadEntry))); + if (nodeInfoPayload) { + nodeInfoPayloadFromPsram = true; + TM_LOG_INFO("NodeInfo bucketed cuckoo cache ready"); + } else { + TM_LOG_WARN("NodeInfo PSRAM payload allocation failed; direct responses will fall back to NodeDB"); + free(nodeInfoIndex); + nodeInfoIndex = nullptr; + } + } +#else + TM_LOG_DEBUG("NodeInfo PSRAM cache not available on this target"); +#endif + + setIntervalFromNow(kMaintenanceIntervalMs); +} + +// Cache may have been allocated via ps_calloc (PSRAM, C allocator) or new[] (heap). +// Must use the matching deallocator: free() for ps_calloc, delete[] for new[]. +TrafficManagementModule::~TrafficManagementModule() +{ +#if TRAFFIC_MANAGEMENT_CACHE_SIZE > 0 + if (cache) { + // Cache may be from ps_calloc (PSRAM, C allocator) or new[] (heap). + // Use the matching deallocator for the allocation source. + if (cacheFromPsram) + free(cache); + else + delete[] cache; + cache = nullptr; + } +#endif + + if (nodeInfoPayload) { + if (nodeInfoPayloadFromPsram) + free(nodeInfoPayload); + else + delete[] nodeInfoPayload; + nodeInfoPayload = nullptr; + } + + if (nodeInfoIndex) { + free(nodeInfoIndex); + nodeInfoIndex = nullptr; + } +} + +// ============================================================================= +// Statistics +// ============================================================================= + +meshtastic_TrafficManagementStats TrafficManagementModule::getStats() const +{ + concurrency::LockGuard guard(&cacheLock); + return stats; +} + +void TrafficManagementModule::resetStats() +{ + concurrency::LockGuard guard(&cacheLock); + stats = meshtastic_TrafficManagementStats_init_zero; +} + +void TrafficManagementModule::recordRouterHopPreserved() +{ + if (!moduleConfig.has_traffic_management || !moduleConfig.traffic_management.enabled) + return; + incrementStat(&stats.router_hops_preserved); +} + +void TrafficManagementModule::incrementStat(uint32_t *field) +{ + concurrency::LockGuard guard(&cacheLock); + (*field)++; +} + +// ============================================================================= +// Cuckoo Hash Table Operations +// ============================================================================= + +/** + * Find an existing entry for the given node. + * + * Cuckoo hashing guarantees that if an entry exists, it's in one of exactly + * two locations: hash1(node) or hash2(node). This provides O(1) lookup. + * + * @param node NodeNum to search for + * @return Pointer to entry if found, nullptr otherwise + */ +TrafficManagementModule::UnifiedCacheEntry *TrafficManagementModule::findEntry(NodeNum node) +{ +#if TRAFFIC_MANAGEMENT_CACHE_SIZE == 0 + (void)node; + return nullptr; +#else + if (!cache || node == 0) + return nullptr; + + // Check primary location + uint16_t h1 = cuckooHash1(node); + if (cache[h1].node == node) + return &cache[h1]; + + // Check alternate location + uint16_t h2 = cuckooHash2(node); + if (cache[h2].node == node) + return &cache[h2]; + + return nullptr; +#endif +} + +/** + * Find or create an entry for the given node using cuckoo hashing. + * + * If the node exists, returns the existing entry. Otherwise, attempts to + * insert a new entry using cuckoo displacement: + * + * 1. Try to insert at h1(node) - if empty, done + * 2. Try to insert at h2(node) - if empty, done + * 3. Kick existing entry from h1 to its alternate location + * 4. Repeat up to kMaxCuckooKicks times + * 5. If cycle detected or max kicks exceeded, evict oldest entry + * + * @param node NodeNum to find or create + * @param isNew Set to true if a new entry was created + * @return Pointer to entry, or nullptr if allocation failed + */ +TrafficManagementModule::UnifiedCacheEntry *TrafficManagementModule::findOrCreateEntry(NodeNum node, bool *isNew) +{ +#if TRAFFIC_MANAGEMENT_CACHE_SIZE == 0 + (void)node; + if (isNew) + *isNew = false; + return nullptr; +#else + if (!cache || node == 0) { + if (isNew) + *isNew = false; + return nullptr; + } + + // Check if entry already exists (O(1) lookup) + uint16_t h1 = cuckooHash1(node); + if (cache[h1].node == node) { + if (isNew) + *isNew = false; + return &cache[h1]; + } + + uint16_t h2 = cuckooHash2(node); + if (cache[h2].node == node) { + if (isNew) + *isNew = false; + return &cache[h2]; + } + + // Entry doesn't exist - try to insert + + // Prefer empty slot at h1 + if (cache[h1].node == 0) { + memset(&cache[h1], 0, sizeof(UnifiedCacheEntry)); + cache[h1].node = node; + if (isNew) + *isNew = true; + return &cache[h1]; + } + + // Try empty slot at h2 + if (cache[h2].node == 0) { + memset(&cache[h2], 0, sizeof(UnifiedCacheEntry)); + cache[h2].node = node; + if (isNew) + *isNew = true; + return &cache[h2]; + } + + // Both slots occupied - perform cuckoo displacement + // Start by kicking entry at h1 to its alternate location + UnifiedCacheEntry displaced = cache[h1]; + memset(&cache[h1], 0, sizeof(UnifiedCacheEntry)); + cache[h1].node = node; + + for (uint8_t kicks = 0; kicks < kMaxCuckooKicks; kicks++) { + // Find alternate location for displaced entry + uint16_t altH1 = cuckooHash1(displaced.node); + uint16_t altH2 = cuckooHash2(displaced.node); + uint16_t altSlot = (altH1 == h1) ? altH2 : altH1; + + if (cache[altSlot].node == 0) { + // Found empty slot - insert displaced entry + cache[altSlot] = displaced; + if (isNew) + *isNew = true; + return &cache[h1]; + } + + // Kick entry from alternate slot + UnifiedCacheEntry temp = cache[altSlot]; + cache[altSlot] = displaced; + displaced = temp; + h1 = altSlot; + } + + // Cuckoo cycle detected or max kicks exceeded. + // The displaced entry has no valid cuckoo slot — drop it to preserve cache integrity. + // Placing it at an arbitrary slot would make it unreachable by findEntry(). + TM_LOG_DEBUG("Cuckoo cycle, evicting node 0x%08x", displaced.node); + + if (isNew) + *isNew = true; + return &cache[cuckooHash1(node)]; +#endif +} + +const TrafficManagementModule::NodeInfoPayloadEntry *TrafficManagementModule::findNodeInfoEntry(NodeNum node) const +{ +#if defined(ARCH_ESP32) && defined(BOARD_HAS_PSRAM) + if (!nodeInfoPayload || !nodeInfoIndex || node == 0) + return nullptr; + + uint16_t payloadIndex = findNodeInfoPayloadIndex(node); + if (payloadIndex >= nodeInfoTargetEntries()) + return nullptr; + + return &nodeInfoPayload[payloadIndex]; +#else + (void)node; + return nullptr; +#endif +} + +uint16_t TrafficManagementModule::encodeNodeInfoTag(uint16_t payloadIndex) const +{ + if (payloadIndex >= nodeInfoTargetEntries()) + return 0; + return static_cast(payloadIndex + 1u); +} + +uint16_t TrafficManagementModule::decodeNodeInfoPayloadIndex(uint16_t tag) const +{ + if (tag == 0 || tag > nodeInfoTargetEntries()) + return UINT16_MAX; + return static_cast(tag - 1u); +} + +uint16_t TrafficManagementModule::getNodeInfoTag(uint16_t slot) const +{ +#if defined(ARCH_ESP32) && defined(BOARD_HAS_PSRAM) + if (!nodeInfoIndex || slot >= nodeInfoIndexSlots()) + return 0; + + const uint32_t bitOffset = static_cast(slot) * nodeInfoTagBits(); + const uint16_t byteOffset = static_cast(bitOffset >> 3); + const uint8_t shift = static_cast(bitOffset & 7u); + uint32_t packed = 0; + + if (byteOffset < nodeInfoIndexMetadataBudgetBytes()) + packed |= static_cast(nodeInfoIndex[byteOffset]); + if (static_cast(byteOffset + 1u) < nodeInfoIndexMetadataBudgetBytes()) + packed |= static_cast(nodeInfoIndex[byteOffset + 1u]) << 8; + if (static_cast(byteOffset + 2u) < nodeInfoIndexMetadataBudgetBytes()) + packed |= static_cast(nodeInfoIndex[byteOffset + 2u]) << 16; + + return static_cast((packed >> shift) & nodeInfoTagMask()); +#else + (void)slot; + return 0; +#endif +} + +void TrafficManagementModule::setNodeInfoTag(uint16_t slot, uint16_t tag) +{ +#if defined(ARCH_ESP32) && defined(BOARD_HAS_PSRAM) + if (!nodeInfoIndex || slot >= nodeInfoIndexSlots()) + return; + + const uint16_t normalizedTag = static_cast(tag & nodeInfoTagMask()); + const uint32_t bitOffset = static_cast(slot) * nodeInfoTagBits(); + const uint16_t byteOffset = static_cast(bitOffset >> 3); + const uint8_t shift = static_cast(bitOffset & 7u); + uint32_t packed = 0; + + if (byteOffset < nodeInfoIndexMetadataBudgetBytes()) + packed |= static_cast(nodeInfoIndex[byteOffset]); + if (static_cast(byteOffset + 1u) < nodeInfoIndexMetadataBudgetBytes()) + packed |= static_cast(nodeInfoIndex[byteOffset + 1u]) << 8; + if (static_cast(byteOffset + 2u) < nodeInfoIndexMetadataBudgetBytes()) + packed |= static_cast(nodeInfoIndex[byteOffset + 2u]) << 16; + + const uint32_t mask = static_cast(nodeInfoTagMask()) << shift; + packed = (packed & ~mask) | ((static_cast(normalizedTag) << shift) & mask); + + if (byteOffset < nodeInfoIndexMetadataBudgetBytes()) + nodeInfoIndex[byteOffset] = static_cast(packed & 0xFFu); + if (static_cast(byteOffset + 1u) < nodeInfoIndexMetadataBudgetBytes()) + nodeInfoIndex[byteOffset + 1u] = static_cast((packed >> 8) & 0xFFu); + if (static_cast(byteOffset + 2u) < nodeInfoIndexMetadataBudgetBytes()) + nodeInfoIndex[byteOffset + 2u] = static_cast((packed >> 16) & 0xFFu); +#else + (void)slot; + (void)tag; +#endif +} + +uint16_t TrafficManagementModule::findNodeInfoPayloadIndex(NodeNum node) const +{ +#if defined(ARCH_ESP32) && defined(BOARD_HAS_PSRAM) + if (!nodeInfoPayload || !nodeInfoIndex || node == 0) + return UINT16_MAX; + + const uint16_t buckets[2] = {nodeInfoHash1(node), nodeInfoHash2(node)}; + + for (uint8_t b = 0; b < 2; b++) { + const uint16_t base = static_cast(buckets[b] * nodeInfoBucketSize()); + for (uint8_t slot = 0; slot < nodeInfoBucketSize(); slot++) { + uint16_t tag = getNodeInfoTag(static_cast(base + slot)); + if (tag == 0) + continue; + + uint16_t payloadIndex = decodeNodeInfoPayloadIndex(tag); + if (payloadIndex >= nodeInfoTargetEntries()) + continue; + + if (nodeInfoPayload[payloadIndex].node == node) + return payloadIndex; + } + } + + return UINT16_MAX; +#else + (void)node; + return UINT16_MAX; +#endif +} + +bool TrafficManagementModule::removeNodeInfoIndexEntry(NodeNum node, uint16_t payloadIndex) +{ +#if defined(ARCH_ESP32) && defined(BOARD_HAS_PSRAM) + if (!nodeInfoIndex || node == 0 || payloadIndex >= nodeInfoTargetEntries()) + return false; + + const uint16_t payloadTag = encodeNodeInfoTag(payloadIndex); + if (payloadTag == 0) + return false; + const uint16_t buckets[2] = {nodeInfoHash1(node), nodeInfoHash2(node)}; + + for (uint8_t b = 0; b < 2; b++) { + const uint16_t base = static_cast(buckets[b] * nodeInfoBucketSize()); + for (uint8_t slot = 0; slot < nodeInfoBucketSize(); slot++) { + const uint16_t indexSlot = static_cast(base + slot); + if (getNodeInfoTag(indexSlot) == payloadTag) { + setNodeInfoTag(indexSlot, 0); + return true; + } + } + } + + return false; +#else + (void)node; + (void)payloadIndex; + return false; +#endif +} + +uint16_t TrafficManagementModule::allocateNodeInfoPayloadSlot() +{ +#if defined(ARCH_ESP32) && defined(BOARD_HAS_PSRAM) + if (!nodeInfoPayload) + return UINT16_MAX; + + for (uint16_t tries = 0; tries < nodeInfoTargetEntries(); tries++) { + uint16_t idx = static_cast((nodeInfoAllocHint + tries) % nodeInfoTargetEntries()); + if (nodeInfoPayload[idx].node == 0) { + nodeInfoAllocHint = static_cast((idx + 1u) % nodeInfoTargetEntries()); + return idx; + } + } +#endif + return UINT16_MAX; +} + +uint16_t TrafficManagementModule::evictNodeInfoPayloadSlot() +{ +#if defined(ARCH_ESP32) && defined(BOARD_HAS_PSRAM) + if (!nodeInfoPayload || !nodeInfoIndex) + return UINT16_MAX; + + for (uint16_t tries = 0; tries < nodeInfoTargetEntries(); tries++) { + uint16_t idx = static_cast(nodeInfoEvictCursor % nodeInfoTargetEntries()); + nodeInfoEvictCursor = static_cast((nodeInfoEvictCursor + 1u) % nodeInfoTargetEntries()); + + NodeNum oldNode = nodeInfoPayload[idx].node; + if (oldNode == 0) + continue; + + removeNodeInfoIndexEntry(oldNode, idx); // best effort; cache tolerates occasional stale miss + nodeInfoPayload[idx].node = 0; + return idx; + } +#endif + return UINT16_MAX; +} + +bool TrafficManagementModule::tryInsertNodeInfoEntryInBucket(uint16_t bucket, uint16_t tag) +{ +#if defined(ARCH_ESP32) && defined(BOARD_HAS_PSRAM) + if (!nodeInfoIndex || !nodeInfoPayload || bucket >= nodeInfoBucketCount() || tag == 0) + return false; + + const uint16_t base = static_cast(bucket * nodeInfoBucketSize()); + for (uint8_t slot = 0; slot < nodeInfoBucketSize(); slot++) { + const uint16_t indexSlot = static_cast(base + slot); + const uint16_t existingTag = getNodeInfoTag(indexSlot); + if (existingTag == 0) { + setNodeInfoTag(indexSlot, tag); + return true; + } + + // Opportunistically reuse stale tags that point at empty/invalid payload slots. + const uint16_t payloadIndex = decodeNodeInfoPayloadIndex(existingTag); + if (payloadIndex >= nodeInfoTargetEntries() || nodeInfoPayload[payloadIndex].node == 0) { + setNodeInfoTag(indexSlot, tag); + return true; + } + } +#else + (void)bucket; + (void)tag; +#endif + return false; +} + +TrafficManagementModule::NodeInfoPayloadEntry *TrafficManagementModule::findOrCreateNodeInfoEntry(NodeNum node, + bool *usedEmptySlot) +{ + if (usedEmptySlot) + *usedEmptySlot = false; + +#if defined(ARCH_ESP32) && defined(BOARD_HAS_PSRAM) + if (!nodeInfoPayload || !nodeInfoIndex || node == 0) + return nullptr; + + uint16_t existing = findNodeInfoPayloadIndex(node); + if (existing < nodeInfoTargetEntries()) + return &nodeInfoPayload[existing]; + + const uint16_t beforeCount = countNodeInfoEntriesLocked(); + + uint16_t payloadIndex = allocateNodeInfoPayloadSlot(); + if (payloadIndex == UINT16_MAX) { + payloadIndex = evictNodeInfoPayloadSlot(); + if (payloadIndex == UINT16_MAX) + return nullptr; + } + + nodeInfoPayload[payloadIndex].node = node; + + // 4-way bucketed cuckoo insertion mirrors Cuckoo Filter practice from + // Fan et al. (CoNEXT 2014): high occupancy with short relocation chains. + uint16_t pending = encodeNodeInfoTag(payloadIndex); + uint16_t h1 = nodeInfoHash1(node); + uint16_t h2 = nodeInfoHash2(node); + + if (!tryInsertNodeInfoEntryInBucket(h1, pending) && !tryInsertNodeInfoEntryInBucket(h2, pending)) { + uint16_t currentBucket = h1; + for (uint8_t kicks = 0; kicks < kMaxCuckooKicks; kicks++) { + const uint16_t base = static_cast(currentBucket * nodeInfoBucketSize()); + const uint16_t kickSlot = static_cast((node + kicks) & (nodeInfoBucketSize() - 1u)); + const uint16_t pos = static_cast(base + kickSlot); + + uint16_t displaced = getNodeInfoTag(pos); + setNodeInfoTag(pos, pending); + pending = displaced; + + uint16_t displacedPayload = decodeNodeInfoPayloadIndex(pending); + if (displacedPayload >= nodeInfoTargetEntries()) { + pending = 0; + break; + } + + NodeNum displacedNode = nodeInfoPayload[displacedPayload].node; + if (displacedNode == 0) { + pending = 0; + break; + } + + uint16_t altH1 = nodeInfoHash1(displacedNode); + uint16_t altH2 = nodeInfoHash2(displacedNode); + uint16_t altBucket = (altH1 == currentBucket) ? altH2 : altH1; + + if (tryInsertNodeInfoEntryInBucket(altBucket, pending)) { + pending = 0; + break; + } + + currentBucket = altBucket; + } + + if (pending != 0) { + uint16_t droppedPayload = decodeNodeInfoPayloadIndex(pending); + if (droppedPayload < nodeInfoTargetEntries()) + nodeInfoPayload[droppedPayload].node = 0; + TM_LOG_DEBUG("NodeInfo bucketed cuckoo overflow, dropped payload idx=%u", + static_cast(droppedPayload < nodeInfoTargetEntries() ? droppedPayload : UINT16_MAX)); + } + } + + uint16_t finalIndex = findNodeInfoPayloadIndex(node); + if (finalIndex >= nodeInfoTargetEntries()) { + // New entry did not survive insertion chain. + if (payloadIndex < nodeInfoTargetEntries() && nodeInfoPayload[payloadIndex].node == node) + nodeInfoPayload[payloadIndex].node = 0; + return nullptr; + } + + if (usedEmptySlot) { + const uint16_t afterCount = countNodeInfoEntriesLocked(); + *usedEmptySlot = afterCount > beforeCount; + } + + return &nodeInfoPayload[finalIndex]; +#else + (void)node; + return nullptr; +#endif +} + +uint16_t TrafficManagementModule::countNodeInfoEntriesLocked() const +{ +#if defined(ARCH_ESP32) && defined(BOARD_HAS_PSRAM) + if (!nodeInfoIndex) + return 0; + + uint16_t count = 0; + for (uint16_t i = 0; i < nodeInfoIndexSlots(); i++) { + if (getNodeInfoTag(i) != 0) + count++; + } + return count; +#else + return 0; +#endif +} + +void TrafficManagementModule::cacheNodeInfoPacket(const meshtastic_MeshPacket &mp) +{ +#if defined(ARCH_ESP32) && defined(BOARD_HAS_PSRAM) + if (!nodeInfoPayload || !nodeInfoIndex || mp.decoded.payload.size == 0) + return; + + meshtastic_User user = meshtastic_User_init_zero; + if (!pb_decode_from_bytes(mp.decoded.payload.bytes, mp.decoded.payload.size, &meshtastic_User_msg, &user)) + return; + + // Normalize user.id to the packet sender's node number. + snprintf(user.id, sizeof(user.id), "!%08x", getFrom(&mp)); + + bool usedEmptySlot = false; + uint16_t cachedCount = 0; + { + concurrency::LockGuard guard(&cacheLock); + NodeInfoPayloadEntry *entry = findOrCreateNodeInfoEntry(getFrom(&mp), &usedEmptySlot); + if (!entry) + return; + + // Cache both payload and response metadata so direct replies can use + // richer context than "just the user protobuf" when PSRAM is present. + // This path is intentionally independent from NodeInfoModule/NodeDB. + entry->user = user; + entry->lastObservedMs = millis(); + entry->lastObservedRxTime = mp.rx_time; + entry->sourceChannel = mp.channel; + entry->hasDecodedBitfield = mp.decoded.has_bitfield; + entry->decodedBitfield = mp.decoded.bitfield; + + if (usedEmptySlot) + cachedCount = countNodeInfoEntriesLocked(); + } + + if (usedEmptySlot) { + TM_LOG_INFO("NodeInfo PSRAM cache entries: %u/%u target (%u packed slots, %u-bit tags, %u-byte DRAM index)", + static_cast(cachedCount), static_cast(nodeInfoTargetEntries()), + static_cast(nodeInfoIndexSlots()), static_cast(nodeInfoTagBits()), + static_cast(nodeInfoIndexMetadataBudgetBytes())); + } +#else + (void)mp; +#endif +} + +// ============================================================================= +// Epoch Management +// ============================================================================= + +/** + * Reset the timestamp epoch when relative offsets approach overflow. + * + * Called when epoch age exceeds ~19 hours (approaching 8-bit minute overflow). + * Invalidates all cached per-node traffic state. + */ +void TrafficManagementModule::resetEpoch(uint32_t nowMs) +{ +#if TRAFFIC_MANAGEMENT_CACHE_SIZE > 0 + TM_LOG_DEBUG("Resetting cache epoch"); + cacheEpochMs = nowMs; + + // Full flush avoids stale dedup identity/counters surviving epoch rollover. + memset(cache, 0, static_cast(cacheSize()) * sizeof(UnifiedCacheEntry)); +#else + (void)nowMs; +#endif +} + +// ============================================================================= +// Position Hash (Compact Mode) +// ============================================================================= + +/** + * Compute 8-bit position fingerprint from truncated lat/lon coordinates. + * + * Unlike a hash, this is deterministic: adjacent grid cells have sequential + * fingerprints, so nearby positions never collide. The fingerprint extracts + * the lower 4 significant bits from each truncated coordinate. + * + * Example with precision=16: + * lat_truncated = 0x12340000 (top 16 bits significant) + * Significant portion = 0x1234, lower 4 bits = 0x4 + * + * fingerprint = (lat_low4 << 4) | lon_low4 = 8 bits total + * + * Collision: Two positions collide only if they differ by a multiple of 16 + * grid cells in BOTH lat and lon dimensions simultaneously - very unlikely + * for typical position update patterns. + * + * @param lat_truncated Precision-truncated latitude + * @param lon_truncated Precision-truncated longitude + * @param precision Number of significant bits (1-32) + * @return 8-bit fingerprint (4 bits lat + 4 bits lon) + */ +uint8_t TrafficManagementModule::computePositionFingerprint(int32_t lat_truncated, int32_t lon_truncated, uint8_t precision) +{ + precision = sanitizePositionPrecision(precision); + + // Guard: if precision < 4, we have fewer bits to work with + // Take min(precision, 4) bits from each coordinate + uint8_t bitsToTake = (precision < 4) ? precision : 4; + + // Shift to move significant bits to bottom, then mask lower bits + // For precision=16: shift by 16 to get the 16 significant bits at bottom + uint8_t shift = 32 - precision; + uint8_t latBits = (static_cast(lat_truncated) >> shift) & ((1u << bitsToTake) - 1); + uint8_t lonBits = (static_cast(lon_truncated) >> shift) & ((1u << bitsToTake) - 1); + + return static_cast((latBits << 4) | lonBits); +} + +// ============================================================================= +// Packet Handling +// ============================================================================= + +// Processing order matters: this module runs BEFORE RoutingModule in the callModules() loop. +// - STOP prevents RoutingModule from calling sniffReceived() → perhapsRebroadcast(), +// so the packet is fully consumed (not forwarded). +// - ignoreRequest suppresses the default "no one responded" NAK for want_response packets. +// - exhaustRequested is set by alterReceived() and checked by perhapsRebroadcast() to +// force hop_limit=0 on the rebroadcast copy, allowing one final relay hop. +ProcessMessage TrafficManagementModule::handleReceived(const meshtastic_MeshPacket &mp) +{ + if (!moduleConfig.has_traffic_management || !moduleConfig.traffic_management.enabled) + return ProcessMessage::CONTINUE; + + ignoreRequest = false; + exhaustRequested = false; // Reset per-packet; may be set by alterReceived() below + exhaustRequestedFrom = 0; + exhaustRequestedId = 0; + incrementStat(&stats.packets_inspected); + + const auto &cfg = moduleConfig.traffic_management; + const uint32_t nowMs = millis(); + + // ------------------------------------------------------------------------- + // Undecoded Packet Handling + // ------------------------------------------------------------------------- + // Packets we can't decode (wrong key, corruption, etc.) may indicate + // a misbehaving node. Track and optionally drop repeat offenders. + + if (mp.which_payload_variant != meshtastic_MeshPacket_decoded_tag) { + if (cfg.drop_unknown_enabled && cfg.unknown_packet_threshold > 0) { + if (shouldDropUnknown(&mp, nowMs)) { + logAction("drop", &mp, "unknown"); + incrementStat(&stats.unknown_packet_drops); + ignoreRequest = true; // Suppress NAK for want_response packets + return ProcessMessage::STOP; // Consumed — will not be rebroadcast + } + } + return ProcessMessage::CONTINUE; + } + + // Learn NodeInfo payloads into the dedicated PSRAM cache. + if (mp.decoded.portnum == meshtastic_PortNum_NODEINFO_APP) + cacheNodeInfoPacket(mp); + + // ------------------------------------------------------------------------- + // NodeInfo Direct Response + // ------------------------------------------------------------------------- + // When we see a unicast NodeInfo request for a node we know about, + // respond directly from cache instead of forwarding the request. + // STOP prevents the request from being rebroadcast toward the target node, + // and our cached response is sent back to the requestor with hop_limit=0. + + if (cfg.nodeinfo_direct_response && mp.decoded.portnum == meshtastic_PortNum_NODEINFO_APP && mp.decoded.want_response && + !isBroadcast(mp.to) && !isToUs(&mp) && !isFromUs(&mp)) { + if (shouldRespondToNodeInfo(&mp, true)) { + meshtastic_User requester = meshtastic_User_init_zero; + if (pb_decode_from_bytes(mp.decoded.payload.bytes, mp.decoded.payload.size, &meshtastic_User_msg, &requester)) { + nodeDB->updateUser(getFrom(&mp), requester, mp.channel); + } + logAction("respond", &mp, "nodeinfo-cache"); + incrementStat(&stats.nodeinfo_cache_hits); + ignoreRequest = true; // We responded; suppress default NAK + return ProcessMessage::STOP; // Consumed — request will not be forwarded + } + } + + // ------------------------------------------------------------------------- + // Position Deduplication + // ------------------------------------------------------------------------- + // Drop position broadcasts that haven't moved significantly since the + // last broadcast from this node. Uses truncated coordinates to ignore + // GPS jitter within the configured precision. + + if (!isFromUs(&mp) && !isToUs(&mp)) { + if (cfg.position_dedup_enabled && mp.decoded.portnum == meshtastic_PortNum_POSITION_APP) { + meshtastic_Position pos = meshtastic_Position_init_zero; + if (pb_decode_from_bytes(mp.decoded.payload.bytes, mp.decoded.payload.size, &meshtastic_Position_msg, &pos)) { + if (shouldDropPosition(&mp, &pos, nowMs)) { + logAction("drop", &mp, "position-dedup"); + incrementStat(&stats.position_dedup_drops); + ignoreRequest = true; // Suppress NAK + return ProcessMessage::STOP; // Consumed — duplicate will not be rebroadcast + } + } + } + + // --------------------------------------------------------------------- + // Rate Limiting + // --------------------------------------------------------------------- + // Throttle nodes sending too many packets within a time window. + // Excludes routing and admin packets which are essential for mesh operation. + + if (cfg.rate_limit_enabled && cfg.rate_limit_window_secs > 0 && cfg.rate_limit_max_packets > 0) { + if (mp.decoded.portnum != meshtastic_PortNum_ROUTING_APP && mp.decoded.portnum != meshtastic_PortNum_ADMIN_APP) { + if (isRateLimited(mp.from, nowMs)) { + logAction("drop", &mp, "rate-limit"); + incrementStat(&stats.rate_limit_drops); + ignoreRequest = true; // Suppress NAK + return ProcessMessage::STOP; // Consumed — throttled packet will not be rebroadcast + } + } + } + } + + return ProcessMessage::CONTINUE; +} + +void TrafficManagementModule::alterReceived(meshtastic_MeshPacket &mp) +{ + if (!moduleConfig.has_traffic_management || !moduleConfig.traffic_management.enabled) + return; + + if (mp.which_payload_variant != meshtastic_MeshPacket_decoded_tag) + return; + + if (isFromUs(&mp)) + return; + + // ------------------------------------------------------------------------- + // Relayed Broadcast Hop Exhaustion + // ------------------------------------------------------------------------- + // For relayed telemetry or position broadcasts from other nodes, optionally + // set hop_limit=0 so they don't propagate further through the mesh. + + const auto &cfg = moduleConfig.traffic_management; + const bool isTelemetry = mp.decoded.portnum == meshtastic_PortNum_TELEMETRY_APP; + const bool isPosition = mp.decoded.portnum == meshtastic_PortNum_POSITION_APP; + const bool shouldExhaust = (isTelemetry && cfg.exhaust_hop_telemetry) || (isPosition && cfg.exhaust_hop_position); + + if (!shouldExhaust || !isBroadcast(mp.to)) + return; + + if (mp.hop_limit > 0) { + const char *reason = isTelemetry ? "exhaust-hop-telemetry" : "exhaust-hop-position"; + logAction("exhaust", &mp, reason); + // Adjust hop_start so downstream nodes compute correct hopsAway (hop_start - hop_limit). + // Without this, hop_limit=0 with original hop_start would show inflated hopsAway. + mp.hop_start = mp.hop_start - mp.hop_limit + 1; + mp.hop_limit = 0; + // Signal perhapsRebroadcast() to allow one final relay with hop_limit=0. + // Without this flag, perhapsRebroadcast() would skip the packet since hop_limit==0. + // The packet-scoped flag is checked in NextHopRouter::perhapsRebroadcast() + // and forces tosend->hop_limit=0, ensuring no further propagation beyond the + // next node. + exhaustRequested = true; + exhaustRequestedFrom = getFrom(&mp); + exhaustRequestedId = mp.id; + incrementStat(&stats.hop_exhausted_packets); + } +} + +// ============================================================================= +// Periodic Maintenance +// ============================================================================= + +int32_t TrafficManagementModule::runOnce() +{ + if (!moduleConfig.has_traffic_management || !moduleConfig.traffic_management.enabled) + return INT32_MAX; + +#if TRAFFIC_MANAGEMENT_CACHE_SIZE > 0 + const uint32_t nowMs = millis(); + + // Check if epoch reset needed (~3.5 hours approaching 8-bit minute overflow) + if (needsEpochReset(nowMs)) { + concurrency::LockGuard guard(&cacheLock); + resetEpoch(nowMs); + return kMaintenanceIntervalMs; + } + + // Calculate TTLs for cache expiration + const uint32_t positionIntervalMs = secsToMs(Default::getConfiguredOrDefault( + moduleConfig.traffic_management.position_min_interval_secs, default_traffic_mgmt_position_min_interval_secs)); + const uint32_t positionTtlMs = positionIntervalMs * 4; + + const uint32_t rateIntervalMs = secsToMs(moduleConfig.traffic_management.rate_limit_window_secs); + const uint32_t rateTtlMs = (rateIntervalMs > 0) ? rateIntervalMs * 2 : (10 * 60 * 1000UL); + + const uint32_t unknownTtlMs = kUnknownResetMs * 5; + + // Sweep cache and clear expired entries + uint16_t activeEntries = 0; + uint16_t expiredEntries = 0; + const uint32_t sweepStartMs = millis(); + + concurrency::LockGuard guard(&cacheLock); + for (uint16_t i = 0; i < cacheSize(); i++) { + if (cache[i].node == 0) + continue; + + bool anyValid = false; + + // Check and clear expired position data + if (cache[i].pos_time != 0) { + uint32_t posTimeMs = fromRelativePosTime(cache[i].pos_time); + if (!isWithinWindow(nowMs, posTimeMs, positionTtlMs)) { + cache[i].pos_fingerprint = 0; + cache[i].pos_time = 0; + } else { + anyValid = true; + } + } + + // Check and clear expired rate limit data + if (cache[i].rate_time != 0) { + uint32_t rateTimeMs = fromRelativeRateTime(cache[i].rate_time); + if (!isWithinWindow(nowMs, rateTimeMs, rateTtlMs)) { + cache[i].rate_count = 0; + cache[i].rate_time = 0; + } else { + anyValid = true; + } + } + + // Check and clear expired unknown tracking data + if (cache[i].unknown_time != 0) { + uint32_t unknownTimeMs = fromRelativeUnknownTime(cache[i].unknown_time); + if (!isWithinWindow(nowMs, unknownTimeMs, unknownTtlMs)) { + cache[i].unknown_count = 0; + cache[i].unknown_time = 0; + } else { + anyValid = true; + } + } + + // If all data expired, free the slot entirely + if (!anyValid) { + memset(&cache[i], 0, sizeof(UnifiedCacheEntry)); + expiredEntries++; + } else { + activeEntries++; + } + } + + TM_LOG_DEBUG("Maintenance: %u active, %u expired, %u/%u slots, %lums elapsed", activeEntries, expiredEntries, + static_cast(activeEntries), static_cast(cacheSize()), + static_cast(millis() - sweepStartMs)); + +#if defined(ARCH_ESP32) && defined(BOARD_HAS_PSRAM) + if (nodeInfoPayload && nodeInfoIndex) { + TM_LOG_DEBUG("NodeInfo PSRAM cache: %u/%u target (%u packed slots, %u buckets, %u-bit tags, %u-byte index)", + static_cast(countNodeInfoEntriesLocked()), static_cast(nodeInfoTargetEntries()), + static_cast(nodeInfoIndexSlots()), static_cast(nodeInfoBucketCount()), + static_cast(nodeInfoTagBits()), static_cast(nodeInfoIndexMetadataBudgetBytes())); + } +#endif + +#endif // TRAFFIC_MANAGEMENT_CACHE_SIZE > 0 + + return kMaintenanceIntervalMs; +} + +// ============================================================================= +// Traffic Management Logic +// ============================================================================= + +bool TrafficManagementModule::shouldDropPosition(const meshtastic_MeshPacket *p, const meshtastic_Position *pos, uint32_t nowMs) +{ +#if TRAFFIC_MANAGEMENT_CACHE_SIZE == 0 + (void)p; + (void)pos; + (void)nowMs; + return false; +#else + if (!pos->has_latitude_i || !pos->has_longitude_i) + return false; + + uint8_t precision = Default::getConfiguredOrDefault(moduleConfig.traffic_management.position_precision_bits, + default_traffic_mgmt_position_precision_bits); + precision = sanitizePositionPrecision(precision); + + const int32_t lat_truncated = truncateLatLon(pos->latitude_i, precision); + const int32_t lon_truncated = truncateLatLon(pos->longitude_i, precision); + const uint8_t fingerprint = computePositionFingerprint(lat_truncated, lon_truncated, precision); + const uint32_t minIntervalMs = secsToMs(Default::getConfiguredOrDefault( + moduleConfig.traffic_management.position_min_interval_secs, default_traffic_mgmt_position_min_interval_secs)); + + bool isNew = false; + concurrency::LockGuard guard(&cacheLock); + UnifiedCacheEntry *entry = findOrCreateEntry(p->from, &isNew); + if (!entry) + return false; + + // Compare fingerprint and check time window + // When minIntervalMs == 0, deduplication is disabled (withinInterval = false means never drop) + const bool hasPositionState = !isNew && entry->pos_time != 0; + const bool samePosition = hasPositionState && entry->pos_fingerprint == fingerprint; + const bool withinInterval = + hasPositionState && (minIntervalMs != 0) && isWithinWindow(nowMs, fromRelativePosTime(entry->pos_time), minIntervalMs); + + TM_LOG_DEBUG("Position dedup 0x%08x: fp=0x%02x prev=0x%02x same=%d within=%d new=%d", p->from, fingerprint, + entry->pos_fingerprint, samePosition, withinInterval, isNew); + + // Update cache entry + entry->pos_fingerprint = fingerprint; + entry->pos_time = toRelativePosTime(nowMs); + + // Drop only if same position AND within the minimum interval + return samePosition && withinInterval; +#endif +} + +bool TrafficManagementModule::shouldRespondToNodeInfo(const meshtastic_MeshPacket *p, bool sendResponse) +{ + // Caller already verified: nodeinfo_direct_response, portnum, want_response, + // !isBroadcast, !isToUs, !isFromUs + + if (!isMinHopsFromRequestor(p)) + return false; + + meshtastic_User cachedUser = meshtastic_User_init_zero; + bool hasCachedUser = false; + + // Extra metadata consumed only by the PSRAM-backed cache path. + // Defaults preserve previous behavior when cache metadata is unavailable. + bool cachedHasDecodedBitfield = false; + uint8_t cachedDecodedBitfield = 0; + uint8_t cachedSourceChannel = 0; + uint32_t cachedLastObservedMs = 0; + uint32_t cachedLastObservedRxTime = 0; + + { + concurrency::LockGuard guard(&cacheLock); + const NodeInfoPayloadEntry *entry = findNodeInfoEntry(p->to); + if (entry) { + cachedUser = entry->user; + hasCachedUser = true; + cachedHasDecodedBitfield = entry->hasDecodedBitfield; + cachedDecodedBitfield = entry->decodedBitfield; + cachedSourceChannel = entry->sourceChannel; + cachedLastObservedMs = entry->lastObservedMs; + cachedLastObservedRxTime = entry->lastObservedRxTime; + } + } + + if (!hasCachedUser) { + // If the PSRAM cache exists but misses, we intentionally do not fall back + // to the node-wide table. This keeps the PSRAM direct-reply path separate + // from NodeInfoModule/NodeDB behavior when PSRAM is available. + if (nodeInfoPayload && nodeInfoIndex) { + TM_LOG_DEBUG("NodeInfo PSRAM cache miss for node=0x%08x", p->to); + return false; + } + + // Fallback only when PSRAM cache is unavailable on this target. + // In this mode we use the node-wide table maintained by NodeInfoModule. + const meshtastic_NodeInfoLite *node = nodeDB->getMeshNode(p->to); + if (!node || !node->has_user) + return false; + cachedUser = TypeConversions::ConvertToUser(node->num, node->user); + } + + if (!sendResponse) + return true; + + meshtastic_MeshPacket *reply = router->allocForSending(); + if (!reply) { + TM_LOG_WARN("NodeInfo direct response dropped: no packet buffer"); + return false; + } + + reply->decoded.portnum = meshtastic_PortNum_NODEINFO_APP; + reply->decoded.payload.size = + pb_encode_to_bytes(reply->decoded.payload.bytes, sizeof(reply->decoded.payload.bytes), &meshtastic_User_msg, &cachedUser); + reply->decoded.want_response = false; + + // Start from cached bitfield metadata when available. This lets direct + // responses preserve more of the original packet semantics (PSRAM path), + // while still enforcing local policy for OK_TO_MQTT below. + if (cachedHasDecodedBitfield) + reply->decoded.bitfield = cachedDecodedBitfield; + else + reply->decoded.bitfield = 0; + + // Respect the node-wide config_ok_to_mqtt setting for direct NodeInfo replies. + // This response is spoofed from another node, so Router::perhapsEncode() + // will not auto-populate the bitfield via config_ok_to_mqtt for us. + reply->decoded.has_bitfield = true; + // Update only the OK_TO_MQTT bit; keep any other cached bits intact. + reply->decoded.bitfield &= ~BITFIELD_OK_TO_MQTT_MASK; + if (config.lora.config_ok_to_mqtt) + reply->decoded.bitfield |= BITFIELD_OK_TO_MQTT_MASK; + + if (hasCachedUser && cachedLastObservedMs != 0) { + uint32_t ageMs = millis() - cachedLastObservedMs; + TM_LOG_DEBUG("NodeInfo PSRAM hit node=0x%08x age=%lu ms src_ch=%u req_ch=%u rx_time=%lu", p->to, + static_cast(ageMs), static_cast(cachedSourceChannel), + static_cast(p->channel), static_cast(cachedLastObservedRxTime)); + } + + // Spoof the sender as the target node so the requestor sees a valid NodeInfo response. + // hop_limit=0 ensures this reply travels only one hop (direct to requestor). + reply->from = p->to; + reply->to = getFrom(p); + reply->channel = p->channel; + reply->decoded.request_id = p->id; + reply->hop_limit = 0; + // hop_start=0 is set explicitly because Router::send() only sets it for isFromUs(), + // and our spoofed from means isFromUs() is false. + reply->hop_start = 0; + reply->next_hop = nodeDB->getLastByteOfNodeNum(getFrom(p)); + reply->priority = meshtastic_MeshPacket_Priority_DEFAULT; + + service->sendToMesh(reply); + return true; +} + +bool TrafficManagementModule::isMinHopsFromRequestor(const meshtastic_MeshPacket *p) const +{ + int8_t hopsAway = getHopsAway(*p, -1); + if (hopsAway < 0) + return false; + + // Both routers and clients use maxHops logic (respond when hopsAway <= threshold) + // Role determines the maximum allowed value (enforced limit, not just default) + bool isRouter = IS_ONE_OF(config.device.role, meshtastic_Config_DeviceConfig_Role_ROUTER, + meshtastic_Config_DeviceConfig_Role_ROUTER_LATE, meshtastic_Config_DeviceConfig_Role_CLIENT_BASE); + + uint32_t roleLimit = isRouter ? kRouterDefaultMaxHops : kClientDefaultMaxHops; + uint32_t configValue = moduleConfig.traffic_management.nodeinfo_direct_response_max_hops; + + // Use config value if set, otherwise use role default, but always clamp to role limit + uint32_t maxHops = (configValue > 0) ? configValue : roleLimit; + if (maxHops > roleLimit) + maxHops = roleLimit; + + bool result = static_cast(hopsAway) <= maxHops; + TM_LOG_DEBUG("NodeInfo hops check: hopsAway=%d maxHops=%u roleLimit=%u isRouter=%d -> %s", hopsAway, maxHops, roleLimit, + isRouter, result ? "respond" : "skip"); + return result; +} + +bool TrafficManagementModule::isRateLimited(NodeNum from, uint32_t nowMs) +{ +#if TRAFFIC_MANAGEMENT_CACHE_SIZE == 0 + (void)from; + (void)nowMs; + return false; +#else + const uint32_t windowMs = secsToMs(moduleConfig.traffic_management.rate_limit_window_secs); + if (windowMs == 0 || moduleConfig.traffic_management.rate_limit_max_packets == 0) + return false; + + bool isNew = false; + concurrency::LockGuard guard(&cacheLock); + UnifiedCacheEntry *entry = findOrCreateEntry(from, &isNew); + if (!entry) + return false; + + // Check if window has expired + if (isNew || !isWithinWindow(nowMs, fromRelativeRateTime(entry->rate_time), windowMs)) { + entry->rate_time = toRelativeRateTime(nowMs); + entry->rate_count = 1; + return false; + } + + // Increment counter (saturates at 255) + saturatingIncrement(entry->rate_count); + + // Check against threshold (uint8_t max is 255, but config is uint32_t) + uint32_t threshold = moduleConfig.traffic_management.rate_limit_max_packets; + if (threshold > 255) + threshold = 255; + + bool limited = entry->rate_count > threshold; + if (limited || entry->rate_count == threshold) { + TM_LOG_DEBUG("Rate limit 0x%08x: count=%u threshold=%u -> %s", from, entry->rate_count, threshold, + limited ? "DROP" : "at-limit"); + } + return limited; +#endif +} + +bool TrafficManagementModule::shouldDropUnknown(const meshtastic_MeshPacket *p, uint32_t nowMs) +{ +#if TRAFFIC_MANAGEMENT_CACHE_SIZE == 0 + (void)p; + (void)nowMs; + return false; +#else + if (!moduleConfig.traffic_management.drop_unknown_enabled || moduleConfig.traffic_management.unknown_packet_threshold == 0) + return false; + + uint32_t windowMs = kUnknownResetMs; + if (moduleConfig.traffic_management.rate_limit_window_secs > 0) + windowMs = secsToMs(moduleConfig.traffic_management.rate_limit_window_secs); + + bool isNew = false; + concurrency::LockGuard guard(&cacheLock); + UnifiedCacheEntry *entry = findOrCreateEntry(p->from, &isNew); + if (!entry) + return false; + + // Check if window has expired + if (isNew || !isWithinWindow(nowMs, fromRelativeUnknownTime(entry->unknown_time), windowMs)) { + entry->unknown_time = toRelativeUnknownTime(nowMs); + entry->unknown_count = 0; + } + + // Increment counter (saturates at 255) + saturatingIncrement(entry->unknown_count); + + // Check against threshold + uint32_t threshold = moduleConfig.traffic_management.unknown_packet_threshold; + if (threshold > 255) + threshold = 255; + + bool drop = entry->unknown_count > threshold; + if (drop || entry->unknown_count == threshold) { + TM_LOG_DEBUG("Unknown packets 0x%08x: count=%u threshold=%u -> %s", p->from, entry->unknown_count, threshold, + drop ? "DROP" : "at-limit"); + } + return drop; +#endif +} + +void TrafficManagementModule::logAction(const char *action, const meshtastic_MeshPacket *p, const char *reason) const +{ + if (p->which_payload_variant == meshtastic_MeshPacket_decoded_tag) { + const char *name = portName(p->decoded.portnum); + if (name) { + TM_LOG_INFO("%s %s from=0x%08x to=0x%08x hop=%d/%d reason=%s", action, name, getFrom(p), p->to, p->hop_limit, + p->hop_start, reason); + } else { + TM_LOG_INFO("%s port=%d from=0x%08x to=0x%08x hop=%d/%d reason=%s", action, p->decoded.portnum, getFrom(p), p->to, + p->hop_limit, p->hop_start, reason); + } + } else { + TM_LOG_INFO("%s encrypted from=0x%08x to=0x%08x hop=%d/%d reason=%s", action, getFrom(p), p->to, p->hop_limit, + p->hop_start, reason); + } +} + +#endif diff --git a/src/modules/TrafficManagementModule.h b/src/modules/TrafficManagementModule.h new file mode 100644 index 000000000..fe3483a8e --- /dev/null +++ b/src/modules/TrafficManagementModule.h @@ -0,0 +1,434 @@ +#pragma once + +#include "MeshModule.h" +#include "concurrency/Lock.h" +#include "concurrency/OSThread.h" +#include "mesh/generated/meshtastic/mesh.pb.h" +#include "mesh/generated/meshtastic/telemetry.pb.h" + +#if HAS_TRAFFIC_MANAGEMENT + +/** + * TrafficManagementModule - Packet inspection and traffic shaping for mesh networks. + * + * This module provides: + * - Position deduplication (drop redundant position broadcasts) + * - Per-node rate limiting (throttle chatty nodes) + * - Unknown packet filtering (drop undecoded packets from repeat offenders) + * - NodeInfo direct response (answer queries from cache to reduce mesh chatter) + * - Local-only telemetry/position (exhaust hop_limit for local broadcasts) + * - Router hop preservation (maintain hop_limit for router-to-router traffic) + * + * Memory Optimization: + * Uses a unified cache with cuckoo hashing for O(1) lookups and 56% memory reduction + * compared to separate per-feature caches. Timestamps are stored as 8-bit relative + * offsets from a rolling epoch to further reduce memory footprint. + */ +class TrafficManagementModule : public MeshModule, private concurrency::OSThread +{ + public: + TrafficManagementModule(); + ~TrafficManagementModule(); + + // Singleton — no copying or moving + TrafficManagementModule(const TrafficManagementModule &) = delete; + TrafficManagementModule &operator=(const TrafficManagementModule &) = delete; + + meshtastic_TrafficManagementStats getStats() const; + void resetStats(); + void recordRouterHopPreserved(); + + /** + * Check if this packet should have its hops exhausted. + * Called from perhapsRebroadcast() to force hop_limit = 0 regardless of + * router_preserve_hops or favorite node logic. + */ + bool shouldExhaustHops(const meshtastic_MeshPacket &mp) const + { + return exhaustRequested && exhaustRequestedFrom == getFrom(&mp) && exhaustRequestedId == mp.id; + } + + protected: + ProcessMessage handleReceived(const meshtastic_MeshPacket &mp) override; + bool wantPacket(const meshtastic_MeshPacket *p) override { return true; } + void alterReceived(meshtastic_MeshPacket &mp) override; + int32_t runOnce() override; + // Protected so test shims can force epoch rollover behavior. + void resetEpoch(uint32_t nowMs); + + private: + // ========================================================================= + // Unified Cache Entry (10 bytes) - Same for ALL platforms + // ========================================================================= + // + // A single compact structure used across ESP32, NRF52, and all other platforms. + // Memory: 10 bytes × 2048 entries = 20KB + // + // Position Fingerprinting: + // Instead of storing full coordinates (8 bytes) or a computed hash, + // we store an 8-bit fingerprint derived deterministically from the + // truncated lat/lon. This extracts the lower 4 significant bits from + // each coordinate: fingerprint = (lat_low4 << 4) | lon_low4 + // + // Benefits over hash: + // - Adjacent grid cells have sequential fingerprints (no collision) + // - Two positions only collide if 16+ grid cells apart in BOTH dimensions + // - Deterministic: same input always produces same output + // + // Adaptive Timestamp Resolution: + // All timestamps use 8-bit values with adaptive resolution calculated + // from config at startup. Resolution = max(60, min(339, interval/2)). + // - Min 60 seconds ensures reasonable precision + // - Max 339 seconds allows ~24 hour range (255 * 339 = 86445 sec) + // - interval/2 ensures at least 2 ticks per configured interval + // + // Layout: + // [0-3] node - NodeNum (4 bytes) + // [4] pos_fingerprint - 4 bits lat + 4 bits lon (1 byte) + // [5] rate_count - Packets in current window (1 byte) + // [6] unknown_count - Unknown packets count (1 byte) + // [7] pos_time - Position timestamp (1 byte, adaptive resolution) + // [8] rate_time - Rate window start (1 byte, adaptive resolution) + // [9] unknown_time - Unknown tracking start (1 byte, adaptive resolution) + // + struct __attribute__((packed)) UnifiedCacheEntry { + NodeNum node; // 4 bytes - Node identifier (0 = empty slot) + uint8_t pos_fingerprint; // 1 byte - Lower 4 bits of lat + lon + uint8_t rate_count; // 1 byte - Packet count (saturates at 255) + uint8_t unknown_count; // 1 byte - Unknown packet count (saturates at 255) + uint8_t pos_time; // 1 byte - Position timestamp (adaptive resolution) + uint8_t rate_time; // 1 byte - Rate window start (adaptive resolution) + uint8_t unknown_time; // 1 byte - Unknown tracking start (adaptive resolution) + }; + static_assert(sizeof(UnifiedCacheEntry) == 10, "UnifiedCacheEntry should be 10 bytes"); + + // ========================================================================= + // Cuckoo Hash Table Implementation + // ========================================================================= + // + // Cuckoo hashing provides O(1) worst-case lookup time using two hash functions. + // Each key can be in one of two possible locations (h1 or h2). On collision, + // the existing entry is "kicked" to its alternate location. + // + // Benefits over linear scan: + // - O(1) lookup vs O(n) - critical at packet processing rates + // - O(1) insertion (amortized) with simple eviction on cycles + // - ~95% load factor achievable + // + // Cache size rounds to power-of-2 for fast modulo via bitmask. + // TRAFFIC_MANAGEMENT_CACHE_SIZE=2000 → cacheSize()=2048 + // + static constexpr uint16_t cacheSize(); + static constexpr uint16_t cacheMask(); + + // Hash functions for cuckoo hashing + inline uint16_t cuckooHash1(NodeNum node) const { return node & cacheMask(); } + inline uint16_t cuckooHash2(NodeNum node) const { return ((node * 2654435769u) >> (32 - cuckooHashBits())) & cacheMask(); } + static constexpr uint8_t cuckooHashBits(); + + // NodeInfo cache configuration (PSRAM path): + // - Payload lives in PSRAM + // - DRAM keeps packed 12-bit tags with 4-way bucketed cuckoo hashing + // (Fan et al., CoNEXT 2014). Tag value 0 is reserved as "empty". + static constexpr uint16_t kNodeInfoIndexMetadataBudgetBytes = 3072; // 3KB DRAM tag store + static constexpr uint8_t kNodeInfoTargetOccupancyPercent = 95; + static constexpr uint8_t kNodeInfoBucketSize = 4; + static constexpr uint8_t kNodeInfoTagBits = 12; + static constexpr uint16_t kNodeInfoTagMask = static_cast((1u << kNodeInfoTagBits) - 1u); + static constexpr uint16_t kNodeInfoIndexSlotsRaw = + static_cast((kNodeInfoIndexMetadataBudgetBytes * 8u) / kNodeInfoTagBits); + static constexpr uint16_t kNodeInfoIndexSlots = + static_cast(kNodeInfoIndexSlotsRaw - (kNodeInfoIndexSlotsRaw % kNodeInfoBucketSize)); + static constexpr uint16_t kNodeInfoTargetEntries = + static_cast((kNodeInfoIndexSlots * kNodeInfoTargetOccupancyPercent) / 100u); + static_assert((kNodeInfoIndexSlots % kNodeInfoBucketSize) == 0, "NodeInfo slot count must align to bucket size"); + static_assert(kNodeInfoTargetEntries < (1u << kNodeInfoTagBits), "NodeInfo tag bits must encode payload index"); + + static constexpr uint16_t nodeInfoTargetEntries(); + static constexpr uint16_t nodeInfoIndexMetadataBudgetBytes(); + static constexpr uint8_t nodeInfoTargetOccupancyPercent(); + static constexpr uint8_t nodeInfoBucketSize(); + static constexpr uint8_t nodeInfoTagBits(); + static constexpr uint16_t nodeInfoTagMask(); + static constexpr uint16_t nodeInfoIndexSlots(); + static constexpr uint16_t nodeInfoBucketCount(); + static constexpr uint16_t nodeInfoBucketMask(); + static constexpr uint8_t nodeInfoBucketHashBits(); + inline uint16_t nodeInfoHash1(NodeNum node) const { return node & nodeInfoBucketMask(); } + inline uint16_t nodeInfoHash2(NodeNum node) const + { + return ((node * 2246822519u) >> (32 - nodeInfoBucketHashBits())) & nodeInfoBucketMask(); + } + + // ========================================================================= + // Adaptive Timestamp Resolution + // ========================================================================= + // + // All timestamps use 8-bit values with adaptive resolution calculated from + // config at startup. This allows ~24 hour range while maintaining precision. + // + // Resolution formula: max(60, min(339, interval/2)) + // - 60 sec minimum ensures reasonable precision + // - 339 sec maximum allows 24 hour range (255 * 339 ≈ 86400 sec) + // - interval/2 ensures at least 2 ticks per configured interval + // + // Since config changes require reboot, resolution is calculated once. + // + uint32_t cacheEpochMs = 0; + uint16_t posTimeResolution = 60; // Seconds per tick for position + uint16_t rateTimeResolution = 60; // Seconds per tick for rate limiting + uint16_t unknownTimeResolution = 60; // Seconds per tick for unknown tracking + + // Calculate resolution from configured interval (called once at startup) + static uint16_t calcTimeResolution(uint32_t intervalSecs) + { + // Resolution = interval/2 to ensure at least 2 ticks per interval + // Clamped to [60, 339] for min precision and max 24h range + uint32_t res = (intervalSecs > 0) ? (intervalSecs / 2) : 60; + if (res < 60) + res = 60; + if (res > 339) + res = 339; + return static_cast(res); + } + + // Convert to/from 8-bit relative timestamps with given resolution + uint8_t toRelativeTime(uint32_t nowMs, uint16_t resolutionSecs) const + { + uint32_t ticks = (nowMs - cacheEpochMs) / (resolutionSecs * 1000UL); + return (ticks > UINT8_MAX) ? UINT8_MAX : static_cast(ticks); + } + uint32_t fromRelativeTime(uint8_t ticks, uint16_t resolutionSecs) const + { + return cacheEpochMs + (static_cast(ticks) * resolutionSecs * 1000UL); + } + + // Convenience wrappers for each timestamp type + uint8_t toRelativePosTime(uint32_t nowMs) const { return toRelativeTime(nowMs, posTimeResolution); } + uint32_t fromRelativePosTime(uint8_t t) const { return fromRelativeTime(t, posTimeResolution); } + + uint8_t toRelativeRateTime(uint32_t nowMs) const { return toRelativeTime(nowMs, rateTimeResolution); } + uint32_t fromRelativeRateTime(uint8_t t) const { return fromRelativeTime(t, rateTimeResolution); } + + uint8_t toRelativeUnknownTime(uint32_t nowMs) const { return toRelativeTime(nowMs, unknownTimeResolution); } + uint32_t fromRelativeUnknownTime(uint8_t t) const { return fromRelativeTime(t, unknownTimeResolution); } + + // Epoch reset when any timestamp approaches overflow + // With max resolution of 339 sec, 200 ticks = ~19 hours (safe margin for 24h max) + bool needsEpochReset(uint32_t nowMs) const + { + uint16_t maxRes = posTimeResolution; + if (rateTimeResolution > maxRes) + maxRes = rateTimeResolution; + if (unknownTimeResolution > maxRes) + maxRes = unknownTimeResolution; + return (nowMs - cacheEpochMs) > (200UL * maxRes * 1000UL); + } + // ========================================================================= + // Position Fingerprint + // ========================================================================= + // + // Computes 8-bit fingerprint from truncated lat/lon coordinates. + // Extracts lower 4 significant bits from each coordinate. + // + // fingerprint = (lat_low4 << 4) | lon_low4 + // + // Unlike a hash, adjacent grid cells have sequential fingerprints, + // so nearby positions never collide. Collisions only occur for + // positions 16+ grid cells apart in both dimensions. + // + // Guards: If precision < 4 bits, uses min(precision, 4) bits. + // + static uint8_t computePositionFingerprint(int32_t lat_truncated, int32_t lon_truncated, uint8_t precision); + + // ========================================================================= + // Cache Storage + // ========================================================================= + + mutable concurrency::Lock cacheLock; // Protects all cache access + UnifiedCacheEntry *cache = nullptr; // Cuckoo hash table (unified for all platforms) + bool cacheFromPsram = false; // Tracks allocator for correct deallocation + + struct NodeInfoPayloadEntry { + // Node identifier associated with this payload slot. + // 0 means the slot is currently unused. + NodeNum node; + + // Cached NODEINFO_APP payload body. This is separate from NodeDB and is only + // used by the PSRAM-backed direct-response path in this module. + meshtastic_User user; + + // Extra response metadata captured from the latest observed NODEINFO_APP + // packet for this node. shouldRespondToNodeInfo() uses this metadata when + // building spoofed replies for requesting clients. + + // Last local uptime tick (millis) when this entry was refreshed. + uint32_t lastObservedMs; + + // Last RTC/packet timestamp (seconds) observed for this NodeInfo frame. + // If unavailable in packet, remains 0. + uint32_t lastObservedRxTime; + + // Channel where we most recently heard this node's NodeInfo. + uint8_t sourceChannel; + + // Cached decoded bitfield metadata from the source packet. + // We preserve non-OK_TO_MQTT bits in direct replies when available. + bool hasDecodedBitfield; + uint8_t decodedBitfield; + }; + + NodeInfoPayloadEntry *nodeInfoPayload = nullptr; // NodeInfo payloads in PSRAM + bool nodeInfoPayloadFromPsram = false; // Tracks allocator for correct deallocation + uint8_t *nodeInfoIndex = nullptr; // Packed 12-bit NodeInfo tags in DRAM + uint16_t nodeInfoAllocHint = 0; + uint16_t nodeInfoEvictCursor = 0; + + meshtastic_TrafficManagementStats stats; + + // Flag set during alterReceived() when packet should be exhausted. + // Checked by perhapsRebroadcast() to force hop_limit = 0 only for the + // matching packet key (from + id). Reset at start of handleReceived(). + bool exhaustRequested = false; + NodeNum exhaustRequestedFrom = 0; + PacketId exhaustRequestedId = 0; + + // ========================================================================= + // Cache Operations + // ========================================================================= + + // Find or create entry for node using cuckoo hashing + // Returns nullptr if cache is full and eviction fails + UnifiedCacheEntry *findOrCreateEntry(NodeNum node, bool *isNew); + + // Find existing entry (no creation) + UnifiedCacheEntry *findEntry(NodeNum node); + + // NodeInfo cache operations (bucketed cuckoo index + PSRAM payloads) + const NodeInfoPayloadEntry *findNodeInfoEntry(NodeNum node) const; + NodeInfoPayloadEntry *findOrCreateNodeInfoEntry(NodeNum node, bool *usedEmptySlot); + uint16_t findNodeInfoPayloadIndex(NodeNum node) const; + bool removeNodeInfoIndexEntry(NodeNum node, uint16_t payloadIndex); + uint16_t allocateNodeInfoPayloadSlot(); + uint16_t evictNodeInfoPayloadSlot(); + bool tryInsertNodeInfoEntryInBucket(uint16_t bucket, uint16_t tag); + uint16_t encodeNodeInfoTag(uint16_t payloadIndex) const; + uint16_t decodeNodeInfoPayloadIndex(uint16_t tag) const; + uint16_t getNodeInfoTag(uint16_t slot) const; + void setNodeInfoTag(uint16_t slot, uint16_t tag); + uint16_t countNodeInfoEntriesLocked() const; + void cacheNodeInfoPacket(const meshtastic_MeshPacket &mp); + + // ========================================================================= + // Traffic Management Logic + // ========================================================================= + + bool shouldDropPosition(const meshtastic_MeshPacket *p, const meshtastic_Position *pos, uint32_t nowMs); + bool shouldRespondToNodeInfo(const meshtastic_MeshPacket *p, bool sendResponse); + bool isMinHopsFromRequestor(const meshtastic_MeshPacket *p) const; + bool isRateLimited(NodeNum from, uint32_t nowMs); + bool shouldDropUnknown(const meshtastic_MeshPacket *p, uint32_t nowMs); + + void logAction(const char *action, const meshtastic_MeshPacket *p, const char *reason) const; + void incrementStat(uint32_t *field); +}; + +// ========================================================================= +// Compile-time Cache Size Calculations +// ========================================================================= +// +// Round TRAFFIC_MANAGEMENT_CACHE_SIZE up to next power of 2 for efficient +// cuckoo hash indexing (allows bitmask instead of modulo). +// +// These use C++11-compatible constexpr (single return statement). +// + +namespace detail +{ +// Helper: round up to next power of 2 using bit manipulation +constexpr uint16_t nextPow2(uint16_t n) +{ + return n == 0 ? 0 : (((n - 1) | ((n - 1) >> 1) | ((n - 1) >> 2) | ((n - 1) >> 4) | ((n - 1) >> 8)) + 1); +} + +// Helper: floor(log2(n)) for n >= 0, C++11-compatible constexpr. +constexpr uint8_t log2Floor(uint16_t n) +{ + return n <= 1 ? 0 : static_cast(1 + log2Floor(static_cast(n >> 1))); +} + +// Helper: ceil(log2(n)) for n >= 1, C++11-compatible constexpr. +constexpr uint8_t log2Ceil(uint16_t n) +{ + return n <= 1 ? 0 : static_cast(1 + log2Floor(static_cast(n - 1))); +} +} // namespace detail + +constexpr uint16_t TrafficManagementModule::cacheSize() +{ + return detail::nextPow2(TRAFFIC_MANAGEMENT_CACHE_SIZE); +} + +constexpr uint16_t TrafficManagementModule::cacheMask() +{ + return cacheSize() > 0 ? cacheSize() - 1 : 0; +} + +constexpr uint8_t TrafficManagementModule::cuckooHashBits() +{ + return detail::log2Floor(cacheSize()); +} + +constexpr uint16_t TrafficManagementModule::nodeInfoTargetEntries() +{ + return kNodeInfoTargetEntries; +} + +constexpr uint16_t TrafficManagementModule::nodeInfoIndexMetadataBudgetBytes() +{ + return kNodeInfoIndexMetadataBudgetBytes; +} + +constexpr uint8_t TrafficManagementModule::nodeInfoTargetOccupancyPercent() +{ + return kNodeInfoTargetOccupancyPercent; +} + +constexpr uint8_t TrafficManagementModule::nodeInfoBucketSize() +{ + return kNodeInfoBucketSize; +} + +constexpr uint8_t TrafficManagementModule::nodeInfoTagBits() +{ + return kNodeInfoTagBits; +} + +constexpr uint16_t TrafficManagementModule::nodeInfoTagMask() +{ + return kNodeInfoTagMask; +} + +constexpr uint16_t TrafficManagementModule::nodeInfoIndexSlots() +{ + return kNodeInfoIndexSlots; +} + +constexpr uint16_t TrafficManagementModule::nodeInfoBucketCount() +{ + return static_cast(nodeInfoIndexSlots() / nodeInfoBucketSize()); +} + +constexpr uint16_t TrafficManagementModule::nodeInfoBucketMask() +{ + return nodeInfoBucketCount() > 0 ? nodeInfoBucketCount() - 1 : 0; +} + +constexpr uint8_t TrafficManagementModule::nodeInfoBucketHashBits() +{ + return detail::log2Floor(nodeInfoBucketCount()); +} + +extern TrafficManagementModule *trafficManagementModule; + +#endif diff --git a/test/test_traffic_management/test_main.cpp b/test/test_traffic_management/test_main.cpp new file mode 100644 index 000000000..ec54f2312 --- /dev/null +++ b/test/test_traffic_management/test_main.cpp @@ -0,0 +1,1160 @@ +#include "TestUtil.h" +#include + +#if defined(ARCH_PORTDUINO) +#define TM_TEST_ENTRY extern "C" +#else +#define TM_TEST_ENTRY +#endif + +#if HAS_TRAFFIC_MANAGEMENT + +#include "mesh/CryptoEngine.h" +#include "mesh/MeshService.h" +#include "mesh/NodeDB.h" +#include "mesh/Router.h" +#include "modules/TrafficManagementModule.h" +#include +#include +#include +#include +#include + +namespace +{ + +constexpr NodeNum kLocalNode = 0x11111111; +constexpr NodeNum kRemoteNode = 0x22222222; +constexpr NodeNum kTargetNode = 0x33333333; + +class MockNodeDB : public NodeDB +{ + public: + meshtastic_NodeInfoLite *getMeshNode(NodeNum n) override + { + if (hasCachedNode && n == cachedNodeNum) + return &cachedNode; + return NodeDB::getMeshNode(n); + } + + void clearCachedNode() + { + hasCachedNode = false; + cachedNodeNum = 0; + cachedNode = meshtastic_NodeInfoLite_init_zero; + } + + void setCachedNode(NodeNum n) + { + clearCachedNode(); + hasCachedNode = true; + cachedNodeNum = n; + cachedNode.num = n; + cachedNode.has_user = true; + } + + private: + bool hasCachedNode = false; + NodeNum cachedNodeNum = 0; + meshtastic_NodeInfoLite cachedNode = meshtastic_NodeInfoLite_init_zero; +}; + +class MockRadioInterface : public RadioInterface +{ + public: + ErrorCode send(meshtastic_MeshPacket *p) override + { + packetPool.release(p); + return ERRNO_OK; + } + + uint32_t getPacketTime(uint32_t totalPacketLen, bool received = false) override + { + (void)totalPacketLen; + (void)received; + return 0; + } +}; + +class MockRouter : public Router +{ + public: + ~MockRouter() + { + // Router allocates a global crypt lock in its constructor. + // Clean it up here so each test can build a fresh mock router. + delete cryptLock; + cryptLock = nullptr; + } + + ErrorCode send(meshtastic_MeshPacket *p) override + { + sentPackets.push_back(*p); + packetPool.release(p); + return ERRNO_OK; + } + + std::vector sentPackets; +}; + +class TrafficManagementModuleTestShim : public TrafficManagementModule +{ + public: + using TrafficManagementModule::alterReceived; + using TrafficManagementModule::handleReceived; + using TrafficManagementModule::resetEpoch; + using TrafficManagementModule::runOnce; + + bool ignoreRequestFlag() const { return ignoreRequest; } +}; + +MockNodeDB *mockNodeDB = nullptr; + +static void resetTrafficConfig() +{ + moduleConfig = meshtastic_LocalModuleConfig_init_zero; + moduleConfig.has_traffic_management = true; + moduleConfig.traffic_management = meshtastic_ModuleConfig_TrafficManagementConfig_init_zero; + moduleConfig.traffic_management.enabled = true; + + config = meshtastic_LocalConfig_init_zero; + config.device.role = meshtastic_Config_DeviceConfig_Role_CLIENT; + + myNodeInfo.my_node_num = kLocalNode; + + router = nullptr; + service = nullptr; + + mockNodeDB->resetNodes(); + mockNodeDB->clearCachedNode(); + nodeDB = mockNodeDB; +} + +static meshtastic_MeshPacket makeDecodedPacket(meshtastic_PortNum port, NodeNum from, NodeNum to = NODENUM_BROADCAST) +{ + meshtastic_MeshPacket packet = meshtastic_MeshPacket_init_zero; + packet.from = from; + packet.to = to; + packet.id = 0x1001; + packet.channel = 0; + packet.hop_start = 3; + packet.hop_limit = 3; + packet.which_payload_variant = meshtastic_MeshPacket_decoded_tag; + packet.decoded.portnum = port; + packet.decoded.has_bitfield = true; + packet.decoded.bitfield = 0; + return packet; +} + +static meshtastic_MeshPacket makeUnknownPacket(NodeNum from, NodeNum to = NODENUM_BROADCAST) +{ + meshtastic_MeshPacket packet = meshtastic_MeshPacket_init_zero; + packet.from = from; + packet.to = to; + packet.id = 0x2001; + packet.channel = 0; + packet.hop_start = 3; + packet.hop_limit = 3; + packet.which_payload_variant = meshtastic_MeshPacket_encrypted_tag; + packet.encrypted.size = 0; + return packet; +} + +static meshtastic_MeshPacket makePositionPacket(NodeNum from, int32_t lat, int32_t lon, NodeNum to = NODENUM_BROADCAST) +{ + meshtastic_MeshPacket packet = makeDecodedPacket(meshtastic_PortNum_POSITION_APP, from, to); + meshtastic_Position pos = meshtastic_Position_init_zero; + pos.has_latitude_i = true; + pos.has_longitude_i = true; + pos.latitude_i = lat; + pos.longitude_i = lon; + + packet.decoded.payload.size = + pb_encode_to_bytes(packet.decoded.payload.bytes, sizeof(packet.decoded.payload.bytes), &meshtastic_Position_msg, &pos); + return packet; +} + +static meshtastic_MeshPacket makeNodeInfoPacket(NodeNum from, const char *longName, const char *shortName) +{ + meshtastic_MeshPacket packet = makeDecodedPacket(meshtastic_PortNum_NODEINFO_APP, from, NODENUM_BROADCAST); + + meshtastic_User user = meshtastic_User_init_zero; + snprintf(user.id, sizeof(user.id), "!%08x", from); + strncpy(user.long_name, longName, sizeof(user.long_name) - 1); + strncpy(user.short_name, shortName, sizeof(user.short_name) - 1); + + packet.decoded.payload.size = + pb_encode_to_bytes(packet.decoded.payload.bytes, sizeof(packet.decoded.payload.bytes), &meshtastic_User_msg, &user); + return packet; +} + +/** + * Verify the module is a no-op when traffic management is disabled. + * Important so config toggles cannot accidentally change routing behavior. + */ +static void test_tm_moduleDisabled_doesNothing(void) +{ + moduleConfig.has_traffic_management = false; + TrafficManagementModuleTestShim module; + meshtastic_MeshPacket packet = makeDecodedPacket(meshtastic_PortNum_TEXT_MESSAGE_APP, kRemoteNode); + + ProcessMessage result = module.handleReceived(packet); + meshtastic_TrafficManagementStats stats = module.getStats(); + + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(result)); + TEST_ASSERT_EQUAL_UINT32(0, stats.packets_inspected); + TEST_ASSERT_EQUAL_UINT32(0, stats.unknown_packet_drops); + TEST_ASSERT_FALSE(module.ignoreRequestFlag()); +} + +/** + * Verify unknown-packet dropping uses N+1 threshold semantics. + * Important to catch off-by-one regressions in drop decisions. + */ +static void test_tm_unknownPackets_dropOnNPlusOne(void) +{ + moduleConfig.traffic_management.drop_unknown_enabled = true; + moduleConfig.traffic_management.unknown_packet_threshold = 2; + TrafficManagementModuleTestShim module; + meshtastic_MeshPacket packet = makeUnknownPacket(kRemoteNode); + + ProcessMessage r1 = module.handleReceived(packet); + ProcessMessage r2 = module.handleReceived(packet); + ProcessMessage r3 = module.handleReceived(packet); + meshtastic_TrafficManagementStats stats = module.getStats(); + + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(r1)); + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(r2)); + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::STOP), static_cast(r3)); + TEST_ASSERT_EQUAL_UINT32(1, stats.unknown_packet_drops); + TEST_ASSERT_EQUAL_UINT32(3, stats.packets_inspected); + TEST_ASSERT_TRUE(module.ignoreRequestFlag()); +} + +/** + * Verify duplicate position broadcasts inside the dedup window are dropped. + * Important because this is the primary airtime-saving behavior. + */ +static void test_tm_positionDedup_dropsDuplicateWithinWindow(void) +{ + moduleConfig.traffic_management.position_dedup_enabled = true; + moduleConfig.traffic_management.position_precision_bits = 16; + moduleConfig.traffic_management.position_min_interval_secs = 300; + TrafficManagementModuleTestShim module; + + meshtastic_MeshPacket first = makePositionPacket(kRemoteNode, 374221234, -1220845678); + meshtastic_MeshPacket second = makePositionPacket(kRemoteNode, 374221234, -1220845678); + + ProcessMessage r1 = module.handleReceived(first); + ProcessMessage r2 = module.handleReceived(second); + meshtastic_TrafficManagementStats stats = module.getStats(); + + TEST_ASSERT_GREATER_THAN_UINT32(0, first.decoded.payload.size); + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(r1)); + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::STOP), static_cast(r2)); + TEST_ASSERT_EQUAL_UINT32(1, stats.position_dedup_drops); + TEST_ASSERT_TRUE(module.ignoreRequestFlag()); +} + +/** + * Verify changed coordinates are forwarded even with dedup enabled. + * Important so real movement updates are never suppressed as duplicates. + */ +static void test_tm_positionDedup_allowsMovedPosition(void) +{ + moduleConfig.traffic_management.position_dedup_enabled = true; + moduleConfig.traffic_management.position_precision_bits = 16; + moduleConfig.traffic_management.position_min_interval_secs = 300; + TrafficManagementModuleTestShim module; + + meshtastic_MeshPacket first = makePositionPacket(kRemoteNode, 374221234, -1220845678); + meshtastic_MeshPacket moved = makePositionPacket(kRemoteNode, 384221234, -1210845678); + + ProcessMessage r1 = module.handleReceived(first); + ProcessMessage r2 = module.handleReceived(moved); + meshtastic_TrafficManagementStats stats = module.getStats(); + + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(r1)); + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(r2)); + TEST_ASSERT_EQUAL_UINT32(0, stats.position_dedup_drops); +} + +/** + * Verify rate limiting drops only after exceeding the configured threshold. + * Important to protect threshold semantics from off-by-one regressions. + */ +static void test_tm_rateLimit_dropsOnlyAfterThreshold(void) +{ + moduleConfig.traffic_management.rate_limit_enabled = true; + moduleConfig.traffic_management.rate_limit_window_secs = 60; + moduleConfig.traffic_management.rate_limit_max_packets = 3; + TrafficManagementModuleTestShim module; + meshtastic_MeshPacket packet = makeDecodedPacket(meshtastic_PortNum_TEXT_MESSAGE_APP, kRemoteNode); + + ProcessMessage r1 = module.handleReceived(packet); + ProcessMessage r2 = module.handleReceived(packet); + ProcessMessage r3 = module.handleReceived(packet); + ProcessMessage r4 = module.handleReceived(packet); + meshtastic_TrafficManagementStats stats = module.getStats(); + + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(r1)); + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(r2)); + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(r3)); + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::STOP), static_cast(r4)); + TEST_ASSERT_EQUAL_UINT32(1, stats.rate_limit_drops); + TEST_ASSERT_TRUE(module.ignoreRequestFlag()); +} + +/** + * Verify routing/admin traffic is exempt from rate limiting. + * Important because throttling control traffic can destabilize the mesh. + */ +static void test_tm_rateLimit_skipsRoutingAndAdminPorts(void) +{ + moduleConfig.traffic_management.rate_limit_enabled = true; + moduleConfig.traffic_management.rate_limit_window_secs = 60; + moduleConfig.traffic_management.rate_limit_max_packets = 1; + TrafficManagementModuleTestShim module; + meshtastic_MeshPacket routingPacket = makeDecodedPacket(meshtastic_PortNum_ROUTING_APP, kRemoteNode); + meshtastic_MeshPacket adminPacket = makeDecodedPacket(meshtastic_PortNum_ADMIN_APP, kRemoteNode); + + for (int i = 0; i < 4; i++) { + ProcessMessage rr = module.handleReceived(routingPacket); + ProcessMessage ar = module.handleReceived(adminPacket); + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(rr)); + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(ar)); + } + + meshtastic_TrafficManagementStats stats = module.getStats(); + TEST_ASSERT_EQUAL_UINT32(0, stats.rate_limit_drops); +} + +/** + * Verify packets sourced from this node bypass dedup and rate limiting. + * Important so local transmissions are not accidentally self-throttled. + */ +static void test_tm_fromUs_bypassesPositionAndRateFilters(void) +{ + moduleConfig.traffic_management.position_dedup_enabled = true; + moduleConfig.traffic_management.position_precision_bits = 16; + moduleConfig.traffic_management.position_min_interval_secs = 300; + moduleConfig.traffic_management.rate_limit_enabled = true; + moduleConfig.traffic_management.rate_limit_window_secs = 60; + moduleConfig.traffic_management.rate_limit_max_packets = 1; + TrafficManagementModuleTestShim module; + + meshtastic_MeshPacket positionPacket = makePositionPacket(kLocalNode, 374221234, -1220845678); + meshtastic_MeshPacket textPacket = makeDecodedPacket(meshtastic_PortNum_TEXT_MESSAGE_APP, kLocalNode); + + ProcessMessage p1 = module.handleReceived(positionPacket); + ProcessMessage p2 = module.handleReceived(positionPacket); + ProcessMessage t1 = module.handleReceived(textPacket); + ProcessMessage t2 = module.handleReceived(textPacket); + + meshtastic_TrafficManagementStats stats = module.getStats(); + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(p1)); + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(p2)); + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(t1)); + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(t2)); + TEST_ASSERT_EQUAL_UINT32(0, stats.position_dedup_drops); + TEST_ASSERT_EQUAL_UINT32(0, stats.rate_limit_drops); +} + +/** + * Verify locally addressed packets are never dropped by transit shaping. + * Important so dedup/rate limiting do not suppress end-user delivery. + */ +static void test_tm_localDestination_bypassesTransitFilters(void) +{ + moduleConfig.traffic_management.position_dedup_enabled = true; + moduleConfig.traffic_management.position_precision_bits = 16; + moduleConfig.traffic_management.position_min_interval_secs = 300; + moduleConfig.traffic_management.rate_limit_enabled = true; + moduleConfig.traffic_management.rate_limit_window_secs = 60; + moduleConfig.traffic_management.rate_limit_max_packets = 1; + TrafficManagementModuleTestShim module; + + meshtastic_MeshPacket position1 = makePositionPacket(kRemoteNode, 374221234, -1220845678, kLocalNode); + meshtastic_MeshPacket position2 = makePositionPacket(kRemoteNode, 374221234, -1220845678, kLocalNode); + meshtastic_MeshPacket text1 = makeDecodedPacket(meshtastic_PortNum_TEXT_MESSAGE_APP, kRemoteNode, kLocalNode); + meshtastic_MeshPacket text2 = makeDecodedPacket(meshtastic_PortNum_TEXT_MESSAGE_APP, kRemoteNode, kLocalNode); + + ProcessMessage p1 = module.handleReceived(position1); + ProcessMessage p2 = module.handleReceived(position2); + ProcessMessage t1 = module.handleReceived(text1); + ProcessMessage t2 = module.handleReceived(text2); + meshtastic_TrafficManagementStats stats = module.getStats(); + + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(p1)); + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(p2)); + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(t1)); + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(t2)); + TEST_ASSERT_EQUAL_UINT32(0, stats.position_dedup_drops); + TEST_ASSERT_EQUAL_UINT32(0, stats.rate_limit_drops); +} + +/** + * Verify router role clamps NodeInfo response hops to router-safe maximum. + * Important so large config values cannot widen response scope unexpectedly. + */ +static void test_tm_nodeinfo_routerClamp_skipsWhenTooManyHops(void) +{ + moduleConfig.traffic_management.nodeinfo_direct_response = true; + moduleConfig.traffic_management.nodeinfo_direct_response_max_hops = 10; + config.device.role = meshtastic_Config_DeviceConfig_Role_ROUTER; + mockNodeDB->setCachedNode(kTargetNode); + + TrafficManagementModuleTestShim module; + meshtastic_MeshPacket request = makeDecodedPacket(meshtastic_PortNum_NODEINFO_APP, kRemoteNode, kTargetNode); + request.decoded.want_response = true; + request.hop_start = 5; + request.hop_limit = 1; // 4 hops away; router clamp should cap max at 3 + + ProcessMessage result = module.handleReceived(request); + meshtastic_TrafficManagementStats stats = module.getStats(); + + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(result)); + TEST_ASSERT_EQUAL_UINT32(0, stats.nodeinfo_cache_hits); + TEST_ASSERT_FALSE(module.ignoreRequestFlag()); +} + +/** + * Verify NodeInfo direct-response success path and reply packet fields. + * Important because this path consumes the request and generates a spoofed cached reply. + */ +static void test_tm_nodeinfo_directResponse_respondsFromCache(void) +{ + moduleConfig.traffic_management.nodeinfo_direct_response = true; + moduleConfig.traffic_management.nodeinfo_direct_response_max_hops = 10; + config.device.role = meshtastic_Config_DeviceConfig_Role_CLIENT; + config.lora.config_ok_to_mqtt = true; + mockNodeDB->setCachedNode(kTargetNode); + + MockRouter mockRouter; + mockRouter.addInterface(std::unique_ptr(new MockRadioInterface())); + MeshService mockService; + router = &mockRouter; + service = &mockService; + + TrafficManagementModuleTestShim module; + meshtastic_MeshPacket request = makeDecodedPacket(meshtastic_PortNum_NODEINFO_APP, kRemoteNode, kTargetNode); + request.decoded.want_response = true; + request.id = 0x13572468; + request.hop_start = 3; + request.hop_limit = 3; // direct request (0 hops away) + + ProcessMessage result = module.handleReceived(request); + meshtastic_TrafficManagementStats stats = module.getStats(); + + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::STOP), static_cast(result)); + TEST_ASSERT_TRUE(module.ignoreRequestFlag()); + TEST_ASSERT_EQUAL_UINT32(1, stats.nodeinfo_cache_hits); + TEST_ASSERT_EQUAL_UINT32(1, static_cast(mockRouter.sentPackets.size())); + + const meshtastic_MeshPacket &reply = mockRouter.sentPackets.front(); + TEST_ASSERT_EQUAL_INT(meshtastic_PortNum_NODEINFO_APP, reply.decoded.portnum); + TEST_ASSERT_EQUAL_UINT32(kTargetNode, reply.from); + TEST_ASSERT_EQUAL_UINT32(kRemoteNode, reply.to); + TEST_ASSERT_EQUAL_UINT32(request.id, reply.decoded.request_id); + TEST_ASSERT_FALSE(reply.decoded.want_response); + TEST_ASSERT_EQUAL_UINT8(0, reply.hop_limit); + TEST_ASSERT_EQUAL_UINT8(0, reply.hop_start); + TEST_ASSERT_EQUAL_UINT8(mockNodeDB->getLastByteOfNodeNum(kRemoteNode), reply.next_hop); + TEST_ASSERT_TRUE(reply.decoded.has_bitfield); + TEST_ASSERT_EQUAL_UINT8(BITFIELD_OK_TO_MQTT_MASK, reply.decoded.bitfield); +} + +/** + * Verify cached direct replies still preserve requester NodeInfo learning. + * Important so consuming the request does not skip NodeDB refresh for observers. + */ +static void test_tm_nodeinfo_directResponse_learnsRequestorNodeInfo(void) +{ + moduleConfig.traffic_management.nodeinfo_direct_response = true; + moduleConfig.traffic_management.nodeinfo_direct_response_max_hops = 10; + config.device.role = meshtastic_Config_DeviceConfig_Role_CLIENT; + mockNodeDB->setCachedNode(kTargetNode); + + MockRouter mockRouter; + mockRouter.addInterface(std::unique_ptr(new MockRadioInterface())); + MeshService mockService; + router = &mockRouter; + service = &mockService; + + TrafficManagementModuleTestShim module; + meshtastic_MeshPacket request = makeNodeInfoPacket(kRemoteNode, "requester-long", "rq"); + request.to = kTargetNode; + request.decoded.want_response = true; + request.id = 0x01020304; + request.hop_start = 3; + request.hop_limit = 3; + + ProcessMessage result = module.handleReceived(request); + meshtastic_NodeInfoLite *requestor = mockNodeDB->getMeshNode(kRemoteNode); + + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::STOP), static_cast(result)); + TEST_ASSERT_NOT_NULL(requestor); + TEST_ASSERT_TRUE(requestor->has_user); + TEST_ASSERT_EQUAL_STRING("requester-long", requestor->user.long_name); + TEST_ASSERT_EQUAL_STRING("rq", requestor->user.short_name); + TEST_ASSERT_EQUAL_UINT8(request.channel, requestor->channel); +} + +/** + * Verify client role only answers direct (0-hop) NodeInfo requests. + * Important so clients do not answer relayed requests outside intended scope. + */ +static void test_tm_nodeinfo_clientClamp_skipsWhenNotDirect(void) +{ + moduleConfig.traffic_management.nodeinfo_direct_response = true; + moduleConfig.traffic_management.nodeinfo_direct_response_max_hops = 10; + config.device.role = meshtastic_Config_DeviceConfig_Role_CLIENT; + mockNodeDB->setCachedNode(kTargetNode); + + TrafficManagementModuleTestShim module; + meshtastic_MeshPacket request = makeDecodedPacket(meshtastic_PortNum_NODEINFO_APP, kRemoteNode, kTargetNode); + request.decoded.want_response = true; + request.hop_start = 2; + request.hop_limit = 1; // 1 hop away; clients are clamped to max 0 + + ProcessMessage result = module.handleReceived(request); + meshtastic_TrafficManagementStats stats = module.getStats(); + + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(result)); + TEST_ASSERT_EQUAL_UINT32(0, stats.nodeinfo_cache_hits); + TEST_ASSERT_FALSE(module.ignoreRequestFlag()); +} + +#if !(defined(ARCH_ESP32) && defined(BOARD_HAS_PSRAM)) +/** + * Verify non-PSRAM builds require NodeDB for direct NodeInfo responses. + * Important because fallback should only happen through node-wide data when + * the dedicated PSRAM cache does not exist. + */ +static void test_tm_nodeinfo_directResponse_withoutNodeDbEntry_skips(void) +{ + moduleConfig.traffic_management.nodeinfo_direct_response = true; + moduleConfig.traffic_management.nodeinfo_direct_response_max_hops = 10; + config.device.role = meshtastic_Config_DeviceConfig_Role_CLIENT; + mockNodeDB->clearCachedNode(); + + MockRouter mockRouter; + mockRouter.addInterface(std::unique_ptr(new MockRadioInterface())); + MeshService mockService; + router = &mockRouter; + service = &mockService; + + TrafficManagementModuleTestShim module; + meshtastic_MeshPacket request = makeDecodedPacket(meshtastic_PortNum_NODEINFO_APP, kRemoteNode, kTargetNode); + request.decoded.want_response = true; + request.hop_start = 3; + request.hop_limit = 3; + + ProcessMessage result = module.handleReceived(request); + meshtastic_TrafficManagementStats stats = module.getStats(); + + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(result)); + TEST_ASSERT_FALSE(module.ignoreRequestFlag()); + TEST_ASSERT_EQUAL_UINT32(0, stats.nodeinfo_cache_hits); + TEST_ASSERT_EQUAL_UINT32(0, static_cast(mockRouter.sentPackets.size())); +} +#endif + +#if defined(ARCH_ESP32) && defined(BOARD_HAS_PSRAM) +/** + * Verify PSRAM NodeInfo cache can answer requests without NodeDB and that + * shouldRespondToNodeInfo() uses cached bitfield metadata. + */ +static void test_tm_nodeinfo_directResponse_psramCacheRespondsAndPreservesBitfield(void) +{ + moduleConfig.traffic_management.nodeinfo_direct_response = true; + moduleConfig.traffic_management.nodeinfo_direct_response_max_hops = 10; + config.device.role = meshtastic_Config_DeviceConfig_Role_CLIENT; + config.lora.config_ok_to_mqtt = true; + mockNodeDB->clearCachedNode(); + + MockRouter mockRouter; + mockRouter.addInterface(std::unique_ptr(new MockRadioInterface())); + MeshService mockService; + router = &mockRouter; + service = &mockService; + + TrafficManagementModuleTestShim module; + + meshtastic_MeshPacket observed = makeNodeInfoPacket(kTargetNode, "target-long", "tg"); + observed.decoded.has_bitfield = true; + observed.decoded.bitfield = BITFIELD_WANT_RESPONSE_MASK; + observed.channel = 2; + observed.rx_time = 123456; + + ProcessMessage observedResult = module.handleReceived(observed); + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(observedResult)); + + meshtastic_MeshPacket request = makeDecodedPacket(meshtastic_PortNum_NODEINFO_APP, kRemoteNode, kTargetNode); + request.decoded.want_response = true; + request.id = 0x24681357; + request.channel = 1; + request.hop_start = 3; + request.hop_limit = 3; + + ProcessMessage result = module.handleReceived(request); + meshtastic_TrafficManagementStats stats = module.getStats(); + + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::STOP), static_cast(result)); + TEST_ASSERT_TRUE(module.ignoreRequestFlag()); + TEST_ASSERT_EQUAL_UINT32(1, stats.nodeinfo_cache_hits); + TEST_ASSERT_EQUAL_UINT32(1, static_cast(mockRouter.sentPackets.size())); + + const meshtastic_MeshPacket &reply = mockRouter.sentPackets.front(); + TEST_ASSERT_TRUE(reply.decoded.has_bitfield); + TEST_ASSERT_EQUAL_UINT8(static_cast(BITFIELD_WANT_RESPONSE_MASK | BITFIELD_OK_TO_MQTT_MASK), reply.decoded.bitfield); + TEST_ASSERT_EQUAL_UINT32(kTargetNode, reply.from); + TEST_ASSERT_EQUAL_UINT32(kRemoteNode, reply.to); + TEST_ASSERT_EQUAL_UINT8(request.channel, reply.channel); + TEST_ASSERT_EQUAL_UINT32(request.id, reply.decoded.request_id); +} + +/** + * Verify PSRAM cache misses do not fall back to NodeDB. + * Important so the dedicated PSRAM index stays logically separate from + * NodeInfoModule/NodeDB when PSRAM is available. + */ +static void test_tm_nodeinfo_directResponse_psramMissDoesNotFallbackToNodeDb(void) +{ + moduleConfig.traffic_management.nodeinfo_direct_response = true; + moduleConfig.traffic_management.nodeinfo_direct_response_max_hops = 10; + config.device.role = meshtastic_Config_DeviceConfig_Role_CLIENT; + mockNodeDB->setCachedNode(kTargetNode); + + MockRouter mockRouter; + mockRouter.addInterface(std::unique_ptr(new MockRadioInterface())); + MeshService mockService; + router = &mockRouter; + service = &mockService; + + TrafficManagementModuleTestShim module; + meshtastic_MeshPacket request = makeDecodedPacket(meshtastic_PortNum_NODEINFO_APP, kRemoteNode, kTargetNode); + request.decoded.want_response = true; + request.hop_start = 3; + request.hop_limit = 3; + + ProcessMessage result = module.handleReceived(request); + meshtastic_TrafficManagementStats stats = module.getStats(); + + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(result)); + TEST_ASSERT_FALSE(module.ignoreRequestFlag()); + TEST_ASSERT_EQUAL_UINT32(0, stats.nodeinfo_cache_hits); + TEST_ASSERT_EQUAL_UINT32(0, static_cast(mockRouter.sentPackets.size())); +} +#endif + +/** + * Verify relayed telemetry broadcasts are hop-exhausted when enabled. + * Important to prevent further mesh propagation while still allowing one relay step. + */ +static void test_tm_alterReceived_exhaustsRelayedTelemetryBroadcast(void) +{ + moduleConfig.traffic_management.exhaust_hop_telemetry = true; + TrafficManagementModuleTestShim module; + meshtastic_MeshPacket packet = makeDecodedPacket(meshtastic_PortNum_TELEMETRY_APP, kRemoteNode, NODENUM_BROADCAST); + packet.hop_start = 5; + packet.hop_limit = 3; + + module.alterReceived(packet); + meshtastic_TrafficManagementStats stats = module.getStats(); + + TEST_ASSERT_EQUAL_UINT8(0, packet.hop_limit); + TEST_ASSERT_EQUAL_UINT8(3, packet.hop_start); + TEST_ASSERT_TRUE(module.shouldExhaustHops(packet)); + TEST_ASSERT_EQUAL_UINT32(1, stats.hop_exhausted_packets); +} + +/** + * Verify hop exhaustion skips unicast and local-origin packets. + * Important to avoid mutating traffic that should retain normal forwarding behavior. + */ +static void test_tm_alterReceived_skipsLocalAndUnicast(void) +{ + moduleConfig.traffic_management.exhaust_hop_telemetry = true; + TrafficManagementModuleTestShim module; + + meshtastic_MeshPacket unicast = makeDecodedPacket(meshtastic_PortNum_TELEMETRY_APP, kRemoteNode, kTargetNode); + unicast.hop_start = 5; + unicast.hop_limit = 3; + module.alterReceived(unicast); + TEST_ASSERT_EQUAL_UINT8(3, unicast.hop_limit); + TEST_ASSERT_FALSE(module.shouldExhaustHops(unicast)); + + meshtastic_MeshPacket fromUs = makeDecodedPacket(meshtastic_PortNum_TELEMETRY_APP, kLocalNode, NODENUM_BROADCAST); + fromUs.hop_start = 5; + fromUs.hop_limit = 3; + module.alterReceived(fromUs); + TEST_ASSERT_EQUAL_UINT8(3, fromUs.hop_limit); + TEST_ASSERT_FALSE(module.shouldExhaustHops(fromUs)); + + meshtastic_TrafficManagementStats stats = module.getStats(); + TEST_ASSERT_EQUAL_UINT32(0, stats.hop_exhausted_packets); +} + +/** + * Verify position dedup window expires and later duplicates are allowed. + * Important so periodic identical reports can resume after cooldown. + */ +static void test_tm_positionDedup_allowsDuplicateAfterIntervalExpires(void) +{ + moduleConfig.traffic_management.position_dedup_enabled = true; + moduleConfig.traffic_management.position_precision_bits = 16; + moduleConfig.traffic_management.position_min_interval_secs = 1; + TrafficManagementModuleTestShim module; + + meshtastic_MeshPacket first = makePositionPacket(kRemoteNode, 374221234, -1220845678); + meshtastic_MeshPacket second = makePositionPacket(kRemoteNode, 374221234, -1220845678); + meshtastic_MeshPacket third = makePositionPacket(kRemoteNode, 374221234, -1220845678); + + ProcessMessage r1 = module.handleReceived(first); + ProcessMessage r2 = module.handleReceived(second); + testDelay(1200); + ProcessMessage r3 = module.handleReceived(third); + meshtastic_TrafficManagementStats stats = module.getStats(); + + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(r1)); + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::STOP), static_cast(r2)); + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(r3)); + TEST_ASSERT_EQUAL_UINT32(1, stats.position_dedup_drops); +} + +/** + * Verify interval=0 disables position deduplication. + * Important because this is an explicit configuration escape hatch. + */ +static void test_tm_positionDedup_intervalZero_neverDrops(void) +{ + moduleConfig.traffic_management.position_dedup_enabled = true; + moduleConfig.traffic_management.position_precision_bits = 16; + moduleConfig.traffic_management.position_min_interval_secs = 0; + TrafficManagementModuleTestShim module; + + meshtastic_MeshPacket first = makePositionPacket(kRemoteNode, 374221234, -1220845678); + meshtastic_MeshPacket second = makePositionPacket(kRemoteNode, 374221234, -1220845678); + + ProcessMessage r1 = module.handleReceived(first); + ProcessMessage r2 = module.handleReceived(second); + meshtastic_TrafficManagementStats stats = module.getStats(); + + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(r1)); + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(r2)); + TEST_ASSERT_EQUAL_UINT32(0, stats.position_dedup_drops); +} + +/** + * Verify precision values above 32 fall back to default precision. + * Important so invalid config uses the documented default behavior. + */ +static void test_tm_positionDedup_precisionAbove32_usesDefaultPrecision(void) +{ + moduleConfig.traffic_management.position_dedup_enabled = true; + moduleConfig.traffic_management.position_precision_bits = 99; + moduleConfig.traffic_management.position_min_interval_secs = 300; + TrafficManagementModuleTestShim module; + + meshtastic_MeshPacket first = makePositionPacket(kRemoteNode, 374221234, -1220845678); + meshtastic_MeshPacket second = makePositionPacket(kRemoteNode, 384221234, -1210845678); + + ProcessMessage r1 = module.handleReceived(first); + ProcessMessage r2 = module.handleReceived(second); + meshtastic_TrafficManagementStats stats = module.getStats(); + + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(r1)); + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(r2)); + TEST_ASSERT_EQUAL_UINT32(0, stats.position_dedup_drops); +} + +/** + * Verify precision=32 does not collapse all positions to one fingerprint. + * Important to prevent false duplicate drops at the full-precision boundary. + */ +static void test_tm_positionDedup_precision32_allowsDistinctPositions(void) +{ + moduleConfig.traffic_management.position_dedup_enabled = true; + moduleConfig.traffic_management.position_precision_bits = 32; + moduleConfig.traffic_management.position_min_interval_secs = 300; + TrafficManagementModuleTestShim module; + + meshtastic_MeshPacket first = makePositionPacket(kRemoteNode, 374221234, -1220845678); + meshtastic_MeshPacket second = makePositionPacket(kRemoteNode, 374221235, -1220845677); + + ProcessMessage r1 = module.handleReceived(first); + ProcessMessage r2 = module.handleReceived(second); + meshtastic_TrafficManagementStats stats = module.getStats(); + + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(r1)); + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(r2)); + TEST_ASSERT_EQUAL_UINT32(0, stats.position_dedup_drops); +} + +/** + * Verify invalid precision=0 is treated as full precision. + * Important so invalid config does not collapse all positions into one fingerprint. + */ +static void test_tm_positionDedup_precisionZero_allowsDistinctPositions(void) +{ + moduleConfig.traffic_management.position_dedup_enabled = true; + moduleConfig.traffic_management.position_precision_bits = 0; + moduleConfig.traffic_management.position_min_interval_secs = 300; + TrafficManagementModuleTestShim module; + + meshtastic_MeshPacket first = makePositionPacket(kRemoteNode, 374221234, -1220845678); + meshtastic_MeshPacket second = makePositionPacket(kRemoteNode, 374221235, -1220845677); + + ProcessMessage r1 = module.handleReceived(first); + ProcessMessage r2 = module.handleReceived(second); + meshtastic_TrafficManagementStats stats = module.getStats(); + + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(r1)); + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(r2)); + TEST_ASSERT_EQUAL_UINT32(0, stats.position_dedup_drops); +} + +/** + * Verify epoch reset invalidates stale position identity for dedup. + * Important so reset paths cannot leak prior packet identity into new windows. + */ +static void test_tm_positionDedup_epochReset_doesNotDropFirstPacketAfterReset(void) +{ + moduleConfig.traffic_management.position_dedup_enabled = true; + moduleConfig.traffic_management.position_precision_bits = 16; + moduleConfig.traffic_management.position_min_interval_secs = 300; + TrafficManagementModuleTestShim module; + + meshtastic_MeshPacket first = makePositionPacket(kRemoteNode, 374221234, -1220845678); + meshtastic_MeshPacket afterReset = makePositionPacket(kRemoteNode, 374221234, -1220845678); + meshtastic_MeshPacket duplicate = makePositionPacket(kRemoteNode, 374221234, -1220845678); + + ProcessMessage r1 = module.handleReceived(first); + module.resetEpoch(millis()); + ProcessMessage r2 = module.handleReceived(afterReset); + ProcessMessage r3 = module.handleReceived(duplicate); + meshtastic_TrafficManagementStats stats = module.getStats(); + + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(r1)); + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(r2)); + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::STOP), static_cast(r3)); + TEST_ASSERT_EQUAL_UINT32(1, stats.position_dedup_drops); +} + +/** + * Verify non-position cache state does not make the first fingerprint-0 position look duplicated. + * Important so unified cache entries from other features cannot leak into dedup decisions. + */ +static void test_tm_positionDedup_priorRateState_doesNotDropFirstFingerprintZero(void) +{ + moduleConfig.traffic_management.position_dedup_enabled = true; + moduleConfig.traffic_management.position_precision_bits = 16; + moduleConfig.traffic_management.position_min_interval_secs = 300; + moduleConfig.traffic_management.rate_limit_enabled = true; + moduleConfig.traffic_management.rate_limit_window_secs = 60; + moduleConfig.traffic_management.rate_limit_max_packets = 10; + TrafficManagementModuleTestShim module; + + meshtastic_MeshPacket text = makeDecodedPacket(meshtastic_PortNum_TEXT_MESSAGE_APP, kRemoteNode); + meshtastic_MeshPacket first = makePositionPacket(kRemoteNode, 0x12300000, 0x45600000); + meshtastic_MeshPacket duplicate = makePositionPacket(kRemoteNode, 0x12300000, 0x45600000); + + ProcessMessage seeded = module.handleReceived(text); + ProcessMessage r1 = module.handleReceived(first); + ProcessMessage r2 = module.handleReceived(duplicate); + meshtastic_TrafficManagementStats stats = module.getStats(); + + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(seeded)); + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(r1)); + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::STOP), static_cast(r2)); + TEST_ASSERT_EQUAL_UINT32(1, stats.position_dedup_drops); +} + +/** + * Verify rate-limit counters reset after the window expires. + * Important so temporary bursts do not cause persistent throttling. + */ +static void test_tm_rateLimit_resetsAfterWindowExpires(void) +{ + moduleConfig.traffic_management.rate_limit_enabled = true; + moduleConfig.traffic_management.rate_limit_window_secs = 1; + moduleConfig.traffic_management.rate_limit_max_packets = 1; + TrafficManagementModuleTestShim module; + meshtastic_MeshPacket packet = makeDecodedPacket(meshtastic_PortNum_TEXT_MESSAGE_APP, kRemoteNode); + + ProcessMessage r1 = module.handleReceived(packet); + ProcessMessage r2 = module.handleReceived(packet); + testDelay(1200); + ProcessMessage r3 = module.handleReceived(packet); + meshtastic_TrafficManagementStats stats = module.getStats(); + + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(r1)); + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::STOP), static_cast(r2)); + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(r3)); + TEST_ASSERT_EQUAL_UINT32(1, stats.rate_limit_drops); +} + +/** + * Verify rate-limit thresholds above 255 effectively clamp to 255. + * Important because counters are uint8_t and must not overflow behavior. + */ +static void test_tm_rateLimit_thresholdAbove255_clamps(void) +{ + moduleConfig.traffic_management.rate_limit_enabled = true; + moduleConfig.traffic_management.rate_limit_window_secs = 60; + moduleConfig.traffic_management.rate_limit_max_packets = 300; + TrafficManagementModuleTestShim module; + meshtastic_MeshPacket packet = makeDecodedPacket(meshtastic_PortNum_TEXT_MESSAGE_APP, kRemoteNode); + + for (int i = 0; i < 255; i++) { + ProcessMessage result = module.handleReceived(packet); + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(result)); + } + ProcessMessage dropped = module.handleReceived(packet); + meshtastic_TrafficManagementStats stats = module.getStats(); + + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::STOP), static_cast(dropped)); + TEST_ASSERT_EQUAL_UINT32(1, stats.rate_limit_drops); +} + +/** + * Verify unknown-packet tracking resets after its active window expires. + * Important so old unknown traffic does not trigger delayed drops. + */ +static void test_tm_unknownPackets_resetAfterWindowExpires(void) +{ + moduleConfig.traffic_management.drop_unknown_enabled = true; + moduleConfig.traffic_management.unknown_packet_threshold = 1; + moduleConfig.traffic_management.rate_limit_window_secs = 1; + TrafficManagementModuleTestShim module; + meshtastic_MeshPacket packet = makeUnknownPacket(kRemoteNode); + + ProcessMessage r1 = module.handleReceived(packet); + ProcessMessage r2 = module.handleReceived(packet); + testDelay(1200); + ProcessMessage r3 = module.handleReceived(packet); + meshtastic_TrafficManagementStats stats = module.getStats(); + + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(r1)); + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::STOP), static_cast(r2)); + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(r3)); + TEST_ASSERT_EQUAL_UINT32(1, stats.unknown_packet_drops); +} + +/** + * Verify unknown threshold values above 255 clamp to the counter ceiling. + * Important to align config semantics with saturating counter storage. + */ +static void test_tm_unknownPackets_thresholdAbove255_clamps(void) +{ + moduleConfig.traffic_management.drop_unknown_enabled = true; + moduleConfig.traffic_management.unknown_packet_threshold = 300; + TrafficManagementModuleTestShim module; + meshtastic_MeshPacket packet = makeUnknownPacket(kRemoteNode); + + for (int i = 0; i < 255; i++) { + ProcessMessage result = module.handleReceived(packet); + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(result)); + } + ProcessMessage dropped = module.handleReceived(packet); + meshtastic_TrafficManagementStats stats = module.getStats(); + + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::STOP), static_cast(dropped)); + TEST_ASSERT_EQUAL_UINT32(1, stats.unknown_packet_drops); +} + +/** + * Verify relayed position broadcasts can also be hop-exhausted. + * Important because telemetry and position use separate exhaust flags. + */ +static void test_tm_alterReceived_exhaustsRelayedPositionBroadcast(void) +{ + moduleConfig.traffic_management.exhaust_hop_position = true; + TrafficManagementModuleTestShim module; + meshtastic_MeshPacket packet = makePositionPacket(kRemoteNode, 374221234, -1220845678, NODENUM_BROADCAST); + packet.hop_start = 5; + packet.hop_limit = 2; + + module.alterReceived(packet); + meshtastic_TrafficManagementStats stats = module.getStats(); + + TEST_ASSERT_EQUAL_UINT8(0, packet.hop_limit); + TEST_ASSERT_EQUAL_UINT8(4, packet.hop_start); + TEST_ASSERT_TRUE(module.shouldExhaustHops(packet)); + TEST_ASSERT_EQUAL_UINT32(1, stats.hop_exhausted_packets); +} + +/** + * Verify hop exhaustion ignores undecoded/encrypted packets. + * Important so we never mutate packets that were not decoded by this module. + */ +static void test_tm_alterReceived_skipsUndecodedPackets(void) +{ + moduleConfig.traffic_management.exhaust_hop_telemetry = true; + TrafficManagementModuleTestShim module; + meshtastic_MeshPacket packet = makeUnknownPacket(kRemoteNode, NODENUM_BROADCAST); + packet.hop_start = 5; + packet.hop_limit = 3; + + module.alterReceived(packet); + meshtastic_TrafficManagementStats stats = module.getStats(); + + TEST_ASSERT_EQUAL_UINT8(5, packet.hop_start); + TEST_ASSERT_EQUAL_UINT8(3, packet.hop_limit); + TEST_ASSERT_FALSE(module.shouldExhaustHops(packet)); + TEST_ASSERT_EQUAL_UINT32(0, stats.hop_exhausted_packets); +} + +/** + * Verify exhaustRequested is per-packet and resets on next handleReceived(). + * Important so a prior packet cannot leak hop-exhaust state into later packets. + */ +static void test_tm_alterReceived_resetExhaustFlagOnNextPacket(void) +{ + moduleConfig.traffic_management.exhaust_hop_telemetry = true; + TrafficManagementModuleTestShim module; + + meshtastic_MeshPacket telemetry = makeDecodedPacket(meshtastic_PortNum_TELEMETRY_APP, kRemoteNode, NODENUM_BROADCAST); + telemetry.hop_start = 5; + telemetry.hop_limit = 3; + module.alterReceived(telemetry); + TEST_ASSERT_TRUE(module.shouldExhaustHops(telemetry)); + + meshtastic_MeshPacket text = makeDecodedPacket(meshtastic_PortNum_TEXT_MESSAGE_APP, kRemoteNode); + ProcessMessage result = module.handleReceived(text); + meshtastic_TrafficManagementStats stats = module.getStats(); + + TEST_ASSERT_EQUAL_INT(static_cast(ProcessMessage::CONTINUE), static_cast(result)); + TEST_ASSERT_FALSE(module.shouldExhaustHops(telemetry)); + TEST_ASSERT_EQUAL_UINT32(1, stats.hop_exhausted_packets); +} + +/** + * Verify exhaust requests are packet-scoped (from + id). + * Important so stale state from one packet cannot influence unrelated packets + * that pass through duplicate/rebroadcast paths before handleReceived(). + */ +static void test_tm_alterReceived_exhaustFlag_isPacketScoped(void) +{ + moduleConfig.traffic_management.exhaust_hop_telemetry = true; + TrafficManagementModuleTestShim module; + + meshtastic_MeshPacket exhausted = makeDecodedPacket(meshtastic_PortNum_TELEMETRY_APP, kRemoteNode, NODENUM_BROADCAST); + exhausted.id = 0x1010; + exhausted.hop_start = 5; + exhausted.hop_limit = 3; + module.alterReceived(exhausted); + + meshtastic_MeshPacket unrelated = makeDecodedPacket(meshtastic_PortNum_TELEMETRY_APP, kTargetNode, NODENUM_BROADCAST); + unrelated.id = 0x2020; + unrelated.hop_start = 4; + unrelated.hop_limit = 0; + + TEST_ASSERT_TRUE(module.shouldExhaustHops(exhausted)); + TEST_ASSERT_FALSE(module.shouldExhaustHops(unrelated)); +} + +/** + * Verify runOnce() returns sleep-forever interval when module is disabled. + * Important to ensure the maintenance thread is effectively inert when off. + */ +static void test_tm_runOnce_disabledReturnsMaxInterval(void) +{ + moduleConfig.traffic_management.enabled = false; + TrafficManagementModuleTestShim module; + + int32_t interval = module.runOnce(); + + TEST_ASSERT_EQUAL_INT32(INT32_MAX, interval); +} + +/** + * Verify runOnce() returns the maintenance cadence when enabled. + * Important so periodic cache housekeeping continues at expected interval. + */ +static void test_tm_runOnce_enabledReturnsMaintenanceInterval(void) +{ + TrafficManagementModuleTestShim module; + + int32_t interval = module.runOnce(); + + TEST_ASSERT_EQUAL_INT32(60 * 1000, interval); +} + +} // namespace + +void setUp(void) +{ + resetTrafficConfig(); +} +void tearDown(void) {} + +TM_TEST_ENTRY void setup() +{ + delay(10); + delay(2000); + + initializeTestEnvironment(); + mockNodeDB = new MockNodeDB(); + nodeDB = mockNodeDB; + + UNITY_BEGIN(); + RUN_TEST(test_tm_moduleDisabled_doesNothing); + RUN_TEST(test_tm_unknownPackets_dropOnNPlusOne); + RUN_TEST(test_tm_positionDedup_dropsDuplicateWithinWindow); + RUN_TEST(test_tm_positionDedup_allowsMovedPosition); + RUN_TEST(test_tm_rateLimit_dropsOnlyAfterThreshold); + RUN_TEST(test_tm_rateLimit_skipsRoutingAndAdminPorts); + RUN_TEST(test_tm_fromUs_bypassesPositionAndRateFilters); + RUN_TEST(test_tm_localDestination_bypassesTransitFilters); + RUN_TEST(test_tm_nodeinfo_routerClamp_skipsWhenTooManyHops); + RUN_TEST(test_tm_nodeinfo_directResponse_respondsFromCache); + RUN_TEST(test_tm_nodeinfo_directResponse_learnsRequestorNodeInfo); + RUN_TEST(test_tm_nodeinfo_clientClamp_skipsWhenNotDirect); +#if !(defined(ARCH_ESP32) && defined(BOARD_HAS_PSRAM)) + RUN_TEST(test_tm_nodeinfo_directResponse_withoutNodeDbEntry_skips); +#endif +#if defined(ARCH_ESP32) && defined(BOARD_HAS_PSRAM) + RUN_TEST(test_tm_nodeinfo_directResponse_psramCacheRespondsAndPreservesBitfield); + RUN_TEST(test_tm_nodeinfo_directResponse_psramMissDoesNotFallbackToNodeDb); +#endif + RUN_TEST(test_tm_alterReceived_exhaustsRelayedTelemetryBroadcast); + RUN_TEST(test_tm_alterReceived_skipsLocalAndUnicast); + RUN_TEST(test_tm_positionDedup_allowsDuplicateAfterIntervalExpires); + RUN_TEST(test_tm_positionDedup_intervalZero_neverDrops); + RUN_TEST(test_tm_positionDedup_precisionAbove32_usesDefaultPrecision); + RUN_TEST(test_tm_positionDedup_precision32_allowsDistinctPositions); + RUN_TEST(test_tm_positionDedup_precisionZero_allowsDistinctPositions); + RUN_TEST(test_tm_positionDedup_epochReset_doesNotDropFirstPacketAfterReset); + RUN_TEST(test_tm_positionDedup_priorRateState_doesNotDropFirstFingerprintZero); + RUN_TEST(test_tm_rateLimit_resetsAfterWindowExpires); + RUN_TEST(test_tm_rateLimit_thresholdAbove255_clamps); + RUN_TEST(test_tm_unknownPackets_resetAfterWindowExpires); + RUN_TEST(test_tm_unknownPackets_thresholdAbove255_clamps); + RUN_TEST(test_tm_alterReceived_exhaustsRelayedPositionBroadcast); + RUN_TEST(test_tm_alterReceived_skipsUndecodedPackets); + RUN_TEST(test_tm_alterReceived_resetExhaustFlagOnNextPacket); + RUN_TEST(test_tm_alterReceived_exhaustFlag_isPacketScoped); + RUN_TEST(test_tm_runOnce_disabledReturnsMaxInterval); + RUN_TEST(test_tm_runOnce_enabledReturnsMaintenanceInterval); + exit(UNITY_END()); +} + +TM_TEST_ENTRY void loop() {} + +#else + +void setUp(void) {} +void tearDown(void) {} + +TM_TEST_ENTRY void setup() +{ + initializeTestEnvironment(); + UNITY_BEGIN(); + exit(UNITY_END()); +} + +TM_TEST_ENTRY void loop() {} + +#endif diff --git a/variants/esp32s3/heltec_v4/variant.h b/variants/esp32s3/heltec_v4/variant.h index 83443c5f3..72f55d09f 100644 --- a/variants/esp32s3/heltec_v4/variant.h +++ b/variants/esp32s3/heltec_v4/variant.h @@ -29,6 +29,14 @@ #define SX126X_DIO2_AS_RF_SWITCH #define SX126X_DIO3_TCXO_VOLTAGE 1.8 +// Enable Traffic Management Module for Heltec V4 +#ifndef HAS_TRAFFIC_MANAGEMENT +#define HAS_TRAFFIC_MANAGEMENT 1 +#endif +#ifndef TRAFFIC_MANAGEMENT_CACHE_SIZE +#define TRAFFIC_MANAGEMENT_CACHE_SIZE 2048 +#endif + // ---- GC1109 RF FRONT END CONFIGURATION ---- // The Heltec V4.2 uses a GC1109 FEM chip with integrated PA and LNA // RF path: SX1262 -> Pi attenuator -> GC1109 PA -> Antenna @@ -89,4 +97,4 @@ // Seems to be missing on this new board #define GPS_TX_PIN (38) // This is for bits going TOWARDS the CPU #define GPS_RX_PIN (39) // This is for bits going TOWARDS the GPS -#define GPS_THREAD_INTERVAL 50 \ No newline at end of file +#define GPS_THREAD_INTERVAL 50 diff --git a/variants/esp32s3/station-g2/variant.h b/variants/esp32s3/station-g2/variant.h index 8f0b4b220..2d65a042c 100644 --- a/variants/esp32s3/station-g2/variant.h +++ b/variants/esp32s3/station-g2/variant.h @@ -40,6 +40,14 @@ Board Information: https://wiki.uniteng.com/en/meshtastic/station-g2 #define SX126X_MAX_POWER 19 #endif +// Enable Traffic Management Module for Station G2 +#ifndef HAS_TRAFFIC_MANAGEMENT +#define HAS_TRAFFIC_MANAGEMENT 1 +#endif +#ifndef TRAFFIC_MANAGEMENT_CACHE_SIZE +#define TRAFFIC_MANAGEMENT_CACHE_SIZE 2048 +#endif + /* #define BATTERY_PIN 4 // A battery voltage measurement pin, voltage divider connected here to measure battery voltage #define ADC_CHANNEL ADC1_GPIO4_CHANNEL diff --git a/variants/native/portduino/variant.h b/variants/native/portduino/variant.h index cba4aaedd..c23d17b8d 100644 --- a/variants/native/portduino/variant.h +++ b/variants/native/portduino/variant.h @@ -8,3 +8,11 @@ // RAK12002 RTC Module #define RV3028_RTC (uint8_t)0b1010010 + +// Enable Traffic Management Module for native/portduino +#ifndef HAS_TRAFFIC_MANAGEMENT +#define HAS_TRAFFIC_MANAGEMENT 1 +#endif +#ifndef TRAFFIC_MANAGEMENT_CACHE_SIZE +#define TRAFFIC_MANAGEMENT_CACHE_SIZE 2048 +#endif diff --git a/variants/nrf52840/tracker-t1000-e/variant.h b/variants/nrf52840/tracker-t1000-e/variant.h index b064dbc9f..de6916ac7 100644 --- a/variants/nrf52840/tracker-t1000-e/variant.h +++ b/variants/nrf52840/tracker-t1000-e/variant.h @@ -154,6 +154,15 @@ extern "C" { #define HAS_SCREEN 0 +// Enable Traffic Management Module for testing on T1000-E +// NRF52840 has 256KB RAM - 1024 entries uses ~10KB +#ifndef HAS_TRAFFIC_MANAGEMENT +#define HAS_TRAFFIC_MANAGEMENT 1 +#endif +#ifndef TRAFFIC_MANAGEMENT_CACHE_SIZE +#define TRAFFIC_MANAGEMENT_CACHE_SIZE 1024 +#endif + #ifdef __cplusplus } #endif