mirror of
https://github.com/meshtastic/python.git
synced 2026-01-17 20:28:01 -05:00
Merge pull request #665 from lysol/remote-admin-retry
Retry admin channel setting retrieval and add configurable timeout
This commit is contained in:
@@ -273,6 +273,12 @@ def onConnected(interface):
|
||||
try:
|
||||
args = mt_config.args
|
||||
|
||||
# convenient place to store any keyword args we pass to getNode
|
||||
getNode_kwargs = {
|
||||
"requestChannelAttempts": args.channel_fetch_attempts,
|
||||
"timeout": args.timeout
|
||||
}
|
||||
|
||||
# do not print this line if we are exporting the config
|
||||
if not args.export_config:
|
||||
print("Connected to radio")
|
||||
@@ -324,14 +330,14 @@ def onConnected(interface):
|
||||
print(f"Setting device owner to {args.set_owner}")
|
||||
else: # short name only
|
||||
print(f"Setting device owner short to {args.set_owner_short}")
|
||||
interface.getNode(args.dest, False).setOwner(long_name=args.set_owner, short_name=args.set_owner_short)
|
||||
interface.getNode(args.dest, False, **getNode_kwargs).setOwner(long_name=args.set_owner, short_name=args.set_owner_short)
|
||||
|
||||
# TODO: add to export-config and configure
|
||||
if args.set_canned_message:
|
||||
closeNow = True
|
||||
waitForAckNak = True
|
||||
print(f"Setting canned plugin message to {args.set_canned_message}")
|
||||
interface.getNode(args.dest, False).set_canned_message(
|
||||
interface.getNode(args.dest, False, **getNode_kwargs).set_canned_message(
|
||||
args.set_canned_message
|
||||
)
|
||||
|
||||
@@ -340,12 +346,12 @@ def onConnected(interface):
|
||||
closeNow = True
|
||||
waitForAckNak = True
|
||||
print(f"Setting ringtone to {args.set_ringtone}")
|
||||
interface.getNode(args.dest, False).set_ringtone(args.set_ringtone)
|
||||
interface.getNode(args.dest, False, **getNode_kwargs).set_ringtone(args.set_ringtone)
|
||||
|
||||
if args.pos_fields:
|
||||
# If --pos-fields invoked with args, set position fields
|
||||
closeNow = True
|
||||
positionConfig = interface.getNode(args.dest).localConfig.position
|
||||
positionConfig = interface.getNode(args.dest, **getNode_kwargs).localConfig.position
|
||||
allFields = 0
|
||||
|
||||
try:
|
||||
@@ -364,12 +370,12 @@ def onConnected(interface):
|
||||
print(f"Setting position fields to {allFields}")
|
||||
setPref(positionConfig, "position_flags", f"{allFields:d}")
|
||||
print("Writing modified preferences to device")
|
||||
interface.getNode(args.dest).writeConfig("position")
|
||||
interface.getNode(args.dest, **getNode_kwargs).writeConfig("position")
|
||||
|
||||
elif args.pos_fields is not None:
|
||||
# If --pos-fields invoked without args, read and display current value
|
||||
closeNow = True
|
||||
positionConfig = interface.getNode(args.dest).localConfig.position
|
||||
positionConfig = interface.getNode(args.dest, **getNode_kwargs).localConfig.position
|
||||
|
||||
fieldNames = []
|
||||
for bit in positionConfig.PositionFlags.values():
|
||||
@@ -380,57 +386,58 @@ def onConnected(interface):
|
||||
if args.set_ham:
|
||||
closeNow = True
|
||||
print(f"Setting Ham ID to {args.set_ham} and turning off encryption")
|
||||
interface.getNode(args.dest).setOwner(args.set_ham, is_licensed=True)
|
||||
interface.getNode(args.dest, **getNode_kwargs).setOwner(args.set_ham, is_licensed=True)
|
||||
# Must turn off encryption on primary channel
|
||||
interface.getNode(args.dest).turnOffEncryptionOnPrimaryChannel()
|
||||
interface.getNode(args.dest, **getNode_kwargs).turnOffEncryptionOnPrimaryChannel()
|
||||
|
||||
if args.reboot:
|
||||
closeNow = True
|
||||
waitForAckNak = True
|
||||
interface.getNode(args.dest, False).reboot()
|
||||
interface.getNode(args.dest, False, **getNode_kwargs).reboot()
|
||||
|
||||
if args.reboot_ota:
|
||||
closeNow = True
|
||||
waitForAckNak = True
|
||||
interface.getNode(args.dest, False).rebootOTA()
|
||||
interface.getNode(args.dest, False, **getNode_kwargs).rebootOTA()
|
||||
|
||||
if args.enter_dfu:
|
||||
closeNow = True
|
||||
waitForAckNak = True
|
||||
interface.getNode(args.dest, False).enterDFUMode()
|
||||
interface.getNode(args.dest, False, **getNode_kwargs).enterDFUMode()
|
||||
|
||||
if args.shutdown:
|
||||
closeNow = True
|
||||
waitForAckNak = True
|
||||
interface.getNode(args.dest, False).shutdown()
|
||||
interface.getNode(args.dest, False, **getNode_kwargs).shutdown()
|
||||
|
||||
if args.device_metadata:
|
||||
closeNow = True
|
||||
interface.getNode(args.dest, False).getMetadata()
|
||||
interface.getNode(args.dest, False, **getNode_kwargs).getMetadata()
|
||||
|
||||
if args.begin_edit:
|
||||
closeNow = True
|
||||
interface.getNode(args.dest, False).beginSettingsTransaction()
|
||||
interface.getNode(args.dest, False, **getNode_kwargs).beginSettingsTransaction()
|
||||
|
||||
if args.commit_edit:
|
||||
closeNow = True
|
||||
interface.getNode(args.dest, False).commitSettingsTransaction()
|
||||
interface.getNode(args.dest, False, **getNode_kwargs).commitSettingsTransaction()
|
||||
|
||||
if args.factory_reset or args.factory_reset_device:
|
||||
closeNow = True
|
||||
waitForAckNak = True
|
||||
|
||||
full = bool(args.factory_reset_device)
|
||||
interface.getNode(args.dest, False).factoryReset(full=full)
|
||||
interface.getNode(args.dest, False, **getNode_kwargs).factoryReset(full=full)
|
||||
|
||||
if args.remove_node:
|
||||
closeNow = True
|
||||
waitForAckNak = True
|
||||
interface.getNode(args.dest, False).removeNode(args.remove_node)
|
||||
interface.getNode(args.dest, False, **getNode_kwargs).removeNode(args.remove_node)
|
||||
|
||||
if args.reset_nodedb:
|
||||
closeNow = True
|
||||
waitForAckNak = True
|
||||
interface.getNode(args.dest, False).resetNodeDb()
|
||||
interface.getNode(args.dest, False, **getNode_kwargs).resetNodeDb()
|
||||
|
||||
if args.sendtext:
|
||||
closeNow = True
|
||||
@@ -444,7 +451,7 @@ def onConnected(interface):
|
||||
args.dest,
|
||||
wantAck=True,
|
||||
channelIndex=channelIndex,
|
||||
onResponse=interface.getNode(args.dest, False).onAckNak,
|
||||
onResponse=interface.getNode(args.dest, False, **getNode_kwargs).onAckNak,
|
||||
)
|
||||
else:
|
||||
meshtastic.util.our_exit(
|
||||
@@ -535,7 +542,7 @@ def onConnected(interface):
|
||||
if args.set:
|
||||
closeNow = True
|
||||
waitForAckNak = True
|
||||
node = interface.getNode(args.dest, False)
|
||||
node = interface.getNode(args.dest, False, **getNode_kwargs)
|
||||
|
||||
# Handle the int/float/bool arguments
|
||||
pref = None
|
||||
@@ -574,19 +581,19 @@ def onConnected(interface):
|
||||
configuration = yaml.safe_load(file)
|
||||
closeNow = True
|
||||
|
||||
interface.getNode(args.dest, False).beginSettingsTransaction()
|
||||
interface.getNode(args.dest, False, **getNode_kwargs).beginSettingsTransaction()
|
||||
|
||||
if "owner" in configuration:
|
||||
print(f"Setting device owner to {configuration['owner']}")
|
||||
waitForAckNak = True
|
||||
interface.getNode(args.dest, False).setOwner(configuration["owner"])
|
||||
interface.getNode(args.dest, False, **getNode_kwargs).setOwner(configuration["owner"])
|
||||
|
||||
if "owner_short" in configuration:
|
||||
print(
|
||||
f"Setting device owner short to {configuration['owner_short']}"
|
||||
)
|
||||
waitForAckNak = True
|
||||
interface.getNode(args.dest, False).setOwner(
|
||||
interface.getNode(args.dest, False, **getNode_kwargs).setOwner(
|
||||
long_name=None, short_name=configuration["owner_short"]
|
||||
)
|
||||
|
||||
@@ -595,17 +602,17 @@ def onConnected(interface):
|
||||
f"Setting device owner short to {configuration['ownerShort']}"
|
||||
)
|
||||
waitForAckNak = True
|
||||
interface.getNode(args.dest, False).setOwner(
|
||||
interface.getNode(args.dest, False, **getNode_kwargs).setOwner(
|
||||
long_name=None, short_name=configuration["ownerShort"]
|
||||
)
|
||||
|
||||
if "channel_url" in configuration:
|
||||
print("Setting channel url to", configuration["channel_url"])
|
||||
interface.getNode(args.dest).setURL(configuration["channel_url"])
|
||||
interface.getNode(args.dest, **getNode_kwargs).setURL(configuration["channel_url"])
|
||||
|
||||
if "channelUrl" in configuration:
|
||||
print("Setting channel url to", configuration["channelUrl"])
|
||||
interface.getNode(args.dest).setURL(configuration["channelUrl"])
|
||||
interface.getNode(args.dest, **getNode_kwargs).setURL(configuration["channelUrl"])
|
||||
|
||||
if "location" in configuration:
|
||||
alt = 0
|
||||
@@ -630,28 +637,28 @@ def onConnected(interface):
|
||||
interface.localNode.writeConfig("position")
|
||||
|
||||
if "config" in configuration:
|
||||
localConfig = interface.getNode(args.dest).localConfig
|
||||
localConfig = interface.getNode(args.dest, **getNode_kwargs).localConfig
|
||||
for section in configuration["config"]:
|
||||
traverseConfig(
|
||||
section, configuration["config"][section], localConfig
|
||||
)
|
||||
interface.getNode(args.dest).writeConfig(
|
||||
interface.getNode(args.dest, **getNode_kwargs).writeConfig(
|
||||
meshtastic.util.camel_to_snake(section)
|
||||
)
|
||||
|
||||
if "module_config" in configuration:
|
||||
moduleConfig = interface.getNode(args.dest).moduleConfig
|
||||
moduleConfig = interface.getNode(args.dest, **getNode_kwargs).moduleConfig
|
||||
for section in configuration["module_config"]:
|
||||
traverseConfig(
|
||||
section,
|
||||
configuration["module_config"][section],
|
||||
moduleConfig,
|
||||
)
|
||||
interface.getNode(args.dest).writeConfig(
|
||||
interface.getNode(args.dest, **getNode_kwargs).writeConfig(
|
||||
meshtastic.util.camel_to_snake(section)
|
||||
)
|
||||
|
||||
interface.getNode(args.dest, False).commitSettingsTransaction()
|
||||
interface.getNode(args.dest, False, **getNode_kwargs).commitSettingsTransaction()
|
||||
print("Writing modified configuration to device")
|
||||
|
||||
if args.export_config:
|
||||
@@ -664,7 +671,7 @@ def onConnected(interface):
|
||||
|
||||
if args.seturl:
|
||||
closeNow = True
|
||||
interface.getNode(args.dest).setURL(args.seturl)
|
||||
interface.getNode(args.dest, **getNode_kwargs).setURL(args.seturl)
|
||||
|
||||
# handle changing channels
|
||||
|
||||
@@ -680,7 +687,7 @@ def onConnected(interface):
|
||||
meshtastic.util.our_exit(
|
||||
"Warning: Channel name must be shorter. Channel not added."
|
||||
)
|
||||
n = interface.getNode(args.dest)
|
||||
n = interface.getNode(args.dest, **getNode_kwargs)
|
||||
ch = n.getChannelByName(args.ch_add)
|
||||
if ch:
|
||||
meshtastic.util.our_exit(
|
||||
@@ -719,7 +726,7 @@ def onConnected(interface):
|
||||
)
|
||||
else:
|
||||
print(f"Deleting channel {channelIndex}")
|
||||
ch = interface.getNode(args.dest).deleteChannel(channelIndex)
|
||||
ch = interface.getNode(args.dest, **getNode_kwargs).deleteChannel(channelIndex)
|
||||
|
||||
def setSimpleConfig(modem_preset):
|
||||
"""Set one of the simple modem_config"""
|
||||
@@ -729,7 +736,7 @@ def onConnected(interface):
|
||||
"Warning: Cannot set modem preset for non-primary channel", 1
|
||||
)
|
||||
# Overwrite modem_preset
|
||||
node = interface.getNode(args.dest, False)
|
||||
node = interface.getNode(args.dest, False, **getNode_kwargs)
|
||||
if len(node.localConfig.ListFields()) == 0:
|
||||
node.requestConfig(node.localConfig.DESCRIPTOR.fields_by_name.get("lora"))
|
||||
node.localConfig.lora.modem_preset = modem_preset
|
||||
@@ -763,7 +770,7 @@ def onConnected(interface):
|
||||
channelIndex = mt_config.channel_index
|
||||
if channelIndex is None:
|
||||
meshtastic.util.our_exit("Warning: Need to specify '--ch-index'.", 1)
|
||||
node = interface.getNode(args.dest)
|
||||
node = interface.getNode(args.dest, **getNode_kwargs)
|
||||
ch = node.channels[channelIndex]
|
||||
|
||||
if args.ch_enable or args.ch_disable:
|
||||
@@ -827,12 +834,12 @@ def onConnected(interface):
|
||||
if args.get_canned_message:
|
||||
closeNow = True
|
||||
print("")
|
||||
interface.getNode(args.dest).get_canned_message()
|
||||
interface.getNode(args.dest, **getNode_kwargs).get_canned_message()
|
||||
|
||||
if args.get_ringtone:
|
||||
closeNow = True
|
||||
print("")
|
||||
interface.getNode(args.dest).get_ringtone()
|
||||
interface.getNode(args.dest, **getNode_kwargs).get_ringtone()
|
||||
|
||||
if args.info:
|
||||
print("")
|
||||
@@ -840,7 +847,7 @@ def onConnected(interface):
|
||||
if args.dest == BROADCAST_ADDR:
|
||||
interface.showInfo()
|
||||
print("")
|
||||
interface.getNode(args.dest).showInfo()
|
||||
interface.getNode(args.dest, **getNode_kwargs).showInfo()
|
||||
closeNow = True
|
||||
print("")
|
||||
pypi_version = meshtastic.util.check_if_newer_version()
|
||||
@@ -857,7 +864,7 @@ def onConnected(interface):
|
||||
|
||||
if args.get:
|
||||
closeNow = True
|
||||
node = interface.getNode(args.dest, False)
|
||||
node = interface.getNode(args.dest, False, **getNode_kwargs)
|
||||
for pref in args.get:
|
||||
found = getPref(node, pref[0])
|
||||
|
||||
@@ -873,7 +880,7 @@ def onConnected(interface):
|
||||
|
||||
if args.qr or args.qr_all:
|
||||
closeNow = True
|
||||
url = interface.getNode(args.dest, True).getURL(includeAll=args.qr_all)
|
||||
url = interface.getNode(args.dest, True, **getNode_kwargs).getURL(includeAll=args.qr_all)
|
||||
if args.qr_all:
|
||||
urldesc = "Complete URL (includes all channels)"
|
||||
else:
|
||||
@@ -928,7 +935,7 @@ def onConnected(interface):
|
||||
print(
|
||||
f"Waiting for an acknowledgment from remote node (this could take a while)"
|
||||
)
|
||||
interface.getNode(args.dest, False).iface.waitForAckNak()
|
||||
interface.getNode(args.dest, False, **getNode_kwargs).iface.waitForAckNak()
|
||||
|
||||
if args.wait_to_disconnect:
|
||||
print(f"Waiting {args.wait_to_disconnect} seconds before disconnecting")
|
||||
@@ -1408,6 +1415,20 @@ def initParser():
|
||||
action="append",
|
||||
)
|
||||
|
||||
group.add_argument(
|
||||
"--channel-fetch-attempts",
|
||||
help=("Attempt to retrieve channel settings for --ch-set this many times before giving up."),
|
||||
default=3,
|
||||
type=int,
|
||||
)
|
||||
|
||||
group.add_argument(
|
||||
"--timeout",
|
||||
help="How long to wait for replies",
|
||||
default=300,
|
||||
type=int,
|
||||
)
|
||||
|
||||
group.add_argument(
|
||||
"--ch-vlongslow",
|
||||
help="Change to the very long-range and slow channel",
|
||||
|
||||
@@ -315,19 +315,33 @@ class MeshInterface: # pylint: disable=R0902
|
||||
return table
|
||||
|
||||
def getNode(
|
||||
self, nodeId: str, requestChannels: bool = True
|
||||
self, nodeId: str, requestChannels: bool = True, requestChannelAttempts: int = 3, timeout: int = 300
|
||||
) -> meshtastic.node.Node:
|
||||
"""Return a node object which contains device settings and channel info"""
|
||||
if nodeId in (LOCAL_ADDR, BROADCAST_ADDR):
|
||||
return self.localNode
|
||||
else:
|
||||
n = meshtastic.node.Node(self, nodeId)
|
||||
n = meshtastic.node.Node(self, nodeId, timeout=timeout)
|
||||
# Only request device settings and channel info when necessary
|
||||
if requestChannels:
|
||||
logging.debug("About to requestChannels")
|
||||
n.requestChannels()
|
||||
if not n.waitForConfig():
|
||||
our_exit("Error: Timed out waiting for channels")
|
||||
retries_left = requestChannelAttempts
|
||||
last_index: int = 0
|
||||
while retries_left > 0:
|
||||
retries_left -= 1
|
||||
if not n.waitForConfig():
|
||||
new_index: int = len(n.partialChannels) if n.partialChannels else 0
|
||||
# each time we get a new channel, reset the counter
|
||||
if new_index != last_index:
|
||||
retries_left = requestChannelAttempts - 1
|
||||
if retries_left <= 0:
|
||||
our_exit(f"Error: Timed out waiting for channels, giving up")
|
||||
print("Timed out trying to retrieve channel info, retrying")
|
||||
n.requestChannels(startingIndex=new_index)
|
||||
last_index = new_index
|
||||
else:
|
||||
break
|
||||
return n
|
||||
|
||||
def sendText(
|
||||
|
||||
@@ -5,7 +5,7 @@ import base64
|
||||
import logging
|
||||
import time
|
||||
|
||||
from typing import Optional, Union
|
||||
from typing import Optional, Union, List
|
||||
|
||||
from meshtastic.protobuf import admin_pb2, apponly_pb2, channel_pb2, localonly_pb2, mesh_pb2, portnums_pb2
|
||||
from meshtastic.util import (
|
||||
@@ -25,15 +25,15 @@ class Node:
|
||||
Includes methods for localConfig, moduleConfig and channels
|
||||
"""
|
||||
|
||||
def __init__(self, iface, nodeNum, noProto=False):
|
||||
def __init__(self, iface, nodeNum, noProto=False, timeout: int = 300):
|
||||
"""Constructor"""
|
||||
self.iface = iface
|
||||
self.nodeNum = nodeNum
|
||||
self.localConfig = localonly_pb2.LocalConfig()
|
||||
self.moduleConfig = localonly_pb2.LocalModuleConfig()
|
||||
self.channels = None
|
||||
self._timeout = Timeout(maxSecs=300)
|
||||
self.partialChannels = None
|
||||
self._timeout = Timeout(maxSecs=timeout)
|
||||
self.partialChannels: Optional[List] = None
|
||||
self.noProto = noProto
|
||||
self.cannedPluginMessage = None
|
||||
self.cannedPluginMessageMessages = None
|
||||
@@ -77,13 +77,14 @@ class Node:
|
||||
self.channels = channels
|
||||
self._fixupChannels()
|
||||
|
||||
def requestChannels(self):
|
||||
def requestChannels(self, startingIndex: int = 0):
|
||||
"""Send regular MeshPackets to ask channels."""
|
||||
logging.debug(f"requestChannels for nodeNum:{self.nodeNum}")
|
||||
self.channels = None
|
||||
self.partialChannels = [] # We keep our channels in a temp array until finished
|
||||
|
||||
self._requestChannel(0)
|
||||
# only initialize if we're starting out fresh
|
||||
if startingIndex == 0:
|
||||
self.channels = None
|
||||
self.partialChannels = [] # We keep our channels in a temp array until finished
|
||||
self._requestChannel(startingIndex)
|
||||
|
||||
def onResponseRequestSettings(self, p):
|
||||
"""Handle the response packets for requesting settings _requestSettings()"""
|
||||
|
||||
@@ -173,7 +173,23 @@ def test_getNode_not_local_timeout(capsys):
|
||||
assert pytest_wrapped_e.type == SystemExit
|
||||
assert pytest_wrapped_e.value.code == 1
|
||||
out, err = capsys.readouterr()
|
||||
assert re.match(r"Error: Timed out waiting for channels", out)
|
||||
assert re.match(r"Timed out trying to retrieve channel info, retrying", out)
|
||||
assert err == ""
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.usefixtures("reset_mt_config")
|
||||
def test_getNode_not_local_timeout_attempts(capsys):
|
||||
"""Test getNode not local, simulate timeout"""
|
||||
iface = MeshInterface(noProto=True)
|
||||
anode = MagicMock(autospec=Node)
|
||||
anode.waitForConfig.return_value = False
|
||||
with patch("meshtastic.node.Node", return_value=anode):
|
||||
with pytest.raises(SystemExit) as pytest_wrapped_e:
|
||||
iface.getNode("bar2", requestChannelAttempts=2)
|
||||
assert pytest_wrapped_e.type == SystemExit
|
||||
assert pytest_wrapped_e.value.code == 1
|
||||
out, err = capsys.readouterr()
|
||||
assert out == 'Timed out trying to retrieve channel info, retrying\nError: Timed out waiting for channels, giving up\n'
|
||||
assert err == ""
|
||||
|
||||
|
||||
|
||||
@@ -842,6 +842,34 @@ def test_requestChannel_localNode(caplog):
|
||||
assert re.search(r"Requesting channel 0", caplog.text, re.MULTILINE)
|
||||
assert not re.search(r"from remote node", caplog.text, re.MULTILINE)
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_requestChannels_non_localNode(caplog):
|
||||
"""Test requestChannels() with a starting index of 0"""
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
with patch("meshtastic.serial_interface.SerialInterface", return_value=iface) as mo:
|
||||
mo.localNode.getChannelByName.return_value = None
|
||||
mo.myInfo.max_channels = 8
|
||||
anode = Node(mo, "bar", noProto=True)
|
||||
anode.partialChannels = ['0']
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
anode.requestChannels(0)
|
||||
assert re.search(f"Requesting channel 0 info from remote node", caplog.text, re.MULTILINE)
|
||||
assert anode.partialChannels == []
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_requestChannels_non_localNode_starting_index(caplog):
|
||||
"""Test requestChannels() with a starting index of non-0"""
|
||||
iface = MagicMock(autospec=SerialInterface)
|
||||
with patch("meshtastic.serial_interface.SerialInterface", return_value=iface) as mo:
|
||||
mo.localNode.getChannelByName.return_value = None
|
||||
mo.myInfo.max_channels = 8
|
||||
anode = Node(mo, "bar", noProto=True)
|
||||
anode.partialChannels = ['1']
|
||||
with caplog.at_level(logging.DEBUG):
|
||||
anode.requestChannels(3)
|
||||
assert re.search(f"Requesting channel 3 info from remote node", caplog.text, re.MULTILINE)
|
||||
# make sure it hasn't been initialized
|
||||
assert anode.partialChannels == ['1']
|
||||
|
||||
# @pytest.mark.unit
|
||||
# def test_onResponseRequestCannedMessagePluginMesagePart1(caplog):
|
||||
|
||||
@@ -650,3 +650,12 @@ def test_fuzz_fromStr(valstr):
|
||||
assert isinstance(result, int)
|
||||
except ValueError:
|
||||
assert isinstance(result, str)
|
||||
|
||||
def test_shorthex():
|
||||
"""Test the shortest hex string representations"""
|
||||
result = fromStr('0x0')
|
||||
assert result == b'\x00'
|
||||
result = fromStr('0x5')
|
||||
assert result == b'\x05'
|
||||
result = fromStr('0xffff')
|
||||
assert result == b'\xff\xff'
|
||||
|
||||
@@ -82,7 +82,7 @@ def fromStr(valstr):
|
||||
val = bytes()
|
||||
elif valstr.startswith("0x"):
|
||||
# if needed convert to string with asBytes.decode('utf-8')
|
||||
val = bytes.fromhex(valstr[2:])
|
||||
val = bytes.fromhex(valstr[2:].zfill(2))
|
||||
elif valstr.startswith("base64:"):
|
||||
val = base64.b64decode(valstr[7:])
|
||||
elif valstr.lower() in {"t", "true", "yes"}:
|
||||
|
||||
Reference in New Issue
Block a user