mirror of
https://github.com/sabnzbd/sabnzbd.git
synced 2026-01-16 03:21:11 -05:00
Compare commits
28 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
971e4fc909 | ||
|
|
51cc765949 | ||
|
|
19c6a4fffa | ||
|
|
105ac32d2f | ||
|
|
57550675d2 | ||
|
|
e674abc5c0 | ||
|
|
f965c96f51 | ||
|
|
c76b8ed9e0 | ||
|
|
4fbd0d8a7b | ||
|
|
2186c0fff6 | ||
|
|
1adca9a9c1 | ||
|
|
9408353f2b | ||
|
|
84f4d453d2 | ||
|
|
d10209f2a1 | ||
|
|
3ae149c72f | ||
|
|
47385acc3b | ||
|
|
814eeaa900 | ||
|
|
5f2ea13aad | ||
|
|
41ca217931 | ||
|
|
b57d36e8dd | ||
|
|
9a4be70734 | ||
|
|
a8443595a6 | ||
|
|
fd0a70ac58 | ||
|
|
8a8685c968 | ||
|
|
9e6cb8da8e | ||
|
|
054ec54d51 | ||
|
|
272ce773cb | ||
|
|
050b925f7b |
@@ -1,5 +1,5 @@
|
||||
*******************************************
|
||||
*** This is SABnzbd 3.0.0 ***
|
||||
*** This is SABnzbd ***
|
||||
*******************************************
|
||||
|
||||
SABnzbd is an open-source cross-platform binary newsreader.
|
||||
|
||||
4
PKG-INFO
4
PKG-INFO
@@ -1,7 +1,7 @@
|
||||
Metadata-Version: 1.0
|
||||
Name: SABnzbd
|
||||
Version: 3.0.0
|
||||
Summary: SABnzbd-3.0.0
|
||||
Version: 3.0.2
|
||||
Summary: SABnzbd-3.0.2
|
||||
Home-page: https://sabnzbd.org
|
||||
Author: The SABnzbd Team
|
||||
Author-email: team@sabnzbd.org
|
||||
|
||||
21
README.mkd
21
README.mkd
@@ -1,7 +1,24 @@
|
||||
Release Notes - SABnzbd 3.0.0
|
||||
Release Notes - SABnzbd 3.0.2
|
||||
=========================================================
|
||||
|
||||
## About this new version
|
||||
## Bugfixes since 3.0.1
|
||||
- Priority was not parsed correctly if supplied as through the API.
|
||||
- API-call `addfile` could fail if `name` and `nzbfile` were used.
|
||||
- Permissions were still not set correctly when creating directories.
|
||||
- Propagation delay label was shown even if no delay was activated.
|
||||
- Reading RSS feed with no categories set could result in crash.
|
||||
- Jobs with numeric names could crash post-processing.
|
||||
- Jobs with missing articles could result in crash.
|
||||
- macOS: changed the power assertion to `NoIdleSleep`.
|
||||
- Windows: end-of-queue-script did not run on Windows.
|
||||
- Windows: crash if the virus scanner removed the certificate bundle.
|
||||
|
||||
## Bugfixes since 3.0.0
|
||||
- Basic Authentication resulted in crash.
|
||||
- Permissions were not set correctly when creating directories.
|
||||
- Windows: base SSL certificate bundle was not included.
|
||||
|
||||
## About the new major version
|
||||
We have been working for months to upgrade the SABnzbd code from Python 2 to Python 3.
|
||||
Although it might not sound like a big change, we had to rewrite almost every part of
|
||||
the code. We also included a number of new features, listed below.
|
||||
|
||||
28
SABnzbd.py
28
SABnzbd.py
@@ -24,6 +24,7 @@ if sys.hexversion < 0x03050000:
|
||||
|
||||
import logging
|
||||
import logging.handlers
|
||||
import importlib.util
|
||||
import traceback
|
||||
import getopt
|
||||
import signal
|
||||
@@ -36,18 +37,12 @@ import re
|
||||
|
||||
try:
|
||||
import Cheetah
|
||||
|
||||
if Cheetah.Version[0] != "3":
|
||||
raise ValueError
|
||||
import feedparser
|
||||
import configobj
|
||||
import cherrypy
|
||||
import portend
|
||||
import cryptography
|
||||
import chardet
|
||||
except ValueError:
|
||||
print("Sorry, requires Python module Cheetah 3 or higher.")
|
||||
sys.exit(1)
|
||||
except ImportError as e:
|
||||
print("Not all required Python modules are available, please check requirements.txt")
|
||||
print("Missing module:", e.name)
|
||||
@@ -1168,12 +1163,19 @@ def main():
|
||||
# SSL Information
|
||||
logging.info("SSL version = %s", ssl.OPENSSL_VERSION)
|
||||
|
||||
# Load (extra) certificates in the binary distributions
|
||||
if hasattr(sys, "frozen") and (sabnzbd.WIN32 or sabnzbd.DARWIN):
|
||||
# The certifi package brings the latest certificates on build
|
||||
# This will cause the create_default_context to load it automatically
|
||||
os.environ["SSL_CERT_FILE"] = os.path.join(sabnzbd.DIR_PROG, "cacert.pem")
|
||||
logging.info("Loaded additional certificates from %s", os.environ["SSL_CERT_FILE"])
|
||||
# Load (extra) certificates if supplied by certifi
|
||||
# This is optional and provided in the binaries
|
||||
if importlib.util.find_spec("certifi") is not None:
|
||||
import certifi
|
||||
|
||||
try:
|
||||
os.environ["SSL_CERT_FILE"] = certifi.where()
|
||||
logging.info("Certifi version: %s", certifi.__version__)
|
||||
logging.info("Loaded additional certificates from: %s", os.environ["SSL_CERT_FILE"])
|
||||
except:
|
||||
# Sometimes the certificate file is blocked
|
||||
logging.warning(T("Could not load additional certificates from certifi package"))
|
||||
logging.info("Traceback: ", exc_info=True)
|
||||
|
||||
# Extra startup info
|
||||
if sabnzbd.cfg.log_level() > 1:
|
||||
@@ -1701,9 +1703,7 @@ if __name__ == "__main__":
|
||||
# This code is made with trial-and-error, please improve!
|
||||
class startApp(Thread):
|
||||
def run(self):
|
||||
logging.info("[osx] sabApp Starting - starting main thread")
|
||||
main()
|
||||
logging.info("[osx] sabApp Stopping - main thread quit ")
|
||||
AppHelper.stopEventLoop()
|
||||
|
||||
sabApp = startApp()
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
sabyenc3
|
||||
cheetah3
|
||||
sabyenc3>=4.0.0
|
||||
cheetah3>=3.0.0
|
||||
cryptography
|
||||
feedparser
|
||||
configobj
|
||||
cheroot<8.4.3
|
||||
cherrypy
|
||||
portend
|
||||
chardet
|
||||
|
||||
@@ -848,10 +848,10 @@ def change_queue_complete_action(action, new=True):
|
||||
|
||||
def run_script(script):
|
||||
""" Run a user script (queue complete only) """
|
||||
command = [os.path.join(cfg.script_dir.get_path(), script)]
|
||||
if os.path.exists(command[0]):
|
||||
script_path = filesystem.make_script_path(script)
|
||||
if script_path:
|
||||
try:
|
||||
stup, need_shell, command, creationflags = sabnzbd.newsunpack.build_command(command)
|
||||
stup, need_shell, command, creationflags = sabnzbd.newsunpack.build_command([script_path])
|
||||
logging.info("Spawning external command %s", command)
|
||||
subprocess.Popen(
|
||||
command,
|
||||
|
||||
@@ -350,7 +350,7 @@ def _api_translate(name, output, kwargs):
|
||||
def _api_addfile(name, output, kwargs):
|
||||
""" API: accepts name, output, pp, script, cat, priority, nzbname """
|
||||
# Normal upload will send the nzb in a kw arg called name or nzbfile
|
||||
if not name:
|
||||
if not name or isinstance(name, str):
|
||||
name = kwargs.get("nzbfile", None)
|
||||
if hasattr(name, "file") and hasattr(name, "filename") and name.filename:
|
||||
cat = kwargs.get("cat")
|
||||
|
||||
@@ -549,20 +549,34 @@ DIR_LOCK = threading.RLock()
|
||||
|
||||
|
||||
@synchronized(DIR_LOCK)
|
||||
def create_all_dirs(path, umask=False):
|
||||
def create_all_dirs(path, apply_umask=False):
|
||||
""" Create all required path elements and set umask on all
|
||||
The umask argument is ignored on Windows
|
||||
Return path if elements could be made or exists
|
||||
"""
|
||||
try:
|
||||
# Use custom mask if desired
|
||||
mask = 0o700
|
||||
if umask and sabnzbd.cfg.umask():
|
||||
mask = int(sabnzbd.cfg.umask(), 8)
|
||||
logging.info("Creating directories: %s", path)
|
||||
if sabnzbd.WIN32:
|
||||
os.makedirs(path, exist_ok=True)
|
||||
else:
|
||||
# We need to build the directory recursively so we can
|
||||
# apply permissions to only the newly created folders
|
||||
# We cannot use os.makedirs() as it could ignore the mode
|
||||
umask = sabnzbd.cfg.umask()
|
||||
if umask:
|
||||
umask = int(umask, 8) | int("0700", 8)
|
||||
|
||||
# Use python functions to create the directory
|
||||
logging.info("Creating directories: %s (mask=%s)", path, mask)
|
||||
os.makedirs(path, mode=mask, exist_ok=True)
|
||||
# Build path from root
|
||||
path_part_combined = "/"
|
||||
for path_part in path.split("/"):
|
||||
if path_part:
|
||||
path_part_combined = os.path.join(path_part_combined, path_part)
|
||||
# Only create if it doesn't exist
|
||||
if not os.path.exists(path_part_combined):
|
||||
os.mkdir(path_part_combined)
|
||||
# Try to set permissions if desired, ignore failures
|
||||
if umask and apply_umask:
|
||||
set_chmod(path_part_combined, umask, report=False)
|
||||
return path
|
||||
except OSError:
|
||||
logging.error(T("Failed making (%s)"), clip_path(path), exc_info=True)
|
||||
@@ -582,7 +596,7 @@ def get_unique_path(dirpath, n=0, create_dir=True):
|
||||
|
||||
if not os.path.exists(path):
|
||||
if create_dir:
|
||||
return create_all_dirs(path, umask=True)
|
||||
return create_all_dirs(path, apply_umask=True)
|
||||
else:
|
||||
return path
|
||||
else:
|
||||
@@ -643,7 +657,7 @@ def move_to_path(path, new_path):
|
||||
# Cannot rename, try copying
|
||||
logging.debug("File could not be renamed, trying copying: %s", path)
|
||||
try:
|
||||
create_all_dirs(os.path.dirname(new_path), umask=True)
|
||||
create_all_dirs(os.path.dirname(new_path), apply_umask=True)
|
||||
shutil.copyfile(path, new_path)
|
||||
os.remove(path)
|
||||
except:
|
||||
|
||||
@@ -252,13 +252,9 @@ def check_login():
|
||||
return check_login_cookie()
|
||||
|
||||
|
||||
def get_users():
|
||||
users = {cfg.username(): cfg.password()}
|
||||
return users
|
||||
|
||||
|
||||
def encrypt_pwd(pwd):
|
||||
return pwd
|
||||
def check_basic_auth(_, username, password):
|
||||
""" CherryPy basic authentication validation """
|
||||
return username == cfg.username() and password == cfg.password()
|
||||
|
||||
|
||||
def set_auth(conf):
|
||||
@@ -268,8 +264,7 @@ def set_auth(conf):
|
||||
{
|
||||
"tools.auth_basic.on": True,
|
||||
"tools.auth_basic.realm": "SABnzbd",
|
||||
"tools.auth_basic.users": get_users,
|
||||
"tools.auth_basic.encrypt": encrypt_pwd,
|
||||
"tools.auth_basic.checkpassword": check_basic_auth,
|
||||
}
|
||||
)
|
||||
conf.update(
|
||||
|
||||
@@ -710,15 +710,15 @@ class NzbQueue:
|
||||
Not locked for performance, since it only reads the queue
|
||||
"""
|
||||
# Pre-calculate propagation delay
|
||||
propagtion_delay = float(cfg.propagation_delay() * 60)
|
||||
propagation_delay = float(cfg.propagation_delay() * 60)
|
||||
for nzo in self.__nzo_list:
|
||||
# Not when queue paused and not a forced item
|
||||
if nzo.status not in (Status.PAUSED, Status.GRABBING) or nzo.priority == TOP_PRIORITY:
|
||||
# Check if past propagation delay, or forced
|
||||
if (
|
||||
not propagtion_delay
|
||||
not propagation_delay
|
||||
or nzo.priority == TOP_PRIORITY
|
||||
or (nzo.avg_stamp + propagtion_delay) < time.time()
|
||||
or (nzo.avg_stamp + propagation_delay) < time.time()
|
||||
):
|
||||
if not nzo.server_in_try_list(server):
|
||||
article = nzo.get_article(server, servers)
|
||||
|
||||
@@ -20,6 +20,7 @@ sabnzbd.nzbstuff - misc
|
||||
"""
|
||||
|
||||
import os
|
||||
import pickle
|
||||
import time
|
||||
import re
|
||||
import logging
|
||||
@@ -481,8 +482,15 @@ class NzbFile(TryList):
|
||||
self.md5 = None
|
||||
|
||||
def __eq__(self, other):
|
||||
""" Assume it's the same file if the bytes and first article are the same """
|
||||
return self.bytes == other.bytes and self.decodetable[0] == other.decodetable[0]
|
||||
""" Assume it's the same file if the numer bytes and first article
|
||||
are the same or if there are no articles left, use the filenames
|
||||
"""
|
||||
if self.bytes == other.bytes:
|
||||
if self.decodetable and other.decodetable:
|
||||
return self.decodetable[0] == other.decodetable[0]
|
||||
# Fallback to filename comparison
|
||||
return self.filename == other.filename
|
||||
return False
|
||||
|
||||
def __hash__(self):
|
||||
""" Required because we implement eq. The same file can be spread
|
||||
@@ -556,6 +564,8 @@ NzbObjectSaver = (
|
||||
"rating_filtered",
|
||||
)
|
||||
|
||||
NzoAttributeSaver = ("cat", "pp", "script", "priority", "final_name", "password", "url")
|
||||
|
||||
# Lock to prevent errors when saving the NZO data
|
||||
NZO_LOCK = threading.RLock()
|
||||
|
||||
@@ -707,7 +717,7 @@ class NzbObject(TryList):
|
||||
self.final_name = self.final_name.replace(".", " ")
|
||||
|
||||
# Check against identical checksum or series/season/episode
|
||||
if (not reuse) and nzb and dup_check and priority != REPAIR_PRIORITY:
|
||||
if (not reuse) and nzb and dup_check and self.priority != REPAIR_PRIORITY:
|
||||
duplicate, series = self.has_duplicates()
|
||||
else:
|
||||
duplicate = series = 0
|
||||
@@ -779,14 +789,10 @@ class NzbObject(TryList):
|
||||
|
||||
# Pickup backed-up attributes when re-using
|
||||
if reuse:
|
||||
cat, pp, script, priority, name, password, self.url = get_attrib_file(self.workpath, 7)
|
||||
if name:
|
||||
self.final_name = name
|
||||
if password:
|
||||
self.password = password
|
||||
cat, pp, script = self.load_attribs()
|
||||
|
||||
# Determine category and find pp/script values
|
||||
self.cat, pp_tmp, self.script, priority = cat_to_opts(cat, pp, script, priority)
|
||||
self.cat, pp_tmp, self.script, priority = cat_to_opts(cat, pp, script, self.priority)
|
||||
self.set_priority(priority)
|
||||
self.repair, self.unpack, self.delete = pp_to_opts(pp_tmp)
|
||||
|
||||
@@ -1339,8 +1345,9 @@ class NzbObject(TryList):
|
||||
labels.append(T("WAIT %s sec") % dif)
|
||||
|
||||
# Propagation delay label
|
||||
if (self.avg_stamp + float(cfg.propagation_delay() * 60)) > time.time() and self.priority != TOP_PRIORITY:
|
||||
wait_time = int((self.avg_stamp + float(cfg.propagation_delay() * 60) - time.time()) / 60 + 0.5)
|
||||
propagation_delay = float(cfg.propagation_delay() * 60)
|
||||
if propagation_delay and self.avg_stamp + propagation_delay > time.time() and self.priority != TOP_PRIORITY:
|
||||
wait_time = int((self.avg_stamp + propagation_delay - time.time()) / 60 + 0.5)
|
||||
labels.append(T("PROPAGATING %s min") % wait_time) # Queue indicator while waiting for propagation of post
|
||||
|
||||
return labels
|
||||
@@ -1892,9 +1899,36 @@ class NzbObject(TryList):
|
||||
sabnzbd.save_data(self, self.nzo_id, self.workpath)
|
||||
|
||||
def save_attribs(self):
|
||||
set_attrib_file(
|
||||
self.workpath, (self.cat, self.pp, self.script, self.priority, self.final_name, self.password, self.url)
|
||||
)
|
||||
""" Save specific attributes for Retry """
|
||||
attribs = {}
|
||||
for attrib in NzoAttributeSaver:
|
||||
attribs[attrib] = getattr(self, attrib)
|
||||
logging.debug("Saving attributes %s for %s", attribs, self.final_name)
|
||||
sabnzbd.save_data(attribs, ATTRIB_FILE, self.workpath)
|
||||
|
||||
def load_attribs(self):
|
||||
""" Load saved attributes and return them to be parsed """
|
||||
attribs = sabnzbd.load_data(ATTRIB_FILE, self.workpath, remove=False)
|
||||
logging.debug("Loaded attributes %s for %s", attribs, self.final_name)
|
||||
|
||||
# TODO: Remove fallback to old method in SABnzbd 3.2.0
|
||||
if not attribs:
|
||||
cat, pp, script, self.priority, name, password, self.url = get_attrib_file(self.workpath, 7)
|
||||
if name:
|
||||
# Could be converted to integer due to the logic in get_attrib_file
|
||||
self.final_name = str(name)
|
||||
if password:
|
||||
self.password = password
|
||||
return cat, pp, script
|
||||
|
||||
# Only a subset we want to apply directly to the NZO
|
||||
for attrib in ("final_name", "priority", "password", "url"):
|
||||
# Only set if it is present and has a value
|
||||
if attribs.get(attrib):
|
||||
setattr(self, attrib, attribs[attrib])
|
||||
|
||||
# Rest is to be used directly in the NZO-init flow
|
||||
return attribs["cat"], attribs["pp"], attribs["script"]
|
||||
|
||||
@synchronized(NZO_LOCK)
|
||||
def build_pos_nzf_table(self, nzf_ids):
|
||||
@@ -2150,15 +2184,6 @@ def get_attrib_file(path, size):
|
||||
return [None for _ in range(size)]
|
||||
|
||||
|
||||
def set_attrib_file(path, attribs):
|
||||
""" Write job's attributes to file """
|
||||
logging.debug("Writing attributes %s to %s", attribs, path)
|
||||
path = os.path.join(path, ATTRIB_FILE)
|
||||
with open(path, "w", encoding="utf-8") as attr_file:
|
||||
for item in attribs:
|
||||
attr_file.write("%s\n" % item)
|
||||
|
||||
|
||||
def name_extractor(subject):
|
||||
""" Try to extract a file name from a subject line, return `subject` if in doubt """
|
||||
result = subject
|
||||
|
||||
@@ -688,7 +688,7 @@ def prepare_extraction_path(nzo):
|
||||
complete_dir = sanitize_and_trim_path(complete_dir)
|
||||
|
||||
if one_folder:
|
||||
workdir_complete = create_all_dirs(complete_dir, umask=True)
|
||||
workdir_complete = create_all_dirs(complete_dir, apply_umask=True)
|
||||
else:
|
||||
workdir_complete = get_unique_path(os.path.join(complete_dir, nzo.final_name), create_dir=True)
|
||||
marker_file = set_marker(workdir_complete)
|
||||
|
||||
@@ -791,7 +791,7 @@ def _get_link(entry):
|
||||
except AttributeError:
|
||||
try: # nzb.su
|
||||
category = entry.tags[0]["term"]
|
||||
except (AttributeError, KeyError):
|
||||
except (AttributeError, IndexError, KeyError):
|
||||
try:
|
||||
category = entry.description
|
||||
except AttributeError:
|
||||
|
||||
@@ -45,12 +45,16 @@ def keep_awake(reason):
|
||||
Multiple calls allowed.
|
||||
"""
|
||||
global assertion_id
|
||||
kIOPMAssertionTypeNoIdleSleep = "PreventUserIdleSystemSleep"
|
||||
kIOPMAssertionLevelOn = 255
|
||||
errcode, assertion_id = IOPMAssertionCreateWithName(
|
||||
kIOPMAssertionTypeNoIdleSleep, kIOPMAssertionLevelOn, reason, None
|
||||
)
|
||||
return errcode == 0
|
||||
|
||||
# Each assertion needs to be released, so make sure to only set it once
|
||||
if not assertion_id:
|
||||
kIOPMAssertionTypeNoIdleSleep = "NoIdleSleepAssertion"
|
||||
kIOPMAssertionLevelOn = 255
|
||||
errcode, assertion_id = IOPMAssertionCreateWithName(
|
||||
kIOPMAssertionTypeNoIdleSleep, kIOPMAssertionLevelOn, reason, None
|
||||
)
|
||||
return errcode == 0
|
||||
return True
|
||||
|
||||
|
||||
def allow_sleep():
|
||||
|
||||
@@ -4,5 +4,5 @@
|
||||
|
||||
# You MUST use double quotes (so " and not ')
|
||||
|
||||
__version__ = "3.0.0"
|
||||
__baseline__ = "cc465c75543dfa26e3065c7dbf98354d53ad1112"
|
||||
__version__ = "3.0.2"
|
||||
__baseline__ = "51cc7659493b8f98b5da7537831d1a56f2777775"
|
||||
|
||||
@@ -194,7 +194,7 @@ class TestSameFile:
|
||||
assert 0 == filesystem.same_file("/test/../home", "/test")
|
||||
assert 0 == filesystem.same_file("/test/./test", "/test")
|
||||
|
||||
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Not for Windows")
|
||||
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Non-Windows tests")
|
||||
@set_platform("linux")
|
||||
def test_posix_fun(self):
|
||||
assert 1 == filesystem.same_file("/test", "/test")
|
||||
@@ -302,7 +302,7 @@ class TestClipLongPath:
|
||||
assert filesystem.long_path("/test/dir") == "/test/dir"
|
||||
|
||||
|
||||
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Broken on Windows")
|
||||
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Non-Windows tests")
|
||||
class TestCheckMountLinux(ffs.TestCase):
|
||||
# Our collection of fake directories
|
||||
test_dirs = ["/media/test/dir", "/mnt/TEST/DIR"]
|
||||
@@ -350,7 +350,7 @@ class TestCheckMountLinux(ffs.TestCase):
|
||||
assert filesystem.check_mount("/") is True
|
||||
|
||||
|
||||
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Broken on Windows")
|
||||
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Non-Windows tests")
|
||||
class TestCheckMountDarwin(ffs.TestCase):
|
||||
# Our faked macos directory
|
||||
test_dir = "/Volumes/test/dir"
|
||||
@@ -458,7 +458,7 @@ class TestTrimWinPath:
|
||||
assert filesystem.trim_win_path(test_path + "\\" + ("D" * 20)) == test_path + "\\" + "D" * 3
|
||||
|
||||
|
||||
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Broken on Windows")
|
||||
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Non-Windows tests")
|
||||
class TestListdirFull(ffs.TestCase):
|
||||
# Basic fake filesystem setup stanza
|
||||
def setUp(self):
|
||||
@@ -539,7 +539,6 @@ class TestListdirFull(ffs.TestCase):
|
||||
|
||||
class TestListdirFullWin(ffs.TestCase):
|
||||
# Basic fake filesystem setup stanza
|
||||
@set_platform("win32")
|
||||
def setUp(self):
|
||||
self.setUpPyfakefs()
|
||||
self.fs.is_windows_fs = True
|
||||
@@ -617,7 +616,7 @@ class TestListdirFullWin(ffs.TestCase):
|
||||
assert filesystem.listdir_full(test_file) == []
|
||||
|
||||
|
||||
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Broken on Windows")
|
||||
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Non-Windows tests")
|
||||
class TestGetUniquePathFilename(ffs.TestCase):
|
||||
# Basic fake filesystem setup stanza
|
||||
def setUp(self):
|
||||
@@ -675,9 +674,9 @@ class TestGetUniquePathFilename(ffs.TestCase):
|
||||
assert filesystem.get_unique_filename(test_file) == "/some/filename.1"
|
||||
|
||||
|
||||
@pytest.mark.skipif(not sys.platform.startswith("win"), reason="Windows specific tests")
|
||||
class TestGetUniquePathFilenameWin(ffs.TestCase):
|
||||
# Basic fake filesystem setup stanza
|
||||
@set_platform("win32")
|
||||
def setUp(self):
|
||||
self.setUpPyfakefs()
|
||||
self.fs.is_windows_fs = True
|
||||
@@ -730,6 +729,94 @@ class TestGetUniquePathFilenameWin(ffs.TestCase):
|
||||
assert filesystem.get_unique_filename(test_file).lower() == r"c:\some\filename.1"
|
||||
|
||||
|
||||
class TestCreateAllDirsWin(ffs.TestCase):
|
||||
# Basic fake filesystem setup stanza
|
||||
def setUp(self):
|
||||
self.setUpPyfakefs()
|
||||
self.fs.is_windows_fs = True
|
||||
self.fs.path_separator = "\\"
|
||||
self.fs.is_case_sensitive = False
|
||||
|
||||
@set_platform("win32")
|
||||
def test_create_all_dirs(self):
|
||||
self.dir = self.fs.create_dir(r"C:\Downloads")
|
||||
# Also test for no crash when folder already exists
|
||||
for folder in (r"C:\Downloads", r"C:\Downloads\Show\Test", r"C:\Downloads\Show\Test2", r"C:\Downloads\Show"):
|
||||
assert filesystem.create_all_dirs(folder) == folder
|
||||
assert os.path.exists(folder)
|
||||
|
||||
|
||||
class PermissionCheckerHelper:
|
||||
@staticmethod
|
||||
def assert_dir_perms(path, expected_perms):
|
||||
assert stat.filemode(os.stat(path).st_mode) == "d" + stat.filemode(expected_perms)[1:]
|
||||
|
||||
|
||||
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Non-Windows tests")
|
||||
class TestCreateAllDirs(ffs.TestCase, PermissionCheckerHelper):
|
||||
def setUp(self):
|
||||
self.setUpPyfakefs()
|
||||
self.fs.path_separator = "/"
|
||||
self.fs.is_case_sensitive = True
|
||||
|
||||
def test_basic_folder_creation(self):
|
||||
self.fs.create_dir("/test_base")
|
||||
# Also test for no crash when folder already exists
|
||||
for folder in ("/test_base", "/test_base/show/season 1/episode 1", "/test_base/show"):
|
||||
assert filesystem.create_all_dirs(folder) == folder
|
||||
assert os.path.exists(folder)
|
||||
|
||||
@set_config({"umask": "0777"})
|
||||
def test_permissions_777(self):
|
||||
self._permissions_runner("/test_base777")
|
||||
self._permissions_runner("/test_base777_nomask", apply_umask=False)
|
||||
|
||||
@set_config({"umask": "0770"})
|
||||
def test_permissions_770(self):
|
||||
self._permissions_runner("/test_base770")
|
||||
self._permissions_runner("/test_base770_nomask", apply_umask=False)
|
||||
|
||||
@set_config({"umask": "0600"})
|
||||
def test_permissions_600(self):
|
||||
self._permissions_runner("/test_base600")
|
||||
self._permissions_runner("/test_base600_nomask", apply_umask=False)
|
||||
|
||||
@set_config({"umask": "0700"})
|
||||
def test_permissions_450(self):
|
||||
with pytest.raises(OSError):
|
||||
self._permissions_runner("/test_base450", perms_base="0450")
|
||||
|
||||
@set_config({"umask": ""})
|
||||
def test_no_umask(self):
|
||||
self._permissions_runner("/test_base_perm700", perms_base="0700")
|
||||
self._permissions_runner("/test_base_perm750", perms_base="0750")
|
||||
self._permissions_runner("/test_base_perm777", perms_base="0777")
|
||||
self._permissions_runner("/test_base_perm600", perms_base="0600")
|
||||
|
||||
def _permissions_runner(self, test_base, perms_base="0700", apply_umask=True):
|
||||
# Create base directory and set the base permissions
|
||||
perms_base_int = int(perms_base, 8)
|
||||
self.fs.create_dir(test_base, perms_base_int)
|
||||
assert os.path.exists(test_base) is True
|
||||
self.assert_dir_perms(test_base, perms_base_int)
|
||||
|
||||
# Create directories with permissions
|
||||
new_dir = os.path.join(test_base, "se 1", "ep1")
|
||||
filesystem.create_all_dirs(new_dir, apply_umask=apply_umask)
|
||||
|
||||
# If permissions needed to be set, verify the new folder has the
|
||||
# right permissions and verify the base didn't change
|
||||
if apply_umask and cfg.umask():
|
||||
perms_test_int = int(cfg.umask(), 8) | int("0700", 8)
|
||||
else:
|
||||
# Get the current umask, since os.mkdir masks that out
|
||||
cur_umask = os.umask(0)
|
||||
os.umask(cur_umask)
|
||||
perms_test_int = int("0777", 8) & ~cur_umask
|
||||
self.assert_dir_perms(new_dir, perms_test_int)
|
||||
self.assert_dir_perms(test_base, perms_base_int)
|
||||
|
||||
|
||||
class TestSetPermissionsWin(ffs.TestCase):
|
||||
@set_platform("win32")
|
||||
def test_win32(self):
|
||||
@@ -737,8 +824,8 @@ class TestSetPermissionsWin(ffs.TestCase):
|
||||
assert filesystem.set_permissions(r"F:\who\cares", recursive=False) is None
|
||||
|
||||
|
||||
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Broken on Windows")
|
||||
class TestSetPermissions(ffs.TestCase):
|
||||
@pytest.mark.skipif(sys.platform.startswith("win"), reason="Non-Windows tests")
|
||||
class TestSetPermissions(ffs.TestCase, PermissionCheckerHelper):
|
||||
# Basic fake filesystem setup stanza
|
||||
def setUp(self):
|
||||
self.setUpPyfakefs()
|
||||
@@ -771,7 +858,7 @@ class TestSetPermissions(ffs.TestCase):
|
||||
ffs.set_uid(0)
|
||||
self.fs.create_dir(test_dir, perms_test)
|
||||
assert os.path.exists(test_dir) is True
|
||||
assert stat.filemode(os.stat(test_dir).st_mode) == "d" + stat.filemode(perms_test)[1:]
|
||||
self.assert_dir_perms(test_dir, perms_test)
|
||||
|
||||
# Setup and verify fake files
|
||||
for file in (
|
||||
@@ -800,7 +887,7 @@ class TestSetPermissions(ffs.TestCase):
|
||||
for root, dirs, files in os.walk(test_dir):
|
||||
for dir in [os.path.join(root, d) for d in dirs]:
|
||||
# Permissions on directories should now match perms_after
|
||||
assert stat.filemode(os.stat(dir).st_mode) == "d" + stat.filemode(perms_after)[1:]
|
||||
self.assert_dir_perms(dir, perms_after)
|
||||
for file in [os.path.join(root, f) for f in files]:
|
||||
# Files also shouldn't have any executable or special bits set
|
||||
assert (
|
||||
|
||||
@@ -59,3 +59,17 @@ class TestSleepless:
|
||||
sleepless.allow_sleep()
|
||||
assert not self.check_msg_in_assertions()
|
||||
assert sleepless.assertion_id is None
|
||||
|
||||
def test_sleepless_multi_call(self):
|
||||
# If we set it twice, is it still cleared with one call
|
||||
assert not self.check_msg_in_assertions()
|
||||
assert sleepless.assertion_id is None
|
||||
|
||||
sleepless.keep_awake(self.sleep_msg)
|
||||
time.sleep(2)
|
||||
sleepless.keep_awake(self.sleep_msg)
|
||||
assert self.check_msg_in_assertions()
|
||||
|
||||
sleepless.allow_sleep()
|
||||
assert not self.check_msg_in_assertions()
|
||||
assert sleepless.assertion_id is None
|
||||
|
||||
@@ -281,26 +281,25 @@ if os.path.exists(tl):
|
||||
TOOL = [tl]
|
||||
|
||||
result = True
|
||||
if len(sys.argv) > 1 and sys.argv[1] == "all":
|
||||
if len(sys.argv) > 1 and sys.argv[1] == "nsis":
|
||||
print("NSIS MO file")
|
||||
result = result and process_po_folder(DOMAIN_N, PON_DIR)
|
||||
|
||||
print("Patch NSIS script")
|
||||
patch_nsis()
|
||||
else:
|
||||
print("Email MO files")
|
||||
result = result and process_po_folder(DOMAIN_E, POE_DIR)
|
||||
|
||||
print("Email MO files")
|
||||
result = result and process_po_folder(DOMAIN_E, POE_DIR)
|
||||
print("Create email templates from MO files")
|
||||
make_templates()
|
||||
|
||||
print("Create email templates from MO files")
|
||||
make_templates()
|
||||
print("Main program MO files")
|
||||
# -n option added to remove all newlines from the translations
|
||||
result = result and process_po_folder(DOMAIN, PO_DIR, "-n")
|
||||
|
||||
|
||||
print("Main program MO files")
|
||||
# -n option added to remove all newlines from the translations
|
||||
result = result and process_po_folder(DOMAIN, PO_DIR, "-n")
|
||||
|
||||
print("Remove temporary templates")
|
||||
remove_mo_files()
|
||||
print("Remove temporary templates")
|
||||
remove_mo_files()
|
||||
|
||||
print()
|
||||
if result:
|
||||
|
||||
Reference in New Issue
Block a user