Add missing typing hints to several files

This commit is contained in:
Safihre
2025-11-17 15:39:47 +01:00
parent 80f57a2b9a
commit 67a5a552fd
12 changed files with 56 additions and 52 deletions

View File

@@ -20,7 +20,7 @@ jobs:
builder/SABnzbd.spec
tests
--line-length=120
--target-version=py38
--target-version=py39
--check
--diff

View File

@@ -307,7 +307,7 @@ class OptionList(Option):
self,
section: str,
keyword: str,
default_val: Union[str, List, None] = None,
default_val: Union[str, list, None] = None,
validation: Optional[Callable] = None,
add: bool = True,
public: bool = True,
@@ -318,7 +318,7 @@ class OptionList(Option):
default_val = []
super().__init__(section, keyword, default_val, add=add, public=public, protect=protect)
def set(self, value: Union[str, List]) -> Optional[str]:
def set(self, value: Union[str, list]) -> Optional[str]:
"""Set the list given a comma-separated string or a list"""
error = None
if value is not None:

View File

@@ -70,7 +70,7 @@ def conditional_cache(cache_time: int):
Empty results (None, empty collections, empty strings, False, 0) are not cached.
If a keyword argument of `force=True` is used, the cache is skipped.
Unhashable types (such as List) can not be used as an input to the wrapped function in the current implementation!
Unhashable types (such as list) can not be used as an input to the wrapped function in the current implementation!
:param cache_time: Time in seconds to cache non-empty results
"""

View File

@@ -25,7 +25,7 @@ import subprocess
import time
import threading
import logging
from typing import Optional, Dict, List, Tuple
from typing import Optional
import sabnzbd
import sabnzbd.cfg as cfg

View File

@@ -33,7 +33,7 @@ import fnmatch
import stat
import ctypes
import random
from typing import Union, List, Tuple, Any, Dict, Optional, BinaryIO
from typing import Union, Any, Optional, BinaryIO
try:
import win32api
@@ -330,7 +330,7 @@ def sanitize_files(folder: Optional[str] = None, filelist: Optional[list[str]] =
return output_filelist
def strip_extensions(name: str, ext_to_remove: tuple[str, ...] = (".nzb", ".par", ".par2")):
def strip_extensions(name: str, ext_to_remove: tuple[str, ...] = (".nzb", ".par", ".par2")) -> str:
"""Strip extensions from a filename, without sanitizing the filename"""
name_base, ext = os.path.splitext(name)
while ext.lower() in ext_to_remove:
@@ -613,7 +613,7 @@ def make_script_path(script: str) -> Optional[str]:
return script_path
def get_admin_path(name: str, future: bool):
def get_admin_path(name: str, future: bool) -> str:
"""Return news-style full path to job-admin folder of names job
or else the old cache path
"""
@@ -660,7 +660,7 @@ def set_permissions(path: str, recursive: bool = True):
UNWANTED_FILE_PERMISSIONS = stat.S_ISUID | stat.S_ISGID | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
def removexbits(path: str, custom_permissions: int = None):
def removexbits(path: str, custom_permissions: Optional[int] = None):
"""Remove all the x-bits from files, respecting current or custom permissions"""
if os.path.isfile(path):
# Use custom permissions as base
@@ -1033,7 +1033,7 @@ def diskspace(force: bool = False) -> dict[str, tuple[float, float]]:
}
def get_new_id(prefix, folder, check_list=None):
def get_new_id(prefix: str, folder: str, check_list: Optional[list] = None) -> str:
"""Return unique prefixed admin identifier within folder
optionally making sure that id is not in the check_list.
"""
@@ -1054,7 +1054,7 @@ def get_new_id(prefix, folder, check_list=None):
raise IOError
def save_data(data, _id, path, do_pickle=True, silent=False):
def save_data(data: Any, _id: str, path: str, do_pickle: bool = True, silent: bool = False):
"""Save data to a diskfile"""
if not silent:
logging.debug("[%s] Saving data for %s in %s", sabnzbd.misc.caller_name(), _id, path)
@@ -1081,7 +1081,7 @@ def save_data(data, _id, path, do_pickle=True, silent=False):
time.sleep(0.1)
def load_data(data_id, path, remove=True, do_pickle=True, silent=False):
def load_data(data_id: str, path: str, remove: bool = True, do_pickle: bool = True, silent: bool = False) -> Any:
"""Read data from disk file"""
path = os.path.join(path, data_id)
@@ -1129,7 +1129,7 @@ def save_admin(data: Any, data_id: str):
save_data(data, data_id, sabnzbd.cfg.admin_dir.get_path())
def load_admin(data_id: str, remove=False, silent=False) -> Any:
def load_admin(data_id: str, remove: bool = False, silent: bool = False) -> Any:
"""Read data in admin folder in specified format"""
logging.debug("[%s] Loading data for %s", sabnzbd.misc.caller_name(), data_id)
return load_data(data_id, sabnzbd.cfg.admin_dir.get_path(), remove=remove, silent=silent)
@@ -1196,7 +1196,7 @@ def purge_log_files():
logging.debug("Finished puring log files")
def directory_is_writable_with_file(mydir, myfilename):
def directory_is_writable_with_file(mydir: str, myfilename: str) -> bool:
filename = os.path.join(mydir, myfilename)
if os.path.exists(filename):
try:

View File

@@ -34,7 +34,7 @@ import copy
from random import randint
from xml.sax.saxutils import escape
from Cheetah.Template import Template
from typing import Optional, Callable, Union, Any, Dict, List
from typing import Optional, Callable, Union, Any
from guessit.api import properties as guessit_properties
import sabnzbd

View File

@@ -41,7 +41,7 @@ import math
import rarfile
from threading import Thread
from collections.abc import Iterable
from typing import Union, Tuple, Any, AnyStr, Optional, List, Dict, Collection
from typing import Union, Tuple, Any, AnyStr, Optional, Collection
import sabnzbd
import sabnzbd.getipaddress
@@ -190,7 +190,7 @@ def clean_comma_separated_list(inp: Any) -> list[str]:
return result_ids
def cmp(x, y):
def cmp(x: Any, y: Any) -> int:
"""
Replacement for built-in function cmp that was removed in Python 3
@@ -234,7 +234,7 @@ def cat_pp_script_sanitizer(
return cat, pp, script
def name_to_cat(fname, cat=None):
def name_to_cat(fname: str, cat: Optional[str] = None) -> tuple[str, Optional[str]]:
"""Retrieve category from file name, but only if "cat" is None."""
if cat is None and fname.startswith("{{"):
n = fname.find("}}")
@@ -246,7 +246,9 @@ def name_to_cat(fname, cat=None):
return fname, cat
def cat_to_opts(cat, pp=None, script=None, priority=None) -> tuple[str, int, str, int]:
def cat_to_opts(
cat: Optional[str], pp: Optional[int] = None, script: Optional[str] = None, priority: Optional[int] = None
) -> tuple[str, int, str, int]:
"""Derive options from category, if options not already defined.
Specified options have priority over category-options.
If no valid category is given, special category '*' will supply default values
@@ -331,12 +333,12 @@ _wildcard_to_regex = {
}
def wildcard_to_re(text):
def wildcard_to_re(text: str) -> str:
"""Convert plain wildcard string (with '*' and '?') to regex."""
return "".join([_wildcard_to_regex.get(ch, ch) for ch in text])
def convert_filter(text):
def convert_filter(text: str) -> Optional[re.Pattern]:
"""Return compiled regex.
If string starts with re: it's a real regex
else quote all regex specials, replace '*' by '.*'
@@ -353,7 +355,7 @@ def convert_filter(text):
return None
def cat_convert(cat):
def cat_convert(cat: Optional[str]) -> Optional[str]:
"""Convert indexer's category/group-name to user categories.
If no match found, but indexer-cat equals user-cat, then return user-cat
If no match found, but the indexer-cat starts with the user-cat, return user-cat
@@ -397,7 +399,7 @@ _SERVICE_KEY = "SYSTEM\\CurrentControlSet\\services\\"
_SERVICE_PARM = "CommandLine"
def get_serv_parms(service):
def get_serv_parms(service: str) -> list[str]:
"""Get the service command line parameters from Registry"""
service_parms = []
try:
@@ -416,7 +418,7 @@ def get_serv_parms(service):
return service_parms
def set_serv_parms(service, args):
def set_serv_parms(service: str, args: list) -> bool:
"""Set the service command line parameters in Registry"""
serv = []
for arg in args:
@@ -444,7 +446,7 @@ def get_from_url(url: str) -> Optional[str]:
return None
def convert_version(text):
def convert_version(text: str) -> tuple[int, bool]:
"""Convert version string to numerical value and a testversion indicator"""
version = 0
test = True
@@ -551,7 +553,7 @@ def check_latest_version():
)
def upload_file_to_sabnzbd(url, fp):
def upload_file_to_sabnzbd(url: str, fp: str):
"""Function for uploading nzbs to a running SABnzbd instance"""
try:
fp = urllib.parse.quote_plus(fp)
@@ -644,7 +646,7 @@ def to_units(val: Union[int, float], postfix="") -> str:
return f"{sign}{val:.{decimals}f}{units}"
def caller_name(skip=2):
def caller_name(skip: int = 2) -> str:
"""Get a name of a caller in the format module.method
Originally used: https://gist.github.com/techtonik/2151727
Adapted for speed by using sys calls directly
@@ -682,7 +684,7 @@ def exit_sab(value: int):
os._exit(value)
def split_host(srv):
def split_host(srv: Optional[str]) -> tuple[Optional[str], Optional[int]]:
"""Split host:port notation, allowing for IPV6"""
if not srv:
return None, None
@@ -704,7 +706,7 @@ def split_host(srv):
return out[0], port
def get_cache_limit():
def get_cache_limit() -> str:
"""Depending on OS, calculate cache limits.
In ArticleCache it will make sure we stay
within system limits for 32/64 bit
@@ -742,7 +744,7 @@ def get_cache_limit():
return ""
def get_windows_memory():
def get_windows_memory() -> int:
"""Use ctypes to extract available memory"""
class MEMORYSTATUSEX(ctypes.Structure):
@@ -768,14 +770,14 @@ def get_windows_memory():
return stat.ullTotalPhys
def get_macos_memory():
def get_macos_memory() -> float:
"""Use system-call to extract total memory on macOS"""
system_output = run_command(["sysctl", "hw.memsize"])
return float(system_output.split()[1])
@conditional_cache(cache_time=3600)
def get_cpu_name():
def get_cpu_name() -> Optional[str]:
"""Find the CPU name (which needs a different method per OS), and return it
If none found, return platform.platform()"""
@@ -875,7 +877,7 @@ def on_cleanup_list(filename: str, skip_nzb: bool = False) -> bool:
return False
def memory_usage():
def memory_usage() -> Optional[str]:
try:
# Probably only works on Linux because it uses /proc/<pid>/statm
with open("/proc/%d/statm" % os.getpid()) as t:
@@ -897,7 +899,7 @@ except Exception:
_HAVE_STATM = _PAGE_SIZE and memory_usage()
def loadavg():
def loadavg() -> str:
"""Return 1, 5 and 15 minute load average of host or "" if not supported"""
p = ""
if not sabnzbd.WINDOWS and not sabnzbd.MACOS:
@@ -972,7 +974,7 @@ def bool_conv(value: Any) -> bool:
return bool(int_conv(value))
def create_https_certificates(ssl_cert, ssl_key):
def create_https_certificates(ssl_cert: str, ssl_key: str) -> bool:
"""Create self-signed HTTPS certificates and store in paths 'ssl_cert' and 'ssl_key'"""
try:
from sabnzbd.utils.certgen import generate_key, generate_local_cert
@@ -1051,7 +1053,7 @@ def is_sample(filename: str) -> bool:
return bool(re.search(RE_SAMPLE, filename))
def find_on_path(targets):
def find_on_path(targets: Union[str, tuple[str, ...]]) -> Optional[str]:
"""Search the PATH for a program and return full path"""
if sabnzbd.WINDOWS:
paths = os.getenv("PATH").split(";")
@@ -1224,7 +1226,7 @@ def match_str(text: AnyStr, matches: tuple[AnyStr, ...]) -> Optional[AnyStr]:
return None
def recursive_html_escape(input_dict_or_list: Union[dict[str, Any], List], exclude_items: tuple[str, ...] = ()):
def recursive_html_escape(input_dict_or_list: Union[dict[str, Any], list], exclude_items: tuple[str, ...] = ()):
"""Recursively update the input_dict in-place with html-safe values"""
if isinstance(input_dict_or_list, (dict, list)):
if isinstance(input_dict_or_list, dict):
@@ -1259,7 +1261,9 @@ def list2cmdline_unrar(lst: list[str]) -> str:
return " ".join(nlst)
def build_and_run_command(command: list[str], windows_unrar_command: bool = False, text_mode: bool = True, **kwargs):
def build_and_run_command(
command: list[str], windows_unrar_command: bool = False, text_mode: bool = True, **kwargs
) -> subprocess.Popen:
"""Builds and then runs command with necessary flags and optional
IONice and Nice commands. Optional Popen arguments can be supplied.
On Windows we need to run our own list2cmdline for Unrar.
@@ -1326,7 +1330,7 @@ def build_and_run_command(command: list[str], windows_unrar_command: bool = Fals
return subprocess.Popen(command, **popen_kwargs)
def run_command(cmd: list[str], **kwargs):
def run_command(cmd: list[str], **kwargs) -> str:
"""Run simple external command and return output as a string."""
with build_and_run_command(cmd, **kwargs) as p:
txt = p.stdout.read()
@@ -1359,7 +1363,7 @@ def set_socks5_proxy():
socket.socket = socks.socksocket
def set_https_verification(value):
def set_https_verification(value: bool) -> bool:
"""Set HTTPS-verification state while returning current setting
False = disable verification
"""
@@ -1381,7 +1385,7 @@ def request_repair():
pass
def check_repair_request():
def check_repair_request() -> bool:
"""Return True if repair request found, remove afterwards"""
path = os.path.join(cfg.admin_dir.get_path(), REPAIR_REQUEST)
if os.path.exists(path):
@@ -1615,7 +1619,7 @@ class SABRarFile(rarfile.RarFile):
self._file_parser._info_list.append(rar_obj)
self._file_parser._info_map[rar_obj.filename.rstrip("/")] = rar_obj
def filelist(self):
def filelist(self) -> list[str]:
"""Return list of filenames in archive."""
return [f.filename for f in self.infolist() if not f.isdir()]

View File

@@ -29,7 +29,7 @@ import io
import shutil
import functools
import rarfile
from typing import BinaryIO, Optional, Any, Union
from typing import BinaryIO, Optional, Any, Union, Callable
import sabnzbd
from sabnzbd.encoding import correct_unknown_encoding, ubtou
@@ -866,7 +866,7 @@ def rar_extract_core(
##############################################################################
# 7Zip Functions
##############################################################################
def unseven(nzo: NzbObject, workdir_complete: str, one_folder: bool, sevens: list[str]):
def unseven(nzo: NzbObject, workdir_complete: str, one_folder: bool, sevens: list[str]) -> tuple[bool, list[str]]:
"""Unpack multiple sets '7z' of 7Zip files from 'download_path' to 'workdir_complete.
When 'delete' is set, originals will be deleted.
"""
@@ -1762,7 +1762,7 @@ def sfv_check(sfvs: list[str], nzo: NzbObject) -> bool:
return result
def parse_sfv(sfv_filename):
def parse_sfv(sfv_filename: str) -> dict[str, bytes]:
"""Parse SFV file and return dictionary of crc32's and filenames"""
results = {}
with open(sfv_filename, mode="rb") as sfv_list:
@@ -1787,12 +1787,12 @@ def add_time_left(perc: float, start_time: Optional[float] = None, time_used: Op
return ""
def pre_queue(nzo: NzbObject, pp, cat):
def pre_queue(nzo: NzbObject, pp: str, cat: str) -> list[Any]:
"""Run pre-queue script (if any) and process results.
pp and cat are supplied separate since they can change.
"""
def fix(p):
def fix(p: Any) -> str:
# If added via API, some items can still be "None" (as a string)
if is_none(p):
return ""
@@ -1909,6 +1909,6 @@ class SevenZip:
p.wait()
return data
def close(self):
def close(self) -> None:
"""Close file"""
pass

View File

@@ -31,7 +31,7 @@ import http.client
import json
import apprise
from threading import Thread
from typing import Optional, Dict, Union
from typing import Optional, Union
import sabnzbd
import sabnzbd.cfg

View File

@@ -30,7 +30,7 @@ import zipfile
import tempfile
import cherrypy._cpreqbody
from typing import Optional, Dict, Any, Union, List, Tuple
from typing import Optional, Any, Union
import sabnzbd
from sabnzbd import nzbstuff

View File

@@ -25,7 +25,7 @@ import re
import guessit
from rebulk.match import MatchesDict
from string import whitespace, punctuation
from typing import Optional, Union, List, Tuple, Dict
from typing import Optional, Union
import sabnzbd
from sabnzbd.filesystem import (

View File

@@ -22,7 +22,7 @@ import io
import os
import time
from http.client import RemoteDisconnected
from typing import BinaryIO, Optional, Dict, List
from typing import BinaryIO, Optional
import pytest
from random import choice, randint