diff --git a/bin/weecfg/__init__.py b/bin/weecfg/__init__.py index 33fa20b9..d9c92ef4 100644 --- a/bin/weecfg/__init__.py +++ b/bin/weecfg/__init__.py @@ -186,14 +186,13 @@ def save(config_dict, config_path, backup=False): backup_path = weeutil.weeutil.move_with_timestamp(config_path) # Now we can save the file. Get a temporary file: - tmpfile = tempfile.NamedTemporaryFile("w") + with tempfile.NamedTemporaryFile() as tmpfile: + # Write the configuration dictionary to it: + config_dict.write(tmpfile) + tmpfile.flush() - # Write the configuration dictionary to it: - config_dict.write(tmpfile) - tmpfile.flush() - - # Now move the temporary file into the proper place: - shutil.copyfile(tmpfile.name, config_path) + # Now move the temporary file into the proper place: + shutil.copyfile(tmpfile.name, config_path) else: @@ -1552,10 +1551,12 @@ def extract_zip(filename, target_dir, logger=None): logger = logger or Logger() import zipfile logger.log("Extracting from zip archive %s" % filename, level=1) - zip_archive = None + + zip_archive = zipfile.ZipFile(filename) + try: - zip_archive = zipfile.ZipFile(open(filename, mode='r')) member_names = zip_archive.namelist() + # manually extract files since extractall is only in python 2.6+ # zip_archive.extractall(target_dir) for f in member_names: @@ -1569,8 +1570,7 @@ def extract_zip(filename, target_dir, logger=None): dest_file.write(zip_archive.read(f)) return member_names finally: - if zip_archive is not None: - zip_archive.close() + zip_archive.close() def mkdir_p(path): diff --git a/bin/weecfg/test/test_config.py b/bin/weecfg/test/test_config.py index 84f2f28e..7c2d73cd 100644 --- a/bin/weecfg/test/test_config.py +++ b/bin/weecfg/test/test_config.py @@ -1,5 +1,5 @@ # -# Copyright (c) 2009-2015 Tom Keffer +# Copyright (c) 2009-2019 Tom Keffer # # See the file LICENSE.txt for your full rights. # @@ -15,14 +15,16 @@ import tempfile import unittest import configobj - import six +from six.moves import StringIO + import weecfg.extension import weeutil.weeutil try: from mock import patch from six.moves import builtins + if six.PY2: __input_str__ = '__builtin__.raw_input' else: @@ -33,7 +35,6 @@ except ImportError: print("Module 'mock' not installed. Testing will be restricted.") have_mock = False - TMPDIR = '/var/tmp/weewx_test' # Redirect the import of setup: @@ -76,6 +77,7 @@ y_str = """ current_config_dict_path = "../../../weewx.conf" + class LineTest(unittest.TestCase): def _check_against_expected(self, config_dict, expected): @@ -85,22 +87,24 @@ class LineTest(unittest.TestCase): expected: The name of a file holding the expected version """ - # Write the ConfigObj out to a io.BytesIO, then start checking it against the expected + # Writing a ConfigObj to a file-like object always writes in bytes, + # so we cannot write to a StringIO (which accepts only Unicode under Python 3). + # Use a BytesIO object instead, which accepts byte strings. with io.BytesIO() as out_str: config_dict.write(out_str) check_fileend(out_str) out_str.seek(0) - fd_expected = open(expected) - N = 0 - for expected in fd_expected: - actual = out_str.readline() - N += 1 - self.assertEqual(actual.strip(), expected.strip(), "[%d] '%s' vs '%s'" % (N, actual, expected)) + with open(expected, 'r') as fd_expected: + N = 0 + for expected in fd_expected: + actual = out_str.readline() + N += 1 + self.assertEqual(actual.strip(), expected.strip(), "[%d] '%s' vs '%s'" % (N, actual, expected)) - # Make sure there are no extra lines in the updated config: - more = out_str.readline() - self.assertEqual(more, '') + # Make sure there are no extra lines in the updated config: + more = out_str.readline() + self.assertEqual(more, '') class ConfigTest(LineTest): @@ -134,32 +138,32 @@ class ConfigTest(LineTest): def test_utilities(self): global x_str, y_str - xio = io.BytesIO(x_str) - x_dict = configobj.ConfigObj(xio, encoding='utf-8', default_encoding='utf-8') - weecfg.reorder_sections(x_dict, 'section_c', 'section_b') - self.assertEqual("{'section_a': {'a': '1'}, 'section_c': {'c': '3'}, " - "'section_b': {'b': '2'}, 'section_d': {'d': '4'}}", str(x_dict)) + with StringIO(x_str) as xio: + x_dict = configobj.ConfigObj(xio, encoding='utf-8') + weecfg.reorder_sections(x_dict, 'section_c', 'section_b') + self.assertEqual(str(x_dict), "{'section_a': {'a': '1'}, 'section_c': {'c': '3'}, " + "'section_b': {'b': '2'}, 'section_d': {'d': '4'}}") - xio.seek(0) - x_dict = configobj.ConfigObj(xio, encoding='utf-8', default_encoding='utf-8') - weecfg.reorder_sections(x_dict, 'section_c', 'section_b', after=True) - self.assertEqual("{'section_a': {'a': '1'}, 'section_b': {'b': '2'}, " - "'section_c': {'c': '3'}, 'section_d': {'d': '4'}}", str(x_dict)) + xio.seek(0) + x_dict = configobj.ConfigObj(xio, encoding='utf-8') + weecfg.reorder_sections(x_dict, 'section_c', 'section_b', after=True) + self.assertEqual(str(x_dict), "{'section_a': {'a': '1'}, 'section_b': {'b': '2'}, " + "'section_c': {'c': '3'}, 'section_d': {'d': '4'}}") - xio = io.BytesIO(x_str) - yio = io.BytesIO(y_str) - x_dict = configobj.ConfigObj(xio, encoding='utf-8', default_encoding='utf-8') - y_dict = configobj.ConfigObj(yio, encoding='utf-8', default_encoding='utf-8') - weeutil.weeutil.conditional_merge(x_dict, y_dict) - self.assertEqual("{'section_a': {'a': '1'}, 'section_b': {'b': '2'}, 'section_c': {'c': '3'}, " - "'section_d': {'d': '4'}, 'section_e': {'c': '15'}}", str(x_dict)) + with StringIO(x_str) as xio: + with StringIO(y_str) as yio: + x_dict = configobj.ConfigObj(xio, encoding='utf-8') + y_dict = configobj.ConfigObj(yio, encoding='utf-8') + weeutil.weeutil.conditional_merge(x_dict, y_dict) + self.assertEqual(str(x_dict), "{'section_a': {'a': '1'}, 'section_b': {'b': '2'}, 'section_c': {'c': '3'}, " + "'section_d': {'d': '4'}, 'section_e': {'c': '15'}}") - xio = io.BytesIO(x_str) - yio = io.BytesIO(y_str) - x_dict = configobj.ConfigObj(xio, encoding='utf-8', default_encoding='utf-8') - y_dict = configobj.ConfigObj(yio, encoding='utf-8', default_encoding='utf-8') - weecfg.remove_and_prune(x_dict, y_dict) - self.assertEqual("{'section_c': {'c': '3'}, 'section_d': {'d': '4'}}", str(x_dict)) + with StringIO(x_str) as xio: + with StringIO(y_str) as yio: + x_dict = configobj.ConfigObj(xio, encoding='utf-8') + y_dict = configobj.ConfigObj(yio, encoding='utf-8') + weecfg.remove_and_prune(x_dict, y_dict) + self.assertEqual(str(x_dict), "{'section_c': {'c': '3'}, 'section_d': {'d': '4'}}") test_list = ['a', 'b', 'd', 'c'] weecfg.reorder_scalars(test_list, 'c', 'd') @@ -173,50 +177,6 @@ class ConfigTest(LineTest): weecfg.reorder_scalars(test_list, 'x', 'd') self.assertEqual(test_list, ['a', 'b', 'd']) - # report_start_str = """[StdReport] - # SKIN_ROOT = skins - # HTML_ROOT = public_html - # data_binding = wx_binding - # [[XtraReport]] - # skin = Foo - # - # [[StandardReport]] - # skin = Standard - # - # [[FTP]] - # skin = Ftp - # - # [[RSYNC]] - # skin = Rsync - # """ - # - # report_expected_str = """[StdReport] - # SKIN_ROOT = skins - # HTML_ROOT = public_html - # data_binding = wx_binding - # - # [[StandardReport]] - # skin = Standard - # [[XtraReport]] - # skin = Foo - # - # [[FTP]] - # skin = Ftp - # - # [[RSYNC]] - # skin = Rsync - # """ - # - # def test_reorder(self): - # """Test the utility reorder_to_ref""" - # xio = io.BytesIO(ConfigTest.report_start_str) - # x_dict = configobj.ConfigObj(xio) - # weecfg.reorder_to_ref(x_dict) - # x_result = io.BytesIO() - # x_dict.write(x_result) - # check_fileend(x_result) - # self.assertEqual(ConfigTest.report_expected_str, x_result.getvalue()) - if have_mock: def test_prompt_for_info(self): @@ -323,7 +283,7 @@ class ConfigTest(LineTest): def test_upgrade_v25(self): # Start with the Version 2.0 weewx.conf file: - config_dict = configobj.ConfigObj('weewx20.conf', encoding='utf-8', default_encoding='utf-8') + config_dict = configobj.ConfigObj('weewx20.conf', encoding='utf-8') # Upgrade the V2.0 configuration dictionary to V2.5: weecfg.update_to_v25(config_dict) @@ -333,7 +293,7 @@ class ConfigTest(LineTest): def test_upgrade_v26(self): # Start with the Version 2.5 weewx.conf file: - config_dict = configobj.ConfigObj('weewx25.conf', encoding='utf-8', default_encoding='utf-8') + config_dict = configobj.ConfigObj('weewx25.conf', encoding='utf-8') # Upgrade the V2.5 configuration dictionary to V2.6: weecfg.update_to_v26(config_dict) @@ -343,7 +303,7 @@ class ConfigTest(LineTest): def test_upgrade_v30(self): # Start with the Version 2.7 weewx.conf file: - config_dict = configobj.ConfigObj('weewx27.conf', encoding='utf-8', default_encoding='utf-8') + config_dict = configobj.ConfigObj('weewx27.conf', encoding='utf-8') # Upgrade the V2.7 configuration dictionary to V3.0: weecfg.update_to_v30(config_dict) @@ -353,7 +313,7 @@ class ConfigTest(LineTest): def test_upgrade_v32(self): # Start with the Version 3.0 weewx.conf file: - config_dict = configobj.ConfigObj('weewx30.conf', encoding='utf-8', default_encoding='utf-8') + config_dict = configobj.ConfigObj('weewx30.conf', encoding='utf-8') # Upgrade the V3.0 configuration dictionary to V3.2: weecfg.update_to_v32(config_dict) @@ -363,7 +323,7 @@ class ConfigTest(LineTest): def test_upgrade_v36(self): # Start with the Version 3.2 weewx.conf file: - config_dict = configobj.ConfigObj('weewx32.conf', encoding='utf-8', default_encoding='utf-8') + config_dict = configobj.ConfigObj('weewx32.conf', encoding='utf-8') # Upgrade the V3.2 configuration dictionary to V3.6: weecfg.update_to_v36(config_dict) @@ -373,7 +333,7 @@ class ConfigTest(LineTest): def test_upgrade_v39(self): # Start with the Version 3.8 weewx.conf file: - config_dict = configobj.ConfigObj('weewx38.conf', encoding='utf-8', default_encoding='utf-8') + config_dict = configobj.ConfigObj('weewx38.conf', encoding='utf-8') # Upgrade the V3.8 configuration dictionary to V3.9: weecfg.update_to_v39(config_dict) @@ -386,10 +346,10 @@ class ConfigTest(LineTest): def test_merge(self): # Start with a typical V2.0 user file: - config_dict = configobj.ConfigObj('weewx20_user.conf', encoding='utf-8', default_encoding='utf-8') + config_dict = configobj.ConfigObj('weewx20_user.conf', encoding='utf-8') # The current config file becomes the template: - template = configobj.ConfigObj(current_config_dict_path, encoding='utf-8', default_encoding='utf-8') + template = configobj.ConfigObj(current_config_dict_path, encoding='utf-8') # First update, then merge: weecfg.update_and_merge(config_dict, template) @@ -414,7 +374,7 @@ class ConfigTest(LineTest): def test_modify_config(self): # Use the current weewx.conf - config_dict = configobj.ConfigObj(current_config_dict_path, encoding='utf-8', default_encoding='utf-8') + config_dict = configobj.ConfigObj(current_config_dict_path, encoding='utf-8') stn_info = weecfg.get_station_info(config_dict) @@ -433,47 +393,11 @@ class ConfigTest(LineTest): # Make sure the driver stanza got injected correctly import weewx.drivers.vantage vcf = weewx.drivers.vantage.VantageConfEditor() - default_config = configobj.ConfigObj(io.BytesIO(vcf.default_stanza), encoding='utf-8', default_encoding='utf-8') + default_config = configobj.ConfigObj(StringIO(vcf.default_stanza), encoding='utf-8') self.assertEqual(config_dict['Vantage'], default_config['Vantage']) -# class SkinPatchTest(LineTest): -# -# def setUp(self): -# self.skin_dir = os.path.join(TMPDIR, 'skins38') -# shutil.rmtree(self.skin_dir, ignore_errors=True) -# shutil.copytree('skins38', self.skin_dir) -# -# def tearDown(self): -# pass -# #shutil.rmtree(os.path.join(TMPDIR, 'skin38'), ignore_errors=True) -# -# def test_patch_skin(self): -# -# config_dict = configobj.ConfigObj('weewx38_user.conf') -# config_dict['WEEWX_ROOT'] = TMPDIR -# # Upgrade the V3.8 configuration dictionary to V3.9: -# weecfg.update_to_v39(config_dict) -# weecfg.patch_skins(config_dict) -# -# # Find the patched skin.conf ... -# skin_file = os.path.join( -# config_dict['WEEWX_ROOT'], -# config_dict['StdReport']['SKIN_ROOT'], -# config_dict['StdReport']['StandardReport'].get('skin', ''), -# 'skin.conf') -# # ... retrieve it ... -# skin_dict = configobj.ConfigObj(skin_file) -# -# with open('expected/skin39.conf', 'w') as fd: -# skin_dict.write(fd) -# -# # ... and check it against the expected -# self._check_against_expected(skin_dict, 'expected/skin39.conf') - - - class ExtensionUtilityTest(unittest.TestCase): """Tests of utility functions used by the extension installer.""" @@ -582,7 +506,7 @@ class ExtensionInstallTest(unittest.TestCase): def test_install(self): # Find and read the test configuration config_path = os.path.join(self.weewx_root, 'weewx.conf') - config_dict = configobj.ConfigObj(config_path, encoding='utf-8', default_encoding='utf-8') + config_dict = configobj.ConfigObj(config_path, encoding='utf-8') # Note that the actual location of the "mini-weewx" is over in /var/tmp config_dict['WEEWX_ROOT'] = self.weewx_root @@ -611,7 +535,7 @@ class ExtensionInstallTest(unittest.TestCase): self.assertTrue(os.path.isfile(os.path.join(self.skin_dir, 'pmon', 'skin.conf'))) # Get, then check the new config dict: - test_dict = configobj.ConfigObj(config_path, encoding='utf-8', default_encoding='utf-8') + test_dict = configobj.ConfigObj(config_path, encoding='utf-8') self.assertEqual(test_dict['StdReport']['pmon'], {'HTML_ROOT': 'public_html/pmon', 'skin': 'pmon'}) self.assertEqual(test_dict['Databases']['pmon_sqlite'], @@ -631,7 +555,7 @@ class ExtensionInstallTest(unittest.TestCase): def test_uninstall(self): # Find and read the test configuration config_path = os.path.join(self.weewx_root, 'weewx.conf') - config_dict = configobj.ConfigObj(config_path, encoding='utf-8', default_encoding='utf-8') + config_dict = configobj.ConfigObj(config_path, encoding='utf-8') # Note that the actual location of the "mini-weewx" is over in /var/tmp config_dict['WEEWX_ROOT'] = self.weewx_root @@ -653,7 +577,7 @@ class ExtensionInstallTest(unittest.TestCase): self.assertTrue(not os.path.exists(os.path.join(self.skin_dir, 'pmon', 'skin.conf'))) # Get the modified config dict, which had the extension removed from it - test_dict = configobj.ConfigObj(config_path, encoding='utf-8', default_encoding='utf-8') + test_dict = configobj.ConfigObj(config_path, encoding='utf-8') # It should be the same as our original: self.assertEqual(test_dict, config_dict)