Open temp file in binary mode. Let ZipFile work with filename,

instead of file-like object.
This commit is contained in:
Tom Keffer
2019-02-12 06:10:43 -08:00
parent 54b13ff02f
commit ce8415fbbc
2 changed files with 65 additions and 141 deletions

View File

@@ -186,14 +186,13 @@ def save(config_dict, config_path, backup=False):
backup_path = weeutil.weeutil.move_with_timestamp(config_path)
# Now we can save the file. Get a temporary file:
tmpfile = tempfile.NamedTemporaryFile("w")
with tempfile.NamedTemporaryFile() as tmpfile:
# Write the configuration dictionary to it:
config_dict.write(tmpfile)
tmpfile.flush()
# Write the configuration dictionary to it:
config_dict.write(tmpfile)
tmpfile.flush()
# Now move the temporary file into the proper place:
shutil.copyfile(tmpfile.name, config_path)
# Now move the temporary file into the proper place:
shutil.copyfile(tmpfile.name, config_path)
else:
@@ -1552,10 +1551,12 @@ def extract_zip(filename, target_dir, logger=None):
logger = logger or Logger()
import zipfile
logger.log("Extracting from zip archive %s" % filename, level=1)
zip_archive = None
zip_archive = zipfile.ZipFile(filename)
try:
zip_archive = zipfile.ZipFile(open(filename, mode='r'))
member_names = zip_archive.namelist()
# manually extract files since extractall is only in python 2.6+
# zip_archive.extractall(target_dir)
for f in member_names:
@@ -1569,8 +1570,7 @@ def extract_zip(filename, target_dir, logger=None):
dest_file.write(zip_archive.read(f))
return member_names
finally:
if zip_archive is not None:
zip_archive.close()
zip_archive.close()
def mkdir_p(path):

View File

@@ -1,5 +1,5 @@
#
# Copyright (c) 2009-2015 Tom Keffer <tkeffer@gmail.com>
# Copyright (c) 2009-2019 Tom Keffer <tkeffer@gmail.com>
#
# See the file LICENSE.txt for your full rights.
#
@@ -15,14 +15,16 @@ import tempfile
import unittest
import configobj
import six
from six.moves import StringIO
import weecfg.extension
import weeutil.weeutil
try:
from mock import patch
from six.moves import builtins
if six.PY2:
__input_str__ = '__builtin__.raw_input'
else:
@@ -33,7 +35,6 @@ except ImportError:
print("Module 'mock' not installed. Testing will be restricted.")
have_mock = False
TMPDIR = '/var/tmp/weewx_test'
# Redirect the import of setup:
@@ -76,6 +77,7 @@ y_str = """
current_config_dict_path = "../../../weewx.conf"
class LineTest(unittest.TestCase):
def _check_against_expected(self, config_dict, expected):
@@ -85,22 +87,24 @@ class LineTest(unittest.TestCase):
expected: The name of a file holding the expected version
"""
# Write the ConfigObj out to a io.BytesIO, then start checking it against the expected
# Writing a ConfigObj to a file-like object always writes in bytes,
# so we cannot write to a StringIO (which accepts only Unicode under Python 3).
# Use a BytesIO object instead, which accepts byte strings.
with io.BytesIO() as out_str:
config_dict.write(out_str)
check_fileend(out_str)
out_str.seek(0)
fd_expected = open(expected)
N = 0
for expected in fd_expected:
actual = out_str.readline()
N += 1
self.assertEqual(actual.strip(), expected.strip(), "[%d] '%s' vs '%s'" % (N, actual, expected))
with open(expected, 'r') as fd_expected:
N = 0
for expected in fd_expected:
actual = out_str.readline()
N += 1
self.assertEqual(actual.strip(), expected.strip(), "[%d] '%s' vs '%s'" % (N, actual, expected))
# Make sure there are no extra lines in the updated config:
more = out_str.readline()
self.assertEqual(more, '')
# Make sure there are no extra lines in the updated config:
more = out_str.readline()
self.assertEqual(more, '')
class ConfigTest(LineTest):
@@ -134,32 +138,32 @@ class ConfigTest(LineTest):
def test_utilities(self):
global x_str, y_str
xio = io.BytesIO(x_str)
x_dict = configobj.ConfigObj(xio, encoding='utf-8', default_encoding='utf-8')
weecfg.reorder_sections(x_dict, 'section_c', 'section_b')
self.assertEqual("{'section_a': {'a': '1'}, 'section_c': {'c': '3'}, "
"'section_b': {'b': '2'}, 'section_d': {'d': '4'}}", str(x_dict))
with StringIO(x_str) as xio:
x_dict = configobj.ConfigObj(xio, encoding='utf-8')
weecfg.reorder_sections(x_dict, 'section_c', 'section_b')
self.assertEqual(str(x_dict), "{'section_a': {'a': '1'}, 'section_c': {'c': '3'}, "
"'section_b': {'b': '2'}, 'section_d': {'d': '4'}}")
xio.seek(0)
x_dict = configobj.ConfigObj(xio, encoding='utf-8', default_encoding='utf-8')
weecfg.reorder_sections(x_dict, 'section_c', 'section_b', after=True)
self.assertEqual("{'section_a': {'a': '1'}, 'section_b': {'b': '2'}, "
"'section_c': {'c': '3'}, 'section_d': {'d': '4'}}", str(x_dict))
xio.seek(0)
x_dict = configobj.ConfigObj(xio, encoding='utf-8')
weecfg.reorder_sections(x_dict, 'section_c', 'section_b', after=True)
self.assertEqual(str(x_dict), "{'section_a': {'a': '1'}, 'section_b': {'b': '2'}, "
"'section_c': {'c': '3'}, 'section_d': {'d': '4'}}")
xio = io.BytesIO(x_str)
yio = io.BytesIO(y_str)
x_dict = configobj.ConfigObj(xio, encoding='utf-8', default_encoding='utf-8')
y_dict = configobj.ConfigObj(yio, encoding='utf-8', default_encoding='utf-8')
weeutil.weeutil.conditional_merge(x_dict, y_dict)
self.assertEqual("{'section_a': {'a': '1'}, 'section_b': {'b': '2'}, 'section_c': {'c': '3'}, "
"'section_d': {'d': '4'}, 'section_e': {'c': '15'}}", str(x_dict))
with StringIO(x_str) as xio:
with StringIO(y_str) as yio:
x_dict = configobj.ConfigObj(xio, encoding='utf-8')
y_dict = configobj.ConfigObj(yio, encoding='utf-8')
weeutil.weeutil.conditional_merge(x_dict, y_dict)
self.assertEqual(str(x_dict), "{'section_a': {'a': '1'}, 'section_b': {'b': '2'}, 'section_c': {'c': '3'}, "
"'section_d': {'d': '4'}, 'section_e': {'c': '15'}}")
xio = io.BytesIO(x_str)
yio = io.BytesIO(y_str)
x_dict = configobj.ConfigObj(xio, encoding='utf-8', default_encoding='utf-8')
y_dict = configobj.ConfigObj(yio, encoding='utf-8', default_encoding='utf-8')
weecfg.remove_and_prune(x_dict, y_dict)
self.assertEqual("{'section_c': {'c': '3'}, 'section_d': {'d': '4'}}", str(x_dict))
with StringIO(x_str) as xio:
with StringIO(y_str) as yio:
x_dict = configobj.ConfigObj(xio, encoding='utf-8')
y_dict = configobj.ConfigObj(yio, encoding='utf-8')
weecfg.remove_and_prune(x_dict, y_dict)
self.assertEqual(str(x_dict), "{'section_c': {'c': '3'}, 'section_d': {'d': '4'}}")
test_list = ['a', 'b', 'd', 'c']
weecfg.reorder_scalars(test_list, 'c', 'd')
@@ -173,50 +177,6 @@ class ConfigTest(LineTest):
weecfg.reorder_scalars(test_list, 'x', 'd')
self.assertEqual(test_list, ['a', 'b', 'd'])
# report_start_str = """[StdReport]
# SKIN_ROOT = skins
# HTML_ROOT = public_html
# data_binding = wx_binding
# [[XtraReport]]
# skin = Foo
#
# [[StandardReport]]
# skin = Standard
#
# [[FTP]]
# skin = Ftp
#
# [[RSYNC]]
# skin = Rsync
# """
#
# report_expected_str = """[StdReport]
# SKIN_ROOT = skins
# HTML_ROOT = public_html
# data_binding = wx_binding
#
# [[StandardReport]]
# skin = Standard
# [[XtraReport]]
# skin = Foo
#
# [[FTP]]
# skin = Ftp
#
# [[RSYNC]]
# skin = Rsync
# """
#
# def test_reorder(self):
# """Test the utility reorder_to_ref"""
# xio = io.BytesIO(ConfigTest.report_start_str)
# x_dict = configobj.ConfigObj(xio)
# weecfg.reorder_to_ref(x_dict)
# x_result = io.BytesIO()
# x_dict.write(x_result)
# check_fileend(x_result)
# self.assertEqual(ConfigTest.report_expected_str, x_result.getvalue())
if have_mock:
def test_prompt_for_info(self):
@@ -323,7 +283,7 @@ class ConfigTest(LineTest):
def test_upgrade_v25(self):
# Start with the Version 2.0 weewx.conf file:
config_dict = configobj.ConfigObj('weewx20.conf', encoding='utf-8', default_encoding='utf-8')
config_dict = configobj.ConfigObj('weewx20.conf', encoding='utf-8')
# Upgrade the V2.0 configuration dictionary to V2.5:
weecfg.update_to_v25(config_dict)
@@ -333,7 +293,7 @@ class ConfigTest(LineTest):
def test_upgrade_v26(self):
# Start with the Version 2.5 weewx.conf file:
config_dict = configobj.ConfigObj('weewx25.conf', encoding='utf-8', default_encoding='utf-8')
config_dict = configobj.ConfigObj('weewx25.conf', encoding='utf-8')
# Upgrade the V2.5 configuration dictionary to V2.6:
weecfg.update_to_v26(config_dict)
@@ -343,7 +303,7 @@ class ConfigTest(LineTest):
def test_upgrade_v30(self):
# Start with the Version 2.7 weewx.conf file:
config_dict = configobj.ConfigObj('weewx27.conf', encoding='utf-8', default_encoding='utf-8')
config_dict = configobj.ConfigObj('weewx27.conf', encoding='utf-8')
# Upgrade the V2.7 configuration dictionary to V3.0:
weecfg.update_to_v30(config_dict)
@@ -353,7 +313,7 @@ class ConfigTest(LineTest):
def test_upgrade_v32(self):
# Start with the Version 3.0 weewx.conf file:
config_dict = configobj.ConfigObj('weewx30.conf', encoding='utf-8', default_encoding='utf-8')
config_dict = configobj.ConfigObj('weewx30.conf', encoding='utf-8')
# Upgrade the V3.0 configuration dictionary to V3.2:
weecfg.update_to_v32(config_dict)
@@ -363,7 +323,7 @@ class ConfigTest(LineTest):
def test_upgrade_v36(self):
# Start with the Version 3.2 weewx.conf file:
config_dict = configobj.ConfigObj('weewx32.conf', encoding='utf-8', default_encoding='utf-8')
config_dict = configobj.ConfigObj('weewx32.conf', encoding='utf-8')
# Upgrade the V3.2 configuration dictionary to V3.6:
weecfg.update_to_v36(config_dict)
@@ -373,7 +333,7 @@ class ConfigTest(LineTest):
def test_upgrade_v39(self):
# Start with the Version 3.8 weewx.conf file:
config_dict = configobj.ConfigObj('weewx38.conf', encoding='utf-8', default_encoding='utf-8')
config_dict = configobj.ConfigObj('weewx38.conf', encoding='utf-8')
# Upgrade the V3.8 configuration dictionary to V3.9:
weecfg.update_to_v39(config_dict)
@@ -386,10 +346,10 @@ class ConfigTest(LineTest):
def test_merge(self):
# Start with a typical V2.0 user file:
config_dict = configobj.ConfigObj('weewx20_user.conf', encoding='utf-8', default_encoding='utf-8')
config_dict = configobj.ConfigObj('weewx20_user.conf', encoding='utf-8')
# The current config file becomes the template:
template = configobj.ConfigObj(current_config_dict_path, encoding='utf-8', default_encoding='utf-8')
template = configobj.ConfigObj(current_config_dict_path, encoding='utf-8')
# First update, then merge:
weecfg.update_and_merge(config_dict, template)
@@ -414,7 +374,7 @@ class ConfigTest(LineTest):
def test_modify_config(self):
# Use the current weewx.conf
config_dict = configobj.ConfigObj(current_config_dict_path, encoding='utf-8', default_encoding='utf-8')
config_dict = configobj.ConfigObj(current_config_dict_path, encoding='utf-8')
stn_info = weecfg.get_station_info(config_dict)
@@ -433,47 +393,11 @@ class ConfigTest(LineTest):
# Make sure the driver stanza got injected correctly
import weewx.drivers.vantage
vcf = weewx.drivers.vantage.VantageConfEditor()
default_config = configobj.ConfigObj(io.BytesIO(vcf.default_stanza), encoding='utf-8', default_encoding='utf-8')
default_config = configobj.ConfigObj(StringIO(vcf.default_stanza), encoding='utf-8')
self.assertEqual(config_dict['Vantage'], default_config['Vantage'])
# class SkinPatchTest(LineTest):
#
# def setUp(self):
# self.skin_dir = os.path.join(TMPDIR, 'skins38')
# shutil.rmtree(self.skin_dir, ignore_errors=True)
# shutil.copytree('skins38', self.skin_dir)
#
# def tearDown(self):
# pass
# #shutil.rmtree(os.path.join(TMPDIR, 'skin38'), ignore_errors=True)
#
# def test_patch_skin(self):
#
# config_dict = configobj.ConfigObj('weewx38_user.conf')
# config_dict['WEEWX_ROOT'] = TMPDIR
# # Upgrade the V3.8 configuration dictionary to V3.9:
# weecfg.update_to_v39(config_dict)
# weecfg.patch_skins(config_dict)
#
# # Find the patched skin.conf ...
# skin_file = os.path.join(
# config_dict['WEEWX_ROOT'],
# config_dict['StdReport']['SKIN_ROOT'],
# config_dict['StdReport']['StandardReport'].get('skin', ''),
# 'skin.conf')
# # ... retrieve it ...
# skin_dict = configobj.ConfigObj(skin_file)
#
# with open('expected/skin39.conf', 'w') as fd:
# skin_dict.write(fd)
#
# # ... and check it against the expected
# self._check_against_expected(skin_dict, 'expected/skin39.conf')
class ExtensionUtilityTest(unittest.TestCase):
"""Tests of utility functions used by the extension installer."""
@@ -582,7 +506,7 @@ class ExtensionInstallTest(unittest.TestCase):
def test_install(self):
# Find and read the test configuration
config_path = os.path.join(self.weewx_root, 'weewx.conf')
config_dict = configobj.ConfigObj(config_path, encoding='utf-8', default_encoding='utf-8')
config_dict = configobj.ConfigObj(config_path, encoding='utf-8')
# Note that the actual location of the "mini-weewx" is over in /var/tmp
config_dict['WEEWX_ROOT'] = self.weewx_root
@@ -611,7 +535,7 @@ class ExtensionInstallTest(unittest.TestCase):
self.assertTrue(os.path.isfile(os.path.join(self.skin_dir, 'pmon', 'skin.conf')))
# Get, then check the new config dict:
test_dict = configobj.ConfigObj(config_path, encoding='utf-8', default_encoding='utf-8')
test_dict = configobj.ConfigObj(config_path, encoding='utf-8')
self.assertEqual(test_dict['StdReport']['pmon'],
{'HTML_ROOT': 'public_html/pmon', 'skin': 'pmon'})
self.assertEqual(test_dict['Databases']['pmon_sqlite'],
@@ -631,7 +555,7 @@ class ExtensionInstallTest(unittest.TestCase):
def test_uninstall(self):
# Find and read the test configuration
config_path = os.path.join(self.weewx_root, 'weewx.conf')
config_dict = configobj.ConfigObj(config_path, encoding='utf-8', default_encoding='utf-8')
config_dict = configobj.ConfigObj(config_path, encoding='utf-8')
# Note that the actual location of the "mini-weewx" is over in /var/tmp
config_dict['WEEWX_ROOT'] = self.weewx_root
@@ -653,7 +577,7 @@ class ExtensionInstallTest(unittest.TestCase):
self.assertTrue(not os.path.exists(os.path.join(self.skin_dir, 'pmon', 'skin.conf')))
# Get the modified config dict, which had the extension removed from it
test_dict = configobj.ConfigObj(config_path, encoding='utf-8', default_encoding='utf-8')
test_dict = configobj.ConfigObj(config_path, encoding='utf-8')
# It should be the same as our original:
self.assertEqual(test_dict, config_dict)