Split nzbstuff into separate files for Article, NzbFile and NzbObject (#3221)

This commit is contained in:
Safihre
2025-12-09 21:21:51 +01:00
committed by GitHub
parent b420975267
commit a91e718ef5
29 changed files with 989 additions and 830 deletions

View File

@@ -82,15 +82,15 @@ from sabnzbd.version import __version__, __baseline__
import sabnzbd.misc as misc
import sabnzbd.filesystem as filesystem
import sabnzbd.powersup as powersup
import sabnzbd.rss as rss
import sabnzbd.emailer as emailer
import sabnzbd.encoding as encoding
import sabnzbd.config as config
import sabnzbd.cfg as cfg
import sabnzbd.database
import sabnzbd.lang as lang
import sabnzbd.nzb
import sabnzbd.nzbparser as nzbparser
import sabnzbd.nzbstuff
import sabnzbd.rss as rss
import sabnzbd.emailer as emailer
import sabnzbd.getipaddress
import sabnzbd.newsunpack
import sabnzbd.par2file

View File

@@ -85,7 +85,7 @@ from sabnzbd.encoding import xml_name, utob
from sabnzbd.getipaddress import local_ipv4, public_ipv4, public_ipv6, dnslookup, active_socks5_proxy
from sabnzbd.database import HistoryDB
from sabnzbd.lang import is_rtl
from sabnzbd.nzbstuff import NzbObject
from sabnzbd.nzb import TryList, NzbObject
from sabnzbd.newswrapper import NewsWrapper, NNTPPermanentError
import sabnzbd.emailer
import sabnzbd.sorting
@@ -1006,7 +1006,7 @@ def _api_gc_stats(name: str, kwargs: dict[str, Union[str, list[str]]]) -> bytes:
# Collect before we check
gc.collect()
# We cannot create any lists/dicts, as they would create a reference
return report(data=[str(obj) for obj in gc.get_objects() if isinstance(obj, sabnzbd.nzbstuff.TryList)])
return report(data=[str(obj) for obj in gc.get_objects() if isinstance(obj, TryList)])
##############################################################################

View File

@@ -27,7 +27,7 @@ from typing import Collection
import sabnzbd
from sabnzbd.decorators import synchronized
from sabnzbd.constants import GIGI, ANFO, ASSEMBLER_WRITE_THRESHOLD
from sabnzbd.nzbstuff import Article
from sabnzbd.nzb import Article
# Operations on the article table are handled via try/except.
# The counters need to be made atomic to ensure consistency.

View File

@@ -41,7 +41,7 @@ from sabnzbd.filesystem import (
)
from sabnzbd.constants import Status, GIGI
import sabnzbd.cfg as cfg
from sabnzbd.nzbstuff import NzbObject, NzbFile
from sabnzbd.nzb import NzbFile, NzbObject
import sabnzbd.par2file as par2file

View File

@@ -498,9 +498,14 @@ def convert_search(search: str) -> str:
return search
def build_history_info(nzo, workdir_complete: str, postproc_time: int, script_output: str, script_line: str):
def build_history_info(
nzo: "sabnzbd.nzbobject.NzbObject",
workdir_complete: str,
postproc_time: int,
script_output: str,
script_line: str,
):
"""Collects all the information needed for the database"""
nzo: sabnzbd.nzbstuff.NzbObject
completed = int(time.time())
pp = PP_LOOKUP.get(opts_to_pp(nzo.repair, nzo.unpack, nzo.delete), "X")

View File

@@ -25,7 +25,7 @@ from typing import Optional
import sabnzbd
from sabnzbd.constants import SABCTOOLS_VERSION_REQUIRED
from sabnzbd.nzbstuff import Article
from sabnzbd.nzb import Article
from sabnzbd.misc import match_str
# Check for correct SABCTools version

View File

@@ -189,7 +189,7 @@ def get_biggest_file(filelist: list[str]) -> str:
return None
def deobfuscate(nzo, filelist: list[str], usefulname: str) -> list[str]:
def deobfuscate(nzo: "sabnzbd.nzbobject.NzbObject", filelist: list[str], usefulname: str) -> list[str]:
"""
For files in filelist:
1. if a file has no meaningful extension, add it (for example ".txt" or ".png")
@@ -227,9 +227,6 @@ def deobfuscate(nzo, filelist: list[str], usefulname: str) -> list[str]:
"""
# Can't be imported directly due to circular import
nzo: sabnzbd.nzbstuff.NzbObject
# to be sure, only keep really existing files and remove any duplicates:
filtered_filelist = list(set(f for f in filelist if os.path.isfile(f)))
@@ -320,7 +317,7 @@ def without_extension(fullpathfilename: str) -> str:
return os.path.splitext(fullpathfilename)[0]
def deobfuscate_subtitles(nzo, filelist: list[str]):
def deobfuscate_subtitles(nzo: "sabnzbd.nzbobject.NzbObject", filelist: list[str]):
"""
input:
nzo, so we can update result via set_unpack_info()
@@ -345,10 +342,6 @@ def deobfuscate_subtitles(nzo, filelist: list[str]):
Something.else.txt
"""
# Can't be imported directly due to circular import
nzo: sabnzbd.nzbstuff.NzbObject
# find .srt files
if not (srt_files := [f for f in filelist if f.endswith(".srt")]):
logging.debug("No .srt files found, so nothing to do")

View File

@@ -31,7 +31,7 @@ import sabnzbd
import sabnzbd.cfg as cfg
from sabnzbd.misc import int_conv, format_time_string, build_and_run_command
from sabnzbd.filesystem import remove_all, real_path, remove_file, get_basename, clip_path
from sabnzbd.nzbstuff import NzbObject, NzbFile
from sabnzbd.nzb import NzbFile, NzbObject
from sabnzbd.encoding import platform_btou
from sabnzbd.decorators import synchronized
from sabnzbd.newsunpack import RAR_EXTRACTFROM_RE, RAR_EXTRACTED_RE, rar_volumelist, add_time_left

View File

@@ -151,7 +151,7 @@ class Server:
self.request: bool = False # True if a getaddrinfo() request is pending
self.have_body: bool = True # Assume server has "BODY", until proven otherwise
self.have_stat: bool = True # Assume server has "STAT", until proven otherwise
self.article_queue: Deque[sabnzbd.nzbstuff.Article] = deque()
self.article_queue: Deque[sabnzbd.nzbarticle.Article] = deque()
# Skip during server testing
if threads:
@@ -540,7 +540,7 @@ class Downloader(Thread):
server.addrinfo = None
@staticmethod
def decode(article: "sabnzbd.nzbstuff.Article", response: Optional[sabctools.NNTPResponse] = None):
def decode(article: "sabnzbd.nzbarticle.Article", response: Optional[sabctools.NNTPResponse] = None):
"""Decode article"""
# Need a better way of draining requests
if article.nzf.nzo.removed_from_queue:
@@ -900,7 +900,7 @@ class Downloader(Thread):
wait: bool = True,
count_article_try: bool = True,
retry_article: bool = True,
article: Optional["sabnzbd.nzbstuff.Article"] = None,
article: Optional["sabnzbd.nzbarticle.Article"] = None,
):
# Some warnings are errors, and not added as server.warning
if warn and reset_msg:

View File

@@ -1367,3 +1367,54 @@ def pathbrowser(path: str, show_hidden: bool = False, show_files: bool = False)
)
return file_list
def create_work_name(name: str) -> str:
"""Remove ".nzb" and ".par(2)" and sanitize, skip URL's"""
if name.find("://") < 0:
# Invalid charters need to be removed before and after (see unit-tests)
return sanitize_foldername(strip_extensions(sanitize_foldername(name)))
else:
return name.strip()
def nzf_cmp_name(nzf1, nzf2):
"""Comparison function for sorting NZB files.
The comparison will sort .par2 files to the top of the queue followed by .rar files,
they will then be sorted by name.
Note: nzf1 and nzf2 should be NzbFile objects, but we can't import that here
to avoid circular dependencies.
"""
nzf1_name = nzf1.filename.lower()
nzf2_name = nzf2.filename.lower()
# Determine vol-pars
is_par1 = ".vol" in nzf1_name and ".par2" in nzf1_name
is_par2 = ".vol" in nzf2_name and ".par2" in nzf2_name
# mini-par2 in front
if not is_par1 and nzf1_name.endswith(".par2"):
return -1
if not is_par2 and nzf2_name.endswith(".par2"):
return 1
# vol-pars go to the back
if is_par1 and not is_par2:
return 1
if is_par2 and not is_par1:
return -1
# Prioritize .rar files above any other type of file (other than vol-par)
m1 = RAR_RE.search(nzf1_name)
m2 = RAR_RE.search(nzf2_name)
if m1 and not (is_par2 or m2):
return -1
elif m2 and not (is_par1 or m1):
return 1
# Force .rar to come before 'r00'
if m1 and m1.group(1) == ".rar":
nzf1_name = nzf1_name.replace(".rar", ".r//")
if m2 and m2.group(1) == ".rar":
nzf2_name = nzf2_name.replace(".rar", ".r//")
return sabnzbd.misc.cmp(nzf1_name, nzf2_name)

View File

@@ -57,7 +57,7 @@ import sabnzbd.config as config
import sabnzbd.cfg as cfg
from sabnzbd.decorators import conditional_cache
from sabnzbd.encoding import ubtou, platform_btou
from sabnzbd.filesystem import userxbit, make_script_path, remove_file
from sabnzbd.filesystem import userxbit, make_script_path, remove_file, strip_extensions
if sabnzbd.WINDOWS:
try:
@@ -85,6 +85,10 @@ RE_SAMPLE = re.compile(r"((^|[\W_])(sample|proof))", re.I) # something-sample o
RE_IP4 = re.compile(r"inet\s+(addr:\s*)?(\d+\.\d+\.\d+\.\d+)")
RE_IP6 = re.compile(r"inet6\s+(addr:\s*)?([0-9a-f:]+)", re.I)
# Name patterns for NZB parsing
RE_SUBJECT_FILENAME_QUOTES = re.compile(r'"([^"]*)"')
RE_SUBJECT_BASIC_FILENAME = re.compile(r"\b([\w\-+()' .,]+(?:\[[\w\-/+()' .,]*][\w\-+()' .,]*)*\.[A-Za-z0-9]{2,4})\b")
# Check if strings are defined for AM and PM
HAVE_AMPM = bool(time.strftime("%p"))
@@ -1591,6 +1595,66 @@ def convert_history_retention():
cfg.history_retention_option.set("all-delete")
def scan_password(name: str) -> tuple[str, Optional[str]]:
"""Get password (if any) from the title"""
if "http://" in name or "https://" in name:
return name, None
# Strip any unwanted usenet-related extensions
name = strip_extensions(name)
# Identify any braces
braces = name[1:].find("{{")
if braces < 0:
braces = len(name)
else:
braces += 1
slash = name.find("/")
# Look for name/password, but make sure that '/' comes before any {{
if 0 < slash < braces and "password=" not in name:
# Is it maybe in 'name / password' notation?
if slash == name.find(" / ") + 1 and name[: slash - 1].strip(". "):
# Remove the extra space after name and before password
return name[: slash - 1].strip(". "), name[slash + 2 :]
if name[:slash].strip(". "):
return name[:slash].strip(". "), name[slash + 1 :]
# Look for "name password=password"
pw = name.find("password=")
if pw > 0 and name[:pw].strip(". "):
return name[:pw].strip(". "), name[pw + 9 :]
# Look for name{{password}}
if braces < len(name):
closing_braces = name.rfind("}}")
if closing_braces > braces and name[:braces].strip(". "):
return name[:braces].strip(". "), name[braces + 2 : closing_braces]
# Look again for name/password
if slash > 0 and name[:slash].strip(". "):
return name[:slash].strip(". "), name[slash + 1 :]
# No password found
return name, None
def subject_name_extractor(subject: str) -> str:
"""Try to extract a file name from a subject line, return `subject` if in doubt"""
# Filename nicely wrapped in quotes
for name in re.findall(RE_SUBJECT_FILENAME_QUOTES, subject):
if name := name.strip(' "'):
return name
# Found nothing? Try a basic filename-like search
for name in re.findall(RE_SUBJECT_BASIC_FILENAME, subject):
if name := name.strip():
return name
# Return the subject
return subject
##
## SABnzbd patched rarfile classes
## Patch for https://github.com/markokr/rarfile/issues/56#issuecomment-711146569

View File

@@ -66,7 +66,7 @@ from sabnzbd.filesystem import (
get_basename,
create_all_dirs,
)
from sabnzbd.nzbstuff import NzbObject
from sabnzbd.nzb import NzbObject
import sabnzbd.cfg as cfg
from sabnzbd.constants import Status

View File

@@ -97,16 +97,16 @@ class NewsWrapper:
self.group: Optional[str] = None
# Command queue and concurrency
self.next_request: Optional[tuple[bytes, Optional["sabnzbd.nzbstuff.Article"]]] = None
self.next_request: Optional[tuple[bytes, Optional["sabnzbd.nzbarticle.Article"]]] = None
self.concurrent_requests: threading.BoundedSemaphore = threading.BoundedSemaphore(
sabnzbd.cfg.pipelining_requests()
)
self._response_queue: deque[Optional[sabnzbd.nzbstuff.Article]] = deque()
self._response_queue: deque[Optional[sabnzbd.nzbarticle.Article]] = deque()
self.selector_events = 0
self.lock: threading.Lock = threading.Lock()
@property
def article(self) -> Optional["sabnzbd.nzbstuff.Article"]:
def article(self) -> Optional["sabnzbd.nzbarticle.Article"]:
"""The article currently being downloaded"""
with self.lock:
if self._response_queue:
@@ -177,12 +177,12 @@ class NewsWrapper:
def queue_command(
self,
command: bytes,
article: Optional["sabnzbd.nzbstuff.Article"] = None,
article: Optional["sabnzbd.nzbarticle.Article"] = None,
) -> None:
"""Add a command to the command queue"""
self.next_request = command, article
def body(self, article: "sabnzbd.nzbstuff.Article") -> tuple[bytes, "sabnzbd.nzbstuff.Article"]:
def body(self, article: "sabnzbd.nzbarticle.Article") -> tuple[bytes, "sabnzbd.nzbarticle.Article"]:
"""Request the body of the article"""
self.timeout = time.time() + self.server.timeout
if article.nzf.nzo.precheck:
@@ -196,7 +196,7 @@ class NewsWrapper:
command = utob("ARTICLE <%s>\r\n" % article.article)
return command, article
def on_response(self, response: sabctools.NNTPResponse, article: Optional["sabnzbd.nzbstuff.Article"]) -> None:
def on_response(self, response: sabctools.NNTPResponse, article: Optional["sabnzbd.nzbarticle.Article"]) -> None:
"""A response to a NNTP request is received"""
self.concurrent_requests.release()
sabnzbd.Downloader.modify_socket(self, EVENT_READ | EVENT_WRITE)
@@ -432,7 +432,7 @@ class NewsWrapper:
def discard(
self,
article: Optional["sabnzbd.nzbstuff.Article"],
article: Optional["sabnzbd.nzbarticle.Article"],
count_article_try: bool = True,
retry_article: bool = True,
) -> None:

59
sabnzbd/nzb/__init__.py Normal file
View File

@@ -0,0 +1,59 @@
#!/usr/bin/python3 -OO
# Copyright 2007-2025 by The SABnzbd-Team (sabnzbd.org)
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
sabnzbd.nzb - NZB-related classes and functionality
"""
# Article-related classes
from sabnzbd.nzb.article import Article, ArticleSaver, TryList, TRYLIST_LOCK
# File-related classes
from sabnzbd.nzb.file import NzbFile, NzbFileSaver, SkippedNzbFile
# Object-related classes
from sabnzbd.nzb.object import (
NzbObject,
NzbObjectSaver,
NzoAttributeSaver,
NZO_LOCK,
NzbEmpty,
NzbRejected,
NzbPreQueueRejected,
NzbRejectToHistory,
)
__all__ = [
# Article
"Article",
"ArticleSaver",
"TryList",
"TRYLIST_LOCK",
# File
"NzbFile",
"NzbFileSaver",
"SkippedNzbFile",
# Object
"NzbObject",
"NzbObjectSaver",
"NzoAttributeSaver",
"NZO_LOCK",
"NzbEmpty",
"NzbRejected",
"NzbPreQueueRejected",
"NzbRejectToHistory",
]

214
sabnzbd/nzb/article.py Normal file
View File

@@ -0,0 +1,214 @@
#!/usr/bin/python3 -OO
# Copyright 2007-2025 by The SABnzbd-Team (sabnzbd.org)
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
sabnzbd.article - Article and TryList classes for NZB downloading
"""
import logging
import threading
from typing import Optional
import sabnzbd
from sabnzbd.downloader import Server
from sabnzbd.filesystem import get_new_id
from sabnzbd.decorators import synchronized
##############################################################################
# Trylist
##############################################################################
TRYLIST_LOCK = threading.RLock()
class TryList:
"""TryList keeps track of which servers have been tried for a specific article"""
# Pre-define attributes to save memory
__slots__ = ("try_list",)
def __init__(self):
# Sets are faster than lists
self.try_list: set[Server] = set()
def server_in_try_list(self, server: Server) -> bool:
"""Return whether specified server has been tried"""
with TRYLIST_LOCK:
return server in self.try_list
def all_servers_in_try_list(self, all_servers: set[Server]) -> bool:
"""Check if all servers have been tried"""
with TRYLIST_LOCK:
return all_servers.issubset(self.try_list)
def add_to_try_list(self, server: Server):
"""Register server as having been tried already"""
with TRYLIST_LOCK:
# Sets cannot contain duplicate items
self.try_list.add(server)
def remove_from_try_list(self, server: Server):
"""Remove server from list of tried servers"""
with TRYLIST_LOCK:
# Discard does not require the item to be present
self.try_list.discard(server)
def reset_try_list(self):
"""Clean the list"""
with TRYLIST_LOCK:
self.try_list = set()
def __getstate__(self):
"""Save the servers"""
return set(server.id for server in self.try_list)
def __setstate__(self, servers_ids: list[str]):
self.try_list = set()
for server in sabnzbd.Downloader.servers:
if server.id in servers_ids:
self.add_to_try_list(server)
##############################################################################
# Article
##############################################################################
ArticleSaver = (
"article",
"art_id",
"bytes",
"lowest_partnum",
"decoded",
"file_size",
"data_begin",
"data_size",
"on_disk",
"nzf",
"crc32",
)
class Article(TryList):
"""Representation of one article"""
# Pre-define attributes to save memory
__slots__ = ArticleSaver + ("fetcher", "fetcher_priority", "tries")
def __init__(self, article, article_bytes, nzf):
super().__init__()
self.article: str = article
self.art_id: Optional[str] = None
self.bytes: int = article_bytes
self.lowest_partnum: bool = False
self.fetcher: Optional[Server] = None
self.fetcher_priority: int = 0
self.tries: int = 0 # Try count
self.decoded: bool = False
self.file_size: Optional[int] = None
self.data_begin: Optional[int] = None
self.data_size: Optional[int] = None
self.on_disk: bool = False
self.crc32: Optional[int] = None
self.nzf = nzf # NzbFile reference
@synchronized(TRYLIST_LOCK)
def reset_try_list(self):
"""In addition to resetting the try list, also reset fetcher so all servers
are tried again. Locked so fetcher setting changes are also protected."""
self.fetcher = None
self.fetcher_priority = 0
super().reset_try_list()
@synchronized(TRYLIST_LOCK)
def allow_new_fetcher(self, remove_fetcher_from_try_list: bool = True):
"""Let article get new fetcher and reset try lists of file and job.
Locked so all resets are performed at once"""
if remove_fetcher_from_try_list:
self.remove_from_try_list(self.fetcher)
self.fetcher = None
self.tries = 0
self.nzf.reset_try_list()
self.nzf.nzo.reset_try_list()
def get_article(self, server: Server, servers: list[Server]):
"""Return article when appropriate for specified server"""
if self.fetcher or self.server_in_try_list(server):
return None
if server.priority > self.fetcher_priority:
# Check for higher priority server, taking advantage of servers list being sorted by priority
for server_check in servers:
if server_check.priority < server.priority:
if server_check.active and not self.server_in_try_list(server_check):
# There is a higher priority server, so set article priority and return
self.fetcher_priority = server_check.priority
return None
else:
# All servers with a higher priority have been checked
break
# If no higher priority servers, use this server
self.fetcher_priority = server.priority
self.fetcher = server
self.tries += 1
return self
def get_art_id(self):
"""Return unique article storage name, create if needed"""
if not self.art_id:
self.art_id = get_new_id("article", self.nzf.nzo.admin_path)
return self.art_id
def search_new_server(self):
"""Search for a new server for this article"""
# Since we need a new server, this one can be listed as failed
sabnzbd.BPSMeter.register_server_article_failed(self.fetcher.id)
self.add_to_try_list(self.fetcher)
# Servers-list could be modified during iteration, so we need a copy
for server in sabnzbd.Downloader.servers[:]:
if server.active and not self.server_in_try_list(server):
if server.priority >= self.fetcher.priority:
self.tries = 0
# Allow all servers for this nzo and nzf again (but not this fetcher for this article)
self.allow_new_fetcher(remove_fetcher_from_try_list=False)
return True
logging.info("Article %s unavailable on all servers, discarding", self.article)
return False
def __getstate__(self):
"""Save to pickle file, selecting attributes"""
dict_ = {}
for item in ArticleSaver:
dict_[item] = getattr(self, item)
dict_["try_list"] = super().__getstate__()
return dict_
def __setstate__(self, dict_):
"""Load from pickle file, selecting attributes"""
for item in ArticleSaver:
try:
setattr(self, item, dict_[item])
except KeyError:
# Handle new attributes
setattr(self, item, None)
super().__setstate__(dict_.get("try_list", []))
self.fetcher = None
self.fetcher_priority = 0
self.tries = 0
def __repr__(self):
return "<Article: article=%s, bytes=%s, art_id=%s>" % (self.article, self.bytes, self.art_id)

290
sabnzbd/nzb/file.py Normal file
View File

@@ -0,0 +1,290 @@
#!/usr/bin/python3 -OO
# Copyright 2007-2025 by The SABnzbd-Team (sabnzbd.org)
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
sabnzbd.nzb.file - NzbFile class for representing files in NZB downloads
"""
import datetime
import logging
import os
import threading
from typing import Optional
import sabctools
from sabnzbd.nzb.article import TryList, Article, TRYLIST_LOCK
from sabnzbd.downloader import Server
from sabnzbd.filesystem import (
sanitize_filename,
get_unique_filename,
get_filename,
remove_file,
get_new_id,
save_data,
load_data,
)
from sabnzbd.misc import int_conv, subject_name_extractor
from sabnzbd.decorators import synchronized
##############################################################################
# NzbFile
##############################################################################
class SkippedNzbFile(Exception):
pass
NzbFileSaver = (
"date",
"filename",
"filename_checked",
"filepath",
"type",
"is_par2",
"vol",
"blocks",
"setname",
"articles",
"decodetable",
"bytes",
"bytes_left",
"nzo",
"nzf_id",
"deleted",
"import_finished",
"crc32",
"assembled",
"md5of16k",
)
class NzbFile(TryList):
"""Representation of one file consisting of multiple articles"""
# Pre-define attributes to save memory
__slots__ = NzbFileSaver + ("lock",)
def __init__(self, date, subject, raw_article_db, file_bytes, nzo):
"""Setup object"""
super().__init__()
self.lock = threading.RLock()
self.date: datetime.datetime = date
self.type: Optional[str] = None
self.filename: str = sanitize_filename(subject_name_extractor(subject))
self.filename_checked = False
self.filepath: Optional[str] = None
# Identifiers for par2 files
self.is_par2: bool = False
self.vol: Optional[int] = None
self.blocks: Optional[int] = None
self.setname: Optional[str] = None
# Articles are removed from "articles" after being fetched
self.articles: dict[Article, Article] = {}
self.decodetable: list[Article] = []
self.bytes: int = file_bytes
self.bytes_left: int = file_bytes
self.nzo = nzo # NzbObject reference
self.deleted = False
self.import_finished = False
self.crc32: Optional[int] = 0
self.assembled: bool = False
self.md5of16k: Optional[bytes] = None
# Add first article to decodetable, this way we can check
# if this is maybe a duplicate nzf
if raw_article_db:
first_article = self.add_article(raw_article_db.pop(0))
first_article.lowest_partnum = True
if self in nzo.files:
logging.info("File %s occurred twice in NZB, skipping", self.filename)
raise SkippedNzbFile
# Create file on disk, which can fail in case of disk errors
self.nzf_id: str = get_new_id("nzf", nzo.admin_path)
if not self.nzf_id:
# Error already shown to user from get_new_id
raise SkippedNzbFile
# Any articles left?
if raw_article_db:
# Save the rest
save_data(raw_article_db, self.nzf_id, nzo.admin_path)
else:
# All imported
self.import_finished = True
def finish_import(self):
"""Load the article objects from disk"""
logging.debug("Finishing import on %s", self.filename)
if raw_article_db := load_data(self.nzf_id, self.nzo.admin_path, remove=False):
for raw_article in raw_article_db:
self.add_article(raw_article)
# Make sure we have labeled the lowest part number
# Also when DirectUnpack is disabled we need to know
self.decodetable[0].lowest_partnum = True
# Mark safe to continue
self.import_finished = True
def add_article(self, article_info):
"""Add article to object database and return article object"""
article = Article(article_info[0], article_info[1], self)
with self.lock:
self.articles[article] = article
self.decodetable.append(article)
return article
def remove_article(self, article: Article, success: bool) -> int:
"""Handle completed article, possibly end of file"""
with self.lock:
if self.articles.pop(article, None) is not None:
if success:
self.bytes_left -= article.bytes
return len(self.articles)
def set_par2(self, setname, vol, blocks):
"""Designate this file as a par2 file"""
self.is_par2 = True
self.setname = setname
self.vol = vol
self.blocks = int_conv(blocks)
def update_crc32(self, crc32: Optional[int], length: int) -> None:
if self.crc32 is None or crc32 is None:
self.crc32 = None
else:
self.crc32 = sabctools.crc32_combine(self.crc32, crc32, length)
def get_articles(self, server: Server, servers: list[Server], fetch_limit: int):
"""Get next articles to be downloaded"""
articles = server.article_queue
with self.lock:
for article in self.articles:
if article := article.get_article(server, servers):
articles.append(article)
if len(articles) >= fetch_limit:
return
self.add_to_try_list(server)
@synchronized(TRYLIST_LOCK)
def reset_all_try_lists(self):
"""Reset all try lists. Locked so reset is performed
for all items at the same time without chance of another
thread changing any of the items while we are resetting"""
with self.lock:
for art in self.articles:
art.reset_try_list()
self.reset_try_list()
def first_article_processed(self) -> bool:
"""Check if the first article has been processed.
This ensures we have attempted to extract md5of16k and filename information
before creating the filepath.
"""
# The first article of decodetable is always the lowest
first_article = self.decodetable[0]
# If it's still in nzo.first_articles, it hasn't been processed yet
return first_article not in self.nzo.first_articles
def prepare_filepath(self):
"""Do all checks before making the final path"""
if not self.filepath:
# Wait for the first article to be processed so we can get md5of16k
# and proper filename before creating the filepath
if not self.first_article_processed():
return None
self.nzo.verify_nzf_filename(self)
filename = sanitize_filename(self.filename)
self.filepath = get_unique_filename(os.path.join(self.nzo.download_path, filename))
self.filename = get_filename(self.filepath)
return self.filepath
@property
def completed(self):
"""Is this file completed?"""
if not self.import_finished:
return False
with self.lock:
return not self.articles
def remove_admin(self):
"""Remove article database from disk (sabnzbd_nzf_<id>)"""
try:
logging.debug("Removing article database for %s", self.nzf_id)
remove_file(os.path.join(self.nzo.admin_path, self.nzf_id))
except Exception:
pass
def __enter__(self):
self.lock.acquire()
def __exit__(self, exc_type, exc_val, exc_tb):
self.lock.release()
def __getstate__(self):
"""Save to pickle file, selecting attributes"""
dict_ = {}
for item in NzbFileSaver:
dict_[item] = getattr(self, item)
dict_["try_list"] = super().__getstate__()
return dict_
def __setstate__(self, dict_):
"""Load from pickle file, selecting attributes"""
for item in NzbFileSaver:
try:
setattr(self, item, dict_[item])
except KeyError:
# Handle new attributes
setattr(self, item, None)
super().__setstate__(dict_.get("try_list", []))
self.lock = threading.RLock()
if isinstance(self.articles, list):
# Converted from list to dict
self.articles = {x: x for x in self.articles}
def __eq__(self, other: "NzbFile"):
"""Assume it's the same file if the number bytes and first article
are the same or if there are no articles left, use the filenames.
Some NZB's are just a mess and report different sizes for the same article.
We used to compare (__eq__) articles based on article-ID, however, this failed
because some NZB's had the same article-ID twice within one NZF.
"""
if other and (self.bytes == other.bytes or len(self.decodetable) == len(other.decodetable)):
if self.decodetable and other.decodetable:
return self.decodetable[0].article == other.decodetable[0].article
# Fallback to filename comparison
return self.filename == other.filename
return False
def __hash__(self):
"""Required because we implement eq. The same file can be spread
over multiple NZO's so we make every NZF unique. Even though
it's considered bad practice.
"""
return id(self)
def __repr__(self):
return "<NzbFile: filename=%s, bytes=%s, nzf_id=%s>" % (self.filename, self.bytes, self.nzf_id)

View File

@@ -16,7 +16,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
sabnzbd.nzbstuff - misc
sabnzbd.nzb.object - NzbObject class for representing NZB download jobs
"""
import os
import time
@@ -30,7 +30,8 @@ from typing import Any, Optional, Union, BinaryIO, Deque
# SABnzbd modules
import sabnzbd
import sabctools
from sabnzbd.nzb.article import TryList, Article, TRYLIST_LOCK
from sabnzbd.nzb.file import NzbFile
from sabnzbd.constants import (
GIGI,
ATTRIB_FILE,
@@ -60,6 +61,8 @@ from sabnzbd.misc import (
opts_to_pp,
pp_to_opts,
duplicate_warning,
scan_password,
subject_name_extractor,
)
from sabnzbd.filesystem import (
sanitize_foldername,
@@ -89,463 +92,19 @@ from sabnzbd.filesystem import (
remove_data,
strip_extensions,
get_ext,
create_work_name,
nzf_cmp_name,
RAR_RE,
)
from sabnzbd.par2file import FilePar2Info, has_par2_in_filename, analyse_par2, parse_par2_file, is_par2_file
from sabnzbd.decorators import synchronized
import sabnzbd.config as config
import sabnzbd.cfg as cfg
import sabnzbd.nzbparser
from sabnzbd.downloader import Server
from sabnzbd.database import HistoryDB
from sabnzbd.deobfuscate_filenames import is_probably_obfuscated
# Name patterns
# In the subject, we expect the filename within double quotes
RE_SUBJECT_FILENAME_QUOTES = re.compile(r'"([^"]*)"')
# Otherwise something that looks like a filename
RE_SUBJECT_BASIC_FILENAME = re.compile(r"\b([\w\-+()' .,]+(?:\[[\w\-/+()' .,]*][\w\-+()' .,]*)*\.[A-Za-z0-9]{2,4})\b")
RE_RAR = re.compile(r"(\.rar|\.r\d\d|\.s\d\d|\.t\d\d|\.u\d\d|\.v\d\d)$", re.I)
##############################################################################
# Trylist
##############################################################################
TRYLIST_LOCK = threading.RLock()
class TryList:
"""TryList keeps track of which servers have been tried for a specific article"""
# Pre-define attributes to save memory
__slots__ = ("try_list",)
def __init__(self):
# Sets are faster than lists
self.try_list: set[Server] = set()
def server_in_try_list(self, server: Server) -> bool:
"""Return whether specified server has been tried"""
with TRYLIST_LOCK:
return server in self.try_list
def all_servers_in_try_list(self, all_servers: set[Server]) -> bool:
"""Check if all servers have been tried"""
with TRYLIST_LOCK:
return all_servers.issubset(self.try_list)
def add_to_try_list(self, server: Server):
"""Register server as having been tried already"""
with TRYLIST_LOCK:
# Sets cannot contain duplicate items
self.try_list.add(server)
def remove_from_try_list(self, server: Server):
"""Remove server from list of tried servers"""
with TRYLIST_LOCK:
# Discard does not require the item to be present
self.try_list.discard(server)
def reset_try_list(self):
"""Clean the list"""
with TRYLIST_LOCK:
self.try_list = set()
def __getstate__(self):
"""Save the servers"""
return set(server.id for server in self.try_list)
def __setstate__(self, servers_ids: list[str]):
self.try_list = set()
for server in sabnzbd.Downloader.servers:
if server.id in servers_ids:
self.add_to_try_list(server)
##############################################################################
# Article
##############################################################################
ArticleSaver = (
"article",
"art_id",
"bytes",
"lowest_partnum",
"decoded",
"file_size",
"data_begin",
"data_size",
"on_disk",
"nzf",
"crc32",
)
class Article(TryList):
"""Representation of one article"""
# Pre-define attributes to save memory
__slots__ = ArticleSaver + ("fetcher", "fetcher_priority", "tries")
def __init__(self, article, article_bytes, nzf):
super().__init__()
self.article: str = article
self.art_id: Optional[str] = None
self.bytes: int = article_bytes
self.lowest_partnum: bool = False
self.fetcher: Optional[Server] = None
self.fetcher_priority: int = 0
self.tries: int = 0 # Try count
self.decoded: bool = False
self.file_size: Optional[int] = None
self.data_begin: Optional[int] = None
self.data_size: Optional[int] = None
self.on_disk: bool = False
self.crc32: Optional[int] = None
self.nzf: NzbFile = nzf
@synchronized(TRYLIST_LOCK)
def reset_try_list(self):
"""In addition to resetting the try list, also reset fetcher so all servers
are tried again. Locked so fetcher setting changes are also protected."""
self.fetcher = None
self.fetcher_priority = 0
super().reset_try_list()
@synchronized(TRYLIST_LOCK)
def allow_new_fetcher(self, remove_fetcher_from_try_list: bool = True):
"""Let article get new fetcher and reset try lists of file and job.
Locked so all resets are performed at once"""
if remove_fetcher_from_try_list:
self.remove_from_try_list(self.fetcher)
self.fetcher = None
self.tries = 0
self.nzf.reset_try_list()
self.nzf.nzo.reset_try_list()
def get_article(self, server: Server, servers: list[Server]):
"""Return article when appropriate for specified server"""
if self.fetcher or self.server_in_try_list(server):
return None
if server.priority > self.fetcher_priority:
# Check for higher priority server, taking advantage of servers list being sorted by priority
for server_check in servers:
if server_check.priority < server.priority:
if server_check.active and not self.server_in_try_list(server_check):
# There is a higher priority server, so set article priority and return
self.fetcher_priority = server_check.priority
return None
else:
# All servers with a higher priority have been checked
break
# If no higher priority servers, use this server
self.fetcher_priority = server.priority
self.fetcher = server
self.tries += 1
return self
def get_art_id(self):
"""Return unique article storage name, create if needed"""
if not self.art_id:
self.art_id = get_new_id("article", self.nzf.nzo.admin_path)
return self.art_id
def search_new_server(self):
"""Search for a new server for this article"""
# Since we need a new server, this one can be listed as failed
sabnzbd.BPSMeter.register_server_article_failed(self.fetcher.id)
self.add_to_try_list(self.fetcher)
# Servers-list could be modified during iteration, so we need a copy
for server in sabnzbd.Downloader.servers[:]:
if server.active and not self.server_in_try_list(server):
if server.priority >= self.fetcher.priority:
self.tries = 0
# Allow all servers for this nzo and nzf again (but not this fetcher for this article)
self.allow_new_fetcher(remove_fetcher_from_try_list=False)
return True
logging.info("Article %s unavailable on all servers, discarding", self.article)
return False
def __getstate__(self):
"""Save to pickle file, selecting attributes"""
dict_ = {}
for item in ArticleSaver:
dict_[item] = getattr(self, item)
dict_["try_list"] = super().__getstate__()
return dict_
def __setstate__(self, dict_):
"""Load from pickle file, selecting attributes"""
for item in ArticleSaver:
try:
setattr(self, item, dict_[item])
except KeyError:
# Handle new attributes
setattr(self, item, None)
super().__setstate__(dict_.get("try_list", []))
self.fetcher = None
self.fetcher_priority = 0
self.tries = 0
def __repr__(self):
return "<Article: article=%s, bytes=%s, art_id=%s>" % (self.article, self.bytes, self.art_id)
##############################################################################
# NzbFile
##############################################################################
class SkippedNzbFile(Exception):
pass
NzbFileSaver = (
"date",
"filename",
"filename_checked",
"filepath",
"type",
"is_par2",
"vol",
"blocks",
"setname",
"articles",
"decodetable",
"bytes",
"bytes_left",
"nzo",
"nzf_id",
"deleted",
"import_finished",
"crc32",
"assembled",
"md5of16k",
)
class NzbFile(TryList):
"""Representation of one file consisting of multiple articles"""
# Pre-define attributes to save memory
__slots__ = NzbFileSaver + ("lock",)
def __init__(self, date, subject, raw_article_db, file_bytes, nzo):
"""Setup object"""
super().__init__()
self.lock = threading.RLock()
self.date: datetime.datetime = date
self.type: Optional[str] = None
self.filename: str = sanitize_filename(name_extractor(subject))
self.filename_checked = False
self.filepath: Optional[str] = None
# Identifiers for par2 files
self.is_par2: bool = False
self.vol: Optional[int] = None
self.blocks: Optional[int] = None
self.setname: Optional[str] = None
# Articles are removed from "articles" after being fetched
self.articles: dict[Article, Article] = {}
self.decodetable: list[Article] = []
self.bytes: int = file_bytes
self.bytes_left: int = file_bytes
self.nzo: NzbObject = nzo
self.deleted = False
self.import_finished = False
self.crc32: Optional[int] = 0
self.assembled: bool = False
self.md5of16k: Optional[bytes] = None
# Add first article to decodetable, this way we can check
# if this is maybe a duplicate nzf
if raw_article_db:
first_article = self.add_article(raw_article_db.pop(0))
first_article.lowest_partnum = True
if self in nzo.files:
logging.info("File %s occurred twice in NZB, skipping", self.filename)
raise SkippedNzbFile
# Create file on disk, which can fail in case of disk errors
self.nzf_id: str = get_new_id("nzf", nzo.admin_path)
if not self.nzf_id:
# Error already shown to user from get_new_id
raise SkippedNzbFile
# Any articles left?
if raw_article_db:
# Save the rest
save_data(raw_article_db, self.nzf_id, nzo.admin_path)
else:
# All imported
self.import_finished = True
def finish_import(self):
"""Load the article objects from disk"""
logging.debug("Finishing import on %s", self.filename)
if raw_article_db := load_data(self.nzf_id, self.nzo.admin_path, remove=False):
for raw_article in raw_article_db:
self.add_article(raw_article)
# Make sure we have labeled the lowest part number
# Also when DirectUnpack is disabled we need to know
self.decodetable[0].lowest_partnum = True
# Mark safe to continue
self.import_finished = True
def add_article(self, article_info):
"""Add article to object database and return article object"""
article = Article(article_info[0], article_info[1], self)
with self.lock:
self.articles[article] = article
self.decodetable.append(article)
return article
def remove_article(self, article: Article, success: bool) -> int:
"""Handle completed article, possibly end of file"""
with self.lock:
if self.articles.pop(article, None) is not None:
if success:
self.bytes_left -= article.bytes
return len(self.articles)
def set_par2(self, setname, vol, blocks):
"""Designate this file as a par2 file"""
self.is_par2 = True
self.setname = setname
self.vol = vol
self.blocks = int_conv(blocks)
def update_crc32(self, crc32: Optional[int], length: int) -> None:
if self.crc32 is None or crc32 is None:
self.crc32 = None
else:
self.crc32 = sabctools.crc32_combine(self.crc32, crc32, length)
def get_articles(self, server: Server, servers: list[Server], fetch_limit: int):
"""Get next articles to be downloaded"""
articles = server.article_queue
with self.lock:
for article in self.articles:
if article := article.get_article(server, servers):
articles.append(article)
if len(articles) >= fetch_limit:
return
self.add_to_try_list(server)
@synchronized(TRYLIST_LOCK)
def reset_all_try_lists(self):
"""Reset all try lists. Locked so reset is performed
for all items at the same time without chance of another
thread changing any of the items while we are resetting"""
with self.lock:
for art in self.articles:
art.reset_try_list()
self.reset_try_list()
def first_article_processed(self) -> bool:
"""Check if the first article has been processed.
This ensures we have attempted to extract md5of16k and filename information
before creating the filepath.
"""
# The first article of decodetable is always the lowest
first_article = self.decodetable[0]
# If it's still in nzo.first_articles, it hasn't been processed yet
return first_article not in self.nzo.first_articles
def prepare_filepath(self):
"""Do all checks before making the final path"""
if not self.filepath:
# Wait for the first article to be processed so we can get md5of16k
# and proper filename before creating the filepath
if not self.first_article_processed():
return None
self.nzo.verify_nzf_filename(self)
filename = sanitize_filename(self.filename)
self.filepath = get_unique_filename(os.path.join(self.nzo.download_path, filename))
self.filename = get_filename(self.filepath)
return self.filepath
@property
def completed(self):
"""Is this file completed?"""
if not self.import_finished:
return False
with self.lock:
return not self.articles
def remove_admin(self):
"""Remove article database from disk (sabnzbd_nzf_<id>)"""
try:
logging.debug("Removing article database for %s", self.nzf_id)
remove_file(os.path.join(self.nzo.admin_path, self.nzf_id))
except Exception:
pass
def __enter__(self):
self.lock.acquire()
def __exit__(self, exc_type, exc_val, exc_tb):
self.lock.release()
def __getstate__(self):
"""Save to pickle file, selecting attributes"""
dict_ = {}
for item in NzbFileSaver:
dict_[item] = getattr(self, item)
dict_["try_list"] = super().__getstate__()
return dict_
def __setstate__(self, dict_):
"""Load from pickle file, selecting attributes"""
for item in NzbFileSaver:
try:
setattr(self, item, dict_[item])
except KeyError:
# Handle new attributes
setattr(self, item, None)
super().__setstate__(dict_.get("try_list", []))
self.lock = threading.RLock()
if isinstance(self.articles, list):
# Converted from list to dict
self.articles = {x: x for x in self.articles}
def __eq__(self, other: "NzbFile"):
"""Assume it's the same file if the number bytes and first article
are the same or if there are no articles left, use the filenames.
Some NZB's are just a mess and report different sizes for the same article.
We used to compare (__eq__) articles based on article-ID, however, this failed
because some NZB's had the same article-ID twice within one NZF.
"""
if other and (self.bytes == other.bytes or len(self.decodetable) == len(other.decodetable)):
if self.decodetable and other.decodetable:
return self.decodetable[0].article == other.decodetable[0].article
# Fallback to filename comparison
return self.filename == other.filename
return False
def __hash__(self):
"""Required because we implement eq. The same file can be spread
over multiple NZO's so we make every NZF unique. Even though
it's considered bad practice.
"""
return id(self)
def __repr__(self):
return "<NzbFile: filename=%s, bytes=%s, nzf_id=%s>" % (self.filename, self.bytes, self.nzf_id)
##############################################################################
# NzbObject
##############################################################################
class NzbEmpty(Exception):
pass
@@ -1032,7 +591,7 @@ class NzbObject(TryList):
logging.debug("Unwanted Extension: putting last rar after first rar")
firstrarpos = lastrarpos = 0
for nzfposcounter, nzf in enumerate(self.files):
if RE_RAR.search(nzf.filename.lower()):
if RAR_RE.search(nzf.filename.lower()):
# a NZF found with '.rar' in the name
if firstrarpos == 0:
# this is the first .rar found, so remember this position
@@ -2134,109 +1693,3 @@ class NzbObject(TryList):
def __repr__(self):
return "<NzbObject: filename=%s, bytes=%s, nzo_id=%s>" % (self.filename, self.bytes, self.nzo_id)
def nzf_cmp_name(nzf1: NzbFile, nzf2: NzbFile):
# The comparison will sort .par2 files to the top of the queue followed by .rar files,
# they will then be sorted by name.
nzf1_name = nzf1.filename.lower()
nzf2_name = nzf2.filename.lower()
# Determine vol-pars
is_par1 = ".vol" in nzf1_name and ".par2" in nzf1_name
is_par2 = ".vol" in nzf2_name and ".par2" in nzf2_name
# mini-par2 in front
if not is_par1 and nzf1_name.endswith(".par2"):
return -1
if not is_par2 and nzf2_name.endswith(".par2"):
return 1
# vol-pars go to the back
if is_par1 and not is_par2:
return 1
if is_par2 and not is_par1:
return -1
# Prioritize .rar files above any other type of file (other than vol-par)
m1 = RE_RAR.search(nzf1_name)
m2 = RE_RAR.search(nzf2_name)
if m1 and not (is_par2 or m2):
return -1
elif m2 and not (is_par1 or m1):
return 1
# Force .rar to come before 'r00'
if m1 and m1.group(1) == ".rar":
nzf1_name = nzf1_name.replace(".rar", ".r//")
if m2 and m2.group(1) == ".rar":
nzf2_name = nzf2_name.replace(".rar", ".r//")
return cmp(nzf1_name, nzf2_name)
def create_work_name(name: str) -> str:
"""Remove ".nzb" and ".par(2)" and sanitize, skip URL's"""
if name.find("://") < 0:
# Invalid charters need to be removed before and after (see unit-tests)
return sanitize_foldername(strip_extensions(sanitize_foldername(name)))
else:
return name.strip()
def scan_password(name: str) -> tuple[str, Optional[str]]:
"""Get password (if any) from the title"""
if "http://" in name or "https://" in name:
return name, None
# Strip any unwanted usenet-related extensions
name = strip_extensions(name)
# Identify any braces
braces = name[1:].find("{{")
if braces < 0:
braces = len(name)
else:
braces += 1
slash = name.find("/")
# Look for name/password, but make sure that '/' comes before any {{
if 0 < slash < braces and "password=" not in name:
# Is it maybe in 'name / password' notation?
if slash == name.find(" / ") + 1 and name[: slash - 1].strip(". "):
# Remove the extra space after name and before password
return name[: slash - 1].strip(". "), name[slash + 2 :]
if name[:slash].strip(". "):
return name[:slash].strip(". "), name[slash + 1 :]
# Look for "name password=password"
pw = name.find("password=")
if pw > 0 and name[:pw].strip(". "):
return name[:pw].strip(". "), name[pw + 9 :]
# Look for name{{password}}
if braces < len(name):
closing_braces = name.rfind("}}")
if closing_braces > braces and name[:braces].strip(". "):
return name[:braces].strip(". "), name[braces + 2 : closing_braces]
# Look again for name/password
if slash > 0 and name[:slash].strip(". "):
return name[:slash].strip(". "), name[slash + 1 :]
# No password found
return name, None
def name_extractor(subject: str) -> str:
"""Try to extract a file name from a subject line, return `subject` if in doubt"""
# Filename nicely wrapped in quotes
for name in re.findall(RE_SUBJECT_FILENAME_QUOTES, subject):
if name := name.strip(' "'):
return name
# Found nothing? Try a basic filename-like search
for name in re.findall(RE_SUBJECT_BASIC_FILENAME, subject):
if name := name.strip():
return name
# Return the subject
return subject

View File

@@ -33,7 +33,15 @@ import cherrypy._cpreqbody
from typing import Optional, Any, Union
import sabnzbd
from sabnzbd import nzbstuff
from sabnzbd.nzb import (
NzbObject,
NzbEmpty,
NzbRejected,
NzbPreQueueRejected,
NzbRejectToHistory,
NzbFile,
SkippedNzbFile,
)
from sabnzbd.encoding import utob, correct_cherrypy_encoding
from sabnzbd.filesystem import (
get_filename,
@@ -204,7 +212,7 @@ def process_nzb_archive_file(
if datap:
nzo = None
try:
nzo = nzbstuff.NzbObject(
nzo = NzbObject(
name,
pp=pp,
script=script,
@@ -220,13 +228,13 @@ def process_nzb_archive_file(
dup_check=dup_check,
)
except (
sabnzbd.nzbstuff.NzbEmpty,
sabnzbd.nzbstuff.NzbRejected,
sabnzbd.nzbstuff.NzbPreQueueRejected,
NzbEmpty,
NzbRejected,
NzbPreQueueRejected,
):
# Empty or fully rejected (including pre-queue rejections)
pass
except sabnzbd.nzbstuff.NzbRejectToHistory as err:
except NzbRejectToHistory as err:
# Duplicate or unwanted extension directed to history
sabnzbd.NzbQueue.fail_to_history(err.nzo)
nzo_ids.append(err.nzo.nzo_id)
@@ -315,7 +323,7 @@ def process_single_nzb(
nzo = None
nzo_ids = []
try:
nzo = nzbstuff.NzbObject(
nzo = NzbObject(
filename,
pp=pp,
script=script,
@@ -330,16 +338,16 @@ def process_single_nzb(
nzo_id=nzo_id,
dup_check=dup_check,
)
except sabnzbd.nzbstuff.NzbEmpty:
except NzbEmpty:
# Malformed or might not be an NZB file
result = AddNzbFileResult.NO_FILES_FOUND
except sabnzbd.nzbstuff.NzbRejected:
except NzbRejected:
# Rejected as duplicate
result = AddNzbFileResult.ERROR
except sabnzbd.nzbstuff.NzbPreQueueRejected:
except NzbPreQueueRejected:
# Rejected by pre-queue script - should be silently ignored for URL fetches
result = AddNzbFileResult.PREQUEUE_REJECTED
except sabnzbd.nzbstuff.NzbRejectToHistory as err:
except NzbRejectToHistory as err:
# Duplicate or unwanted extension directed to history
sabnzbd.NzbQueue.fail_to_history(err.nzo)
nzo_ids.append(err.nzo.nzo_id)
@@ -366,7 +374,7 @@ def process_single_nzb(
def nzbfile_parser(full_nzb_path: str, nzo):
# For type-hinting
nzo: sabnzbd.nzbstuff.NzbObject
nzo: NzbObject
# Hash for dupe-checking
md5sum = hashlib.md5()
@@ -470,8 +478,8 @@ def nzbfile_parser(full_nzb_path: str, nzo):
# Create NZF
try:
nzf = sabnzbd.nzbstuff.NzbFile(file_date, file_name, raw_article_db_sorted, file_bytes, nzo)
except sabnzbd.nzbstuff.SkippedNzbFile:
nzf = NzbFile(file_date, file_name, raw_article_db_sorted, file_bytes, nzo)
except SkippedNzbFile:
# Did not meet requirements, so continue
skipped_files += 1
continue

View File

@@ -26,7 +26,7 @@ import cherrypy._cpreqbody
from typing import Union, Optional
import sabnzbd
from sabnzbd.nzbstuff import NzbObject, Article
from sabnzbd.nzb import Article, NzbObject
from sabnzbd.misc import exit_sab, cat_to_opts, int_conv, caller_name, safe_lower, duplicate_warning
from sabnzbd.filesystem import get_admin_path, remove_all, globber_full, remove_file, is_valid_script
from sabnzbd.nzbparser import process_single_nzb

View File

@@ -73,7 +73,7 @@ from sabnzbd.filesystem import (
get_ext,
get_filename,
)
from sabnzbd.nzbstuff import NzbObject
from sabnzbd.nzb import NzbObject
from sabnzbd.sorting import Sorter
from sabnzbd.constants import (
REPAIR_PRIORITY,

View File

@@ -47,7 +47,8 @@ from sabnzbd.constants import (
GUESSIT_SORT_TYPES,
)
from sabnzbd.misc import is_sample, from_units, sort_to_opts
from sabnzbd.nzbstuff import NzbObject, scan_password
from sabnzbd.misc import scan_password
from sabnzbd.nzb import NzbObject
# Do not rename .vob files as they are usually DVD's
EXCLUDED_FILE_EXTS = (".vob", ".bin")

View File

@@ -51,7 +51,7 @@ import sabnzbd.notifier as notifier
from sabnzbd.decorators import NZBQUEUE_LOCK
from sabnzbd.encoding import ubtou, utob
from sabnzbd.nzbparser import AddNzbFileResult
from sabnzbd.nzbstuff import NzbObject, NzbRejected, NzbRejectToHistory
from sabnzbd.nzb import NzbObject
class URLGrabber(Thread):

View File

@@ -28,7 +28,7 @@ from unittest import mock
import sabctools
import sabnzbd.decoder as decoder
from sabnzbd.nzbstuff import Article
from sabnzbd.nzb import Article
def uu(data: bytes):

View File

@@ -1256,3 +1256,21 @@ class TestOtherFileSystemFunctions:
)
def test_strip_extensions(self, name, ext_to_remove, output):
assert filesystem.strip_extensions(name, ext_to_remove) == output
@pytest.mark.parametrize(
"file_name, clean_file_name",
[
("my_awesome_nzb_file.pAr2.nZb", "my_awesome_nzb_file"),
("my_awesome_nzb_file.....pAr2.nZb", "my_awesome_nzb_file"),
("my_awesome_nzb_file....par2..", "my_awesome_nzb_file"),
(" my_awesome_nzb_file .pAr.nZb", "my_awesome_nzb_file"),
("with.extension.and.period.par2.", "with.extension.and.period"),
("nothing.in.here", "nothing.in.here"),
(" just.space ", "just.space"),
("http://test.par2 ", "http://test.par2"),
],
)
def test_create_work_name(self, file_name, clean_file_name):
# Only test stuff specific for create_work_name
# The sanitizing is already tested in tests for sanitize_foldername
assert filesystem.create_work_name(file_name) == clean_file_name

View File

@@ -837,6 +837,94 @@ class TestMisc:
_func()
@pytest.mark.parametrize(
"argument, name, password",
[
("my_awesome_nzb_file{{password}}", "my_awesome_nzb_file", "password"),
("file_with_text_after_pw{{passw0rd}}_[180519]", "file_with_text_after_pw", "passw0rd"),
("file_without_pw", "file_without_pw", None),
("multiple_pw{{first-pw}}_{{second-pw}}", "multiple_pw", "first-pw}}_{{second-pw"), # Greed is Good
("デビアン", "デビアン", None), # Unicode
("Gentoo_Hobby_Edition {{secret}}", "Gentoo_Hobby_Edition", "secret"), # Space between name and password
("Test {{secret}}.nzb", "Test", "secret"),
("Mandrake{{top{{secret}}", "Mandrake", "top{{secret"), # Double opening {{
("Красная}}{{Шляпа}}", "Красная}}", "Шляпа"), # Double closing }}
("{{Jobname{{PassWord}}", "{{Jobname", "PassWord"), # {{ at start
("Hello/kITTY", "Hello", "kITTY"), # Notation with slash
("Hello/kITTY.nzb", "Hello", "kITTY"), # Notation with slash and extension
("/Jobname", "/Jobname", None), # Slash at start
("Jobname/Top{{Secret}}", "Jobname", "Top{{Secret}}"), # Slash with braces
("Jobname / Top{{Secret}}", "Jobname", "Top{{Secret}}"), # Slash with braces and extra spaces
("Jobname / Top{{Secret}}.nzb", "Jobname", "Top{{Secret}}"),
("לינוקס/معلومات سرية", "לינוקס", "معلومات سرية"), # LTR with slash
("לינוקס{{معلومات سرية}}", "לינוקס", "معلومات سرية"), # LTR with brackets
("thư điện tử password=mật_khẩu", "thư điện tử", "mật_khẩu"), # Password= notation
("password=PartOfTheJobname", "password=PartOfTheJobname", None), # Password= at the start
("Job password=Test.par2", "Job", "Test"), # Password= including extension
("Job}}Name{{FTW", "Job}}Name{{FTW", None), # Both {{ and }} present but incorrect order (no password)
("./Text", "./Text", None), # Name would end up empty after the function strips the dot
],
)
def test_scan_password(self, argument, name, password):
assert misc.scan_password(argument) == (name, password)
@pytest.mark.parametrize(
"subject, filename",
[
('Great stuff (001/143) - "Filename.txt" yEnc (1/1)', "Filename.txt"),
(
'"910a284f98ebf57f6a531cd96da48838.vol01-03.par2" yEnc (1/3)',
"910a284f98ebf57f6a531cd96da48838.vol01-03.par2",
),
('Subject-KrzpfTest [02/30] - ""KrzpfTest.part.nzb"" yEnc', "KrzpfTest.part.nzb"),
(
'[PRiVATE]-[WtFnZb]-[Supertje-_S03E11-12_-blabla_+_blabla_WEBDL-480p.mkv]-[4/12] - "" yEnc 9786 (1/1366)',
"Supertje-_S03E11-12_-blabla_+_blabla_WEBDL-480p.mkv",
),
(
'[N3wZ] MAlXD245333\\::[PRiVATE]-[WtFnZb]-[Show.S04E04.720p.AMZN.WEBRip.x264-GalaxyTV.mkv]-[1/2] - "" yEnc 293197257 (1/573)',
"Show.S04E04.720p.AMZN.WEBRip.x264-GalaxyTV.mkv",
),
(
'reftestnzb bf1664007a71 [1/6] - "20b9152c-57eb-4d02-9586-66e30b8e3ac2" yEnc (1/22) 15728640',
"20b9152c-57eb-4d02-9586-66e30b8e3ac2",
),
(
"Re: REQ Author Child's The Book-Thanks much - Child, Lee - Author - The Book.epub (1/1)",
"REQ Author Child's The Book-Thanks much - Child, Lee - Author - The Book.epub",
),
('63258-0[001/101] - "63258-2.0" yEnc (1/250) (1/250)', "63258-2.0"),
# If specified between ", the extension is allowed to be too long
('63258-0[001/101] - "63258-2.0toolong" yEnc (1/250) (1/250)', "63258-2.0toolong"),
(
"Singer - A Album (2005) - [04/25] - 02 Sweetest Somebody (I Know).flac",
"Singer - A Album (2005) - [04/25] - 02 Sweetest Somebody (I Know).flac",
),
("<>random!>", "<>random!>"),
("nZb]-[Supertje-_S03E11-12_", "nZb]-[Supertje-_S03E11-12_"),
("Bla [Now it's done.exe]", "Now it's done.exe"),
# If specified between [], the extension should be a valid one
("Bla [Now it's done.123nonsense]", "Bla [Now it's done.123nonsense]"),
('[PRiVATE]-[WtFnZb]-[00000.clpi]-[1/46] - "" yEnc 788 (1/1)', "00000.clpi"),
(
'[PRiVATE]-[WtFnZb]-[Video_(2001)_AC5.1_-RELEASE_[TAoE].mkv]-[1/23] - "" yEnc 1234567890 (1/23456)',
"Video_(2001)_AC5.1_-RELEASE_[TAoE].mkv",
),
(
"[PRiVATE]-[WtFnZb]-[219]-[1/series.name.s01e01.1080p.web.h264-group.mkv] - "
" yEnc (1/[PRiVATE] \\c2b510b594\\::686ea969999193.155368eba4965e56a8cd263382e012.f2712fdc::/97bd201cf931/) 1 (1/0)",
"series.name.s01e01.1080p.web.h264-group.mkv",
),
(
"[PRiVATE]-[WtFnZb]-[/More.Bla.S02E01.1080p.WEB.h264-EDITH[eztv.re].mkv-WtF[nZb]/"
'More.Bla.S02E01.1080p.WEB.h264-EDITH.mkv]-[1/2] - "" yEnc 2990558544 (1/4173)',
"More.Bla.S02E01.1080p.WEB.h264-EDITH[eztv.re].mkv",
),
],
)
def test_name_extractor(self, subject, filename):
assert misc.subject_name_extractor(subject) == filename
class TestBuildAndRunCommand:
# Path should exist

69
tests/test_nzbarticle.py Normal file
View File

@@ -0,0 +1,69 @@
#!/usr/bin/python3 -OO
# Copyright 2007-2025 by The SABnzbd-Team (sabnzbd.org)
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
tests.test_nzbarticle - Testing functions in nzbarticle.py
"""
from sabnzbd.nzb import Article
from tests.testhelper import *
class Server:
def __init__(self, host, priority, active):
self.host = host
self.priority = priority
self.active = active
class TestArticle:
def test_get_article(self):
article_id = "test@host" + os.urandom(8).hex() + ".sab"
article = Article(article_id, randint(4321, 54321), None)
servers = []
servers.append(Server("testserver1", 10, True))
servers.append(Server("testserver2", 20, True))
servers.append(Server("testserver3", 30, True))
# Test fetching top priority server
server = servers[0]
assert article.get_article(server, servers) == article
assert article.fetcher_priority == 10
assert article.fetcher == server
assert article.get_article(server, servers) == None
article.fetcher = None
article.add_to_try_list(server)
assert article.get_article(server, servers) == None
# Test fetching when there is a higher priority server available
server = servers[2]
assert article.fetcher_priority == 10
assert article.get_article(server, servers) == None
assert article.fetcher_priority == 20
# Server should be used even if article.fetcher_priority is a higher number than server.priority
article.fetcher_priority = 30
server = servers[1]
assert article.get_article(server, servers) == article
# Inactive servers in servers list should be ignored
article.fetcher = None
article.fetcher_priority = 0
servers[1].active = False
server = servers[2]
assert article.get_article(server, servers) == article
assert article.tries == 3

67
tests/test_nzbobject.py Normal file
View File

@@ -0,0 +1,67 @@
#!/usr/bin/python3 -OO
# Copyright 2007-2025 by The SABnzbd-Team (sabnzbd.org)
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
tests.test_nzbobject - Testing functions in nzbobject.py
"""
from sabnzbd.nzb import NzbObject
from sabnzbd.config import ConfigCat
from sabnzbd.constants import NORMAL_PRIORITY
from sabnzbd.filesystem import globber
from tests.testhelper import *
@pytest.mark.usefixtures("clean_cache_dir")
class TestNZO:
@set_config({"download_dir": SAB_CACHE_DIR})
def test_nzo_basic(self):
# Need to create the Default category, as we would in normal instance
# Otherwise it will try to save the config
def_cat = ConfigCat("*", {"pp": 3, "script": "None", "priority": NORMAL_PRIORITY})
# Create empty object, normally used to grab URL's
nzo = NzbObject("test_basic")
assert nzo.work_name == "test_basic"
assert not nzo.files
# Create NZB-file to import
nzb_fp = create_and_read_nzb_fp("basic_rar5")
# Very basic test of NZO creation with data
nzo = NzbObject("test_basic_data", nzb_fp=nzb_fp)
assert nzo.final_name == "test_basic_data"
assert nzo.files
assert nzo.files[0].filename == "testfile.rar"
assert nzo.bytes == 283
assert nzo.files[0].bytes == 283
# work_name can be trimmed in Windows due to max-path-length
assert "test_basic_data".startswith(nzo.work_name)
assert os.path.exists(nzo.admin_path)
# Check if there's an nzf file and the backed-up nzb
assert globber(nzo.admin_path, "*.nzb.gz")
assert globber(nzo.admin_path, "SABnzbd_nzf*")
# Should have picked up the default category settings
assert nzo.cat == "*"
assert nzo.script == def_cat.script() == "None"
assert nzo.priority == def_cat.priority() == NORMAL_PRIORITY
assert nzo.repair and nzo.unpack and nzo.delete
# TODO: More checks!

View File

@@ -21,7 +21,7 @@ tests.test_nzbparser - Tests of basic NZB parsing
from tests.testhelper import *
import sabnzbd.nzbparser as nzbparser
from sabnzbd import nzbstuff
from sabnzbd.nzb import NzbObject
from sabnzbd.filesystem import save_compressed
@@ -29,7 +29,7 @@ from sabnzbd.filesystem import save_compressed
class TestNzbParser:
@set_config({"download_dir": SAB_CACHE_DIR})
def test_nzbparser(self):
nzo = nzbstuff.NzbObject("test_basic")
nzo = NzbObject("test_basic")
# Create test file
metadata = {"category": "test", "password": "testpass"}
nzb_fp = create_and_read_nzb_fp("..", metadata=metadata)

View File

@@ -1,221 +0,0 @@
#!/usr/bin/python3 -OO
# Copyright 2007-2025 by The SABnzbd-Team (sabnzbd.org)
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
tests.test_nzbstuff - Testing functions in nzbstuff.py
"""
import sabnzbd.nzbstuff as nzbstuff
from sabnzbd.config import ConfigCat
from sabnzbd.constants import NORMAL_PRIORITY
from sabnzbd.filesystem import globber
from tests.testhelper import *
@pytest.mark.usefixtures("clean_cache_dir")
class TestNZO:
@set_config({"download_dir": SAB_CACHE_DIR})
def test_nzo_basic(self):
# Need to create the Default category, as we would in normal instance
# Otherwise it will try to save the config
def_cat = ConfigCat("*", {"pp": 3, "script": "None", "priority": NORMAL_PRIORITY})
# Create empty object, normally used to grab URL's
nzo = nzbstuff.NzbObject("test_basic")
assert nzo.work_name == "test_basic"
assert not nzo.files
# Create NZB-file to import
nzb_fp = create_and_read_nzb_fp("basic_rar5")
# Very basic test of NZO creation with data
nzo = nzbstuff.NzbObject("test_basic_data", nzb_fp=nzb_fp)
assert nzo.final_name == "test_basic_data"
assert nzo.files
assert nzo.files[0].filename == "testfile.rar"
assert nzo.bytes == 283
assert nzo.files[0].bytes == 283
# work_name can be trimmed in Windows due to max-path-length
assert "test_basic_data".startswith(nzo.work_name)
assert os.path.exists(nzo.admin_path)
# Check if there's an nzf file and the backed-up nzb
assert globber(nzo.admin_path, "*.nzb.gz")
assert globber(nzo.admin_path, "SABnzbd_nzf*")
# Should have picked up the default category settings
assert nzo.cat == "*"
assert nzo.script == def_cat.script() == "None"
assert nzo.priority == def_cat.priority() == NORMAL_PRIORITY
assert nzo.repair and nzo.unpack and nzo.delete
# TODO: More checks!
class Server:
def __init__(self, host, priority, active):
self.host = host
self.priority = priority
self.active = active
class TestArticle:
def test_get_article(self):
article_id = "test@host" + os.urandom(8).hex() + ".sab"
article = nzbstuff.Article(article_id, randint(4321, 54321), None)
servers = []
servers.append(Server("testserver1", 10, True))
servers.append(Server("testserver2", 20, True))
servers.append(Server("testserver3", 30, True))
# Test fetching top priority server
server = servers[0]
assert article.get_article(server, servers) == article
assert article.fetcher_priority == 10
assert article.fetcher == server
assert article.get_article(server, servers) == None
article.fetcher = None
article.add_to_try_list(server)
assert article.get_article(server, servers) == None
# Test fetching when there is a higher priority server available
server = servers[2]
assert article.fetcher_priority == 10
assert article.get_article(server, servers) == None
assert article.fetcher_priority == 20
# Server should be used even if article.fetcher_priority is a higher number than server.priority
article.fetcher_priority = 30
server = servers[1]
assert article.get_article(server, servers) == article
# Inactive servers in servers list should be ignored
article.fetcher = None
article.fetcher_priority = 0
servers[1].active = False
server = servers[2]
assert article.get_article(server, servers) == article
assert article.tries == 3
class TestNZBStuffHelpers:
@pytest.mark.parametrize(
"argument, name, password",
[
("my_awesome_nzb_file{{password}}", "my_awesome_nzb_file", "password"),
("file_with_text_after_pw{{passw0rd}}_[180519]", "file_with_text_after_pw", "passw0rd"),
("file_without_pw", "file_without_pw", None),
("multiple_pw{{first-pw}}_{{second-pw}}", "multiple_pw", "first-pw}}_{{second-pw"), # Greed is Good
("デビアン", "デビアン", None), # Unicode
("Gentoo_Hobby_Edition {{secret}}", "Gentoo_Hobby_Edition", "secret"), # Space between name and password
("Test {{secret}}.nzb", "Test", "secret"),
("Mandrake{{top{{secret}}", "Mandrake", "top{{secret"), # Double opening {{
("Красная}}{{Шляпа}}", "Красная}}", "Шляпа"), # Double closing }}
("{{Jobname{{PassWord}}", "{{Jobname", "PassWord"), # {{ at start
("Hello/kITTY", "Hello", "kITTY"), # Notation with slash
("Hello/kITTY.nzb", "Hello", "kITTY"), # Notation with slash and extension
("/Jobname", "/Jobname", None), # Slash at start
("Jobname/Top{{Secret}}", "Jobname", "Top{{Secret}}"), # Slash with braces
("Jobname / Top{{Secret}}", "Jobname", "Top{{Secret}}"), # Slash with braces and extra spaces
("Jobname / Top{{Secret}}.nzb", "Jobname", "Top{{Secret}}"),
("לינוקס/معلومات سرية", "לינוקס", "معلومات سرية"), # LTR with slash
("לינוקס{{معلومات سرية}}", "לינוקס", "معلومات سرية"), # LTR with brackets
("thư điện tử password=mật_khẩu", "thư điện tử", "mật_khẩu"), # Password= notation
("password=PartOfTheJobname", "password=PartOfTheJobname", None), # Password= at the start
("Job password=Test.par2", "Job", "Test"), # Password= including extension
("Job}}Name{{FTW", "Job}}Name{{FTW", None), # Both {{ and }} present but incorrect order (no password)
("./Text", "./Text", None), # Name would end up empty after the function strips the dot
],
)
def test_scan_password(self, argument, name, password):
assert nzbstuff.scan_password(argument) == (name, password)
@pytest.mark.parametrize(
"file_name, clean_file_name",
[
("my_awesome_nzb_file.pAr2.nZb", "my_awesome_nzb_file"),
("my_awesome_nzb_file.....pAr2.nZb", "my_awesome_nzb_file"),
("my_awesome_nzb_file....par2..", "my_awesome_nzb_file"),
(" my_awesome_nzb_file .pAr.nZb", "my_awesome_nzb_file"),
("with.extension.and.period.par2.", "with.extension.and.period"),
("nothing.in.here", "nothing.in.here"),
(" just.space ", "just.space"),
("http://test.par2 ", "http://test.par2"),
],
)
def test_create_work_name(self, file_name, clean_file_name):
# Only test stuff specific for create_work_name
# The sanitizing is already tested in tests for sanitize_foldername
assert nzbstuff.create_work_name(file_name) == clean_file_name
@pytest.mark.parametrize(
"subject, filename",
[
('Great stuff (001/143) - "Filename.txt" yEnc (1/1)', "Filename.txt"),
(
'"910a284f98ebf57f6a531cd96da48838.vol01-03.par2" yEnc (1/3)',
"910a284f98ebf57f6a531cd96da48838.vol01-03.par2",
),
('Subject-KrzpfTest [02/30] - ""KrzpfTest.part.nzb"" yEnc', "KrzpfTest.part.nzb"),
(
'[PRiVATE]-[WtFnZb]-[Supertje-_S03E11-12_-blabla_+_blabla_WEBDL-480p.mkv]-[4/12] - "" yEnc 9786 (1/1366)',
"Supertje-_S03E11-12_-blabla_+_blabla_WEBDL-480p.mkv",
),
(
'[N3wZ] MAlXD245333\\::[PRiVATE]-[WtFnZb]-[Show.S04E04.720p.AMZN.WEBRip.x264-GalaxyTV.mkv]-[1/2] - "" yEnc 293197257 (1/573)',
"Show.S04E04.720p.AMZN.WEBRip.x264-GalaxyTV.mkv",
),
(
'reftestnzb bf1664007a71 [1/6] - "20b9152c-57eb-4d02-9586-66e30b8e3ac2" yEnc (1/22) 15728640',
"20b9152c-57eb-4d02-9586-66e30b8e3ac2",
),
(
"Re: REQ Author Child's The Book-Thanks much - Child, Lee - Author - The Book.epub (1/1)",
"REQ Author Child's The Book-Thanks much - Child, Lee - Author - The Book.epub",
),
('63258-0[001/101] - "63258-2.0" yEnc (1/250) (1/250)', "63258-2.0"),
# If specified between ", the extension is allowed to be too long
('63258-0[001/101] - "63258-2.0toolong" yEnc (1/250) (1/250)', "63258-2.0toolong"),
(
"Singer - A Album (2005) - [04/25] - 02 Sweetest Somebody (I Know).flac",
"Singer - A Album (2005) - [04/25] - 02 Sweetest Somebody (I Know).flac",
),
("<>random!>", "<>random!>"),
("nZb]-[Supertje-_S03E11-12_", "nZb]-[Supertje-_S03E11-12_"),
("Bla [Now it's done.exe]", "Now it's done.exe"),
# If specified between [], the extension should be a valid one
("Bla [Now it's done.123nonsense]", "Bla [Now it's done.123nonsense]"),
('[PRiVATE]-[WtFnZb]-[00000.clpi]-[1/46] - "" yEnc 788 (1/1)', "00000.clpi"),
(
'[PRiVATE]-[WtFnZb]-[Video_(2001)_AC5.1_-RELEASE_[TAoE].mkv]-[1/23] - "" yEnc 1234567890 (1/23456)',
"Video_(2001)_AC5.1_-RELEASE_[TAoE].mkv",
),
(
"[PRiVATE]-[WtFnZb]-[219]-[1/series.name.s01e01.1080p.web.h264-group.mkv] - "
" yEnc (1/[PRiVATE] \\c2b510b594\\::686ea969999193.155368eba4965e56a8cd263382e012.f2712fdc::/97bd201cf931/) 1 (1/0)",
"series.name.s01e01.1080p.web.h264-group.mkv",
),
(
"[PRiVATE]-[WtFnZb]-[/More.Bla.S02E01.1080p.WEB.h264-EDITH[eztv.re].mkv-WtF[nZb]/"
'More.Bla.S02E01.1080p.WEB.h264-EDITH.mkv]-[1/2] - "" yEnc 2990558544 (1/4173)',
"More.Bla.S02E01.1080p.WEB.h264-EDITH[eztv.re].mkv",
),
],
)
def test_name_extractor(self, subject, filename):
assert nzbstuff.name_extractor(subject) == filename