mirror of
https://github.com/meshtastic/python.git
synced 2025-12-31 11:57:57 -05:00
Merge pull request #584 from ianmcorvidae/improve-fixed-position
Use new fixed position admin messages and add `--remove-position` argument
This commit is contained in:
@@ -257,33 +257,41 @@ def onConnected(interface):
|
||||
if not args.export_config:
|
||||
print("Connected to radio")
|
||||
|
||||
if args.setlat or args.setlon or args.setalt:
|
||||
if args.remove_position:
|
||||
if args.dest != BROADCAST_ADDR:
|
||||
print("Setting positions of remote nodes is not supported.")
|
||||
return
|
||||
closeNow = True
|
||||
print("Removing fixed position and disabling fixed position setting")
|
||||
interface.localNode.removeFixedPosition()
|
||||
elif args.setlat or args.setlon or args.setalt:
|
||||
if args.dest != BROADCAST_ADDR:
|
||||
print("Setting latitude, longitude, and altitude of remote nodes is not supported.")
|
||||
return
|
||||
closeNow = True
|
||||
|
||||
alt = 0
|
||||
lat = 0.0
|
||||
lon = 0.0
|
||||
localConfig = interface.localNode.localConfig
|
||||
lat = 0
|
||||
lon = 0
|
||||
if args.setalt:
|
||||
alt = int(args.setalt)
|
||||
localConfig.position.fixed_position = True
|
||||
print(f"Fixing altitude at {alt} meters")
|
||||
if args.setlat:
|
||||
lat = float(args.setlat)
|
||||
localConfig.position.fixed_position = True
|
||||
try:
|
||||
lat = int(args.setlat)
|
||||
except ValueError:
|
||||
lat = float(args.setlat)
|
||||
print(f"Fixing latitude at {lat} degrees")
|
||||
if args.setlon:
|
||||
lon = float(args.setlon)
|
||||
localConfig.position.fixed_position = True
|
||||
try:
|
||||
lon = int(args.setlon)
|
||||
except ValueError:
|
||||
lon = float(args.setlon)
|
||||
print(f"Fixing longitude at {lon} degrees")
|
||||
|
||||
print("Setting device position")
|
||||
print("Setting device position and enabling fixed position setting")
|
||||
# can include lat/long/alt etc: latitude = 37.5, longitude = -122.1
|
||||
interface.sendPosition(lat, lon, alt)
|
||||
interface.localNode.writeConfig("position")
|
||||
interface.localNode.setFixedPosition(lat, lon, alt)
|
||||
elif not args.no_time:
|
||||
# We normally provide a current time to the mesh when we connect
|
||||
if interface.localNode.nodeNum in interface.nodesByNum and "position" in interface.nodesByNum[interface.localNode.nodeNum]:
|
||||
@@ -1445,12 +1453,25 @@ def initParser():
|
||||
action="store_true",
|
||||
)
|
||||
|
||||
group.add_argument("--setalt", help="Set device altitude in meters (allows use without GPS)")
|
||||
|
||||
group.add_argument("--setlat", help="Set device latitude (allows use without GPS)")
|
||||
group.add_argument(
|
||||
"--setalt",
|
||||
help="Set device altitude in meters (allows use without GPS), and enable fixed position.",
|
||||
)
|
||||
|
||||
group.add_argument(
|
||||
"--setlon", help="Set device longitude (allows use without GPS)"
|
||||
"--setlat",
|
||||
help="Set device latitude (allows use without GPS), and enable fixed position. Accepts a decimal value or an integer premultiplied by 1e7.",
|
||||
)
|
||||
|
||||
group.add_argument(
|
||||
"--setlon",
|
||||
help="Set device longitude (allows use without GPS), and enable fixed position. Accepts a decimal value or an integer premultiplied by 1e7.",
|
||||
)
|
||||
|
||||
group.add_argument(
|
||||
"--remove-position",
|
||||
help="Clear any existing fixed position and disable fixed position.",
|
||||
action="store_true",
|
||||
)
|
||||
|
||||
group.add_argument(
|
||||
|
||||
@@ -7,7 +7,7 @@ import time
|
||||
|
||||
from typing import Union
|
||||
|
||||
from meshtastic import admin_pb2, apponly_pb2, channel_pb2, localonly_pb2, portnums_pb2
|
||||
from meshtastic import admin_pb2, apponly_pb2, channel_pb2, localonly_pb2, mesh_pb2, portnums_pb2
|
||||
from meshtastic.util import (
|
||||
Timeout,
|
||||
camel_to_snake,
|
||||
@@ -655,6 +655,38 @@ class Node:
|
||||
onResponse = self.onAckNak
|
||||
return self._sendAdmin(p, onResponse=onResponse)
|
||||
|
||||
def setFixedPosition(self, lat: Union[int, float], lon: Union[int, float], alt: int):
|
||||
"""Tell the node to set fixed position to the provided value and enable the fixed position setting"""
|
||||
if self != self.iface.localNode:
|
||||
logging.error("Setting position of remote nodes is not supported.")
|
||||
return None
|
||||
|
||||
p = mesh_pb2.Position()
|
||||
if isinstance(lat, float) and lat != 0.0:
|
||||
p.latitude_i = int(lat / 1e-7)
|
||||
elif isinstance(lat, int) and lat != 0:
|
||||
p.latitude_i = lat
|
||||
|
||||
if isinstance(lon, float) and lon != 0.0:
|
||||
p.longitude_i = int(lon / 1e-7)
|
||||
elif isinstance(lon, int) and lon != 0:
|
||||
p.longitude_i = lon
|
||||
|
||||
if alt != 0:
|
||||
p.altitude = alt
|
||||
|
||||
a = admin_pb2.AdminMessage()
|
||||
a.set_fixed_position.CopyFrom(p)
|
||||
return self._sendAdmin(a)
|
||||
|
||||
def removeFixedPosition(self):
|
||||
"""Tell the node to remove the fixed position and set the fixed position setting to false"""
|
||||
p = admin_pb2.AdminMessage()
|
||||
p.remove_fixed_position = True
|
||||
logging.info(f"Telling node to remove fixed position")
|
||||
|
||||
return self._sendAdmin(p)
|
||||
|
||||
def _fixupChannels(self):
|
||||
"""Fixup indexes and add disabled channels as needed"""
|
||||
|
||||
|
||||
@@ -734,19 +734,14 @@ def test_main_setlat(capsys):
|
||||
|
||||
mocked_node = MagicMock(autospec=Node)
|
||||
|
||||
def mock_writeConfig():
|
||||
print("inside mocked writeConfig")
|
||||
|
||||
mocked_node.writeConfig.side_effect = mock_writeConfig
|
||||
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
|
||||
def mock_sendPosition(lat, lon, alt):
|
||||
print("inside mocked sendPosition")
|
||||
def mock_setFixedPosition(lat, lon, alt):
|
||||
print("inside mocked setFixedPosition")
|
||||
print(f"{lat} {lon} {alt}")
|
||||
|
||||
iface.sendPosition.side_effect = mock_sendPosition
|
||||
iface.localNode.return_value = mocked_node
|
||||
mocked_node.setFixedPosition.side_effect = mock_setFixedPosition
|
||||
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
iface.localNode = mocked_node
|
||||
|
||||
with patch("meshtastic.serial_interface.SerialInterface", return_value=iface) as mo:
|
||||
main()
|
||||
@@ -754,8 +749,7 @@ def test_main_setlat(capsys):
|
||||
assert re.search(r"Connected to radio", out, re.MULTILINE)
|
||||
assert re.search(r"Fixing latitude", out, re.MULTILINE)
|
||||
assert re.search(r"Setting device position", out, re.MULTILINE)
|
||||
assert re.search(r"inside mocked sendPosition", out, re.MULTILINE)
|
||||
# TODO: Why does this not work? assert re.search(r'inside mocked writeConfig', out, re.MULTILINE)
|
||||
assert re.search(r"inside mocked setFixedPosition", out, re.MULTILINE)
|
||||
assert err == ""
|
||||
mo.assert_called()
|
||||
|
||||
@@ -769,19 +763,14 @@ def test_main_setlon(capsys):
|
||||
|
||||
mocked_node = MagicMock(autospec=Node)
|
||||
|
||||
def mock_writeConfig():
|
||||
print("inside mocked writeConfig")
|
||||
|
||||
mocked_node.writeConfig.side_effect = mock_writeConfig
|
||||
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
|
||||
def mock_sendPosition(lat, lon, alt):
|
||||
print("inside mocked sendPosition")
|
||||
def mock_setFixedPosition(lat, lon, alt):
|
||||
print("inside mocked setFixedPosition")
|
||||
print(f"{lat} {lon} {alt}")
|
||||
|
||||
iface.sendPosition.side_effect = mock_sendPosition
|
||||
iface.localNode.return_value = mocked_node
|
||||
mocked_node.setFixedPosition.side_effect = mock_setFixedPosition
|
||||
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
iface.localNode = mocked_node
|
||||
|
||||
with patch("meshtastic.serial_interface.SerialInterface", return_value=iface) as mo:
|
||||
main()
|
||||
@@ -789,8 +778,7 @@ def test_main_setlon(capsys):
|
||||
assert re.search(r"Connected to radio", out, re.MULTILINE)
|
||||
assert re.search(r"Fixing longitude", out, re.MULTILINE)
|
||||
assert re.search(r"Setting device position", out, re.MULTILINE)
|
||||
assert re.search(r"inside mocked sendPosition", out, re.MULTILINE)
|
||||
# TODO: Why does this not work? assert re.search(r'inside mocked writeConfig', out, re.MULTILINE)
|
||||
assert re.search(r"inside mocked setFixedPosition", out, re.MULTILINE)
|
||||
assert err == ""
|
||||
mo.assert_called()
|
||||
|
||||
@@ -804,19 +792,14 @@ def test_main_setalt(capsys):
|
||||
|
||||
mocked_node = MagicMock(autospec=Node)
|
||||
|
||||
def mock_writeConfig():
|
||||
print("inside mocked writeConfig")
|
||||
|
||||
mocked_node.writeConfig.side_effect = mock_writeConfig
|
||||
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
|
||||
def mock_sendPosition(lat, lon, alt):
|
||||
print("inside mocked sendPosition")
|
||||
def mock_setFixedPosition(lat, lon, alt):
|
||||
print("inside mocked setFixedPosition")
|
||||
print(f"{lat} {lon} {alt}")
|
||||
|
||||
iface.sendPosition.side_effect = mock_sendPosition
|
||||
iface.localNode.return_value = mocked_node
|
||||
mocked_node.setFixedPosition.side_effect = mock_setFixedPosition
|
||||
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
iface.localNode = mocked_node
|
||||
|
||||
with patch("meshtastic.serial_interface.SerialInterface", return_value=iface) as mo:
|
||||
main()
|
||||
@@ -824,8 +807,7 @@ def test_main_setalt(capsys):
|
||||
assert re.search(r"Connected to radio", out, re.MULTILINE)
|
||||
assert re.search(r"Fixing altitude", out, re.MULTILINE)
|
||||
assert re.search(r"Setting device position", out, re.MULTILINE)
|
||||
assert re.search(r"inside mocked sendPosition", out, re.MULTILINE)
|
||||
# TODO: Why does this not work? assert re.search(r'inside mocked writeConfig', out, re.MULTILINE)
|
||||
assert re.search(r"inside mocked setFixedPosition", out, re.MULTILINE)
|
||||
assert err == ""
|
||||
mo.assert_called()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user