Use Event's to handle Post Processing queue

See #3209
This commit is contained in:
Safihre
2025-12-01 15:21:20 +01:00
parent a4657e2bd3
commit 75be6b5850
4 changed files with 53 additions and 22 deletions

View File

@@ -781,12 +781,12 @@ def _api_watched_now(name: str, kwargs: dict[str, Union[str, list[str]]]) -> byt
def _api_resume_pp(name: str, kwargs: dict[str, Union[str, list[str]]]) -> bytes:
sabnzbd.PostProcessor.paused = False
sabnzbd.PostProcessor.resume()
return report()
def _api_pause_pp(name: str, kwargs: dict[str, Union[str, list[str]]]) -> bytes:
sabnzbd.PostProcessor.paused = True
sabnzbd.PostProcessor.pause()
return report()

View File

@@ -409,8 +409,9 @@ class Downloader(Thread):
@NzbQueueLocker
def resume_from_postproc(self):
logging.info("Post-processing finished, resuming download")
self.paused_for_postproc = False
if self.paused_for_postproc:
logging.info("Post-processing finished, resuming download")
self.paused_for_postproc = False
@NzbQueueLocker
def disconnect(self):

View File

@@ -39,7 +39,7 @@ from sabnzbd.newsunpack import (
rar_sort,
is_sfv_file,
)
from threading import Thread
from threading import Thread, Event
from sabnzbd.misc import (
on_cleanup_list,
is_sample,
@@ -116,6 +116,9 @@ class PostProcessor(Thread):
# Regular queue for jobs that might need more attention
self.slow_queue: queue.Queue[Optional[NzbObject]] = queue.Queue()
# Event to signal when work is available or state changes
self.work_available = Event()
# Load all old jobs
for nzo in self.history_queue:
self.process(nzo)
@@ -180,6 +183,9 @@ class PostProcessor(Thread):
self.save()
history_updated()
# Signal that work is available
self.work_available.set()
def remove(self, nzo: NzbObject):
"""Remove given nzo from the queue"""
try:
@@ -192,8 +198,20 @@ class PostProcessor(Thread):
def stop(self):
"""Stop thread after finishing running job"""
self.__stop = True
self.slow_queue.put(None)
self.fast_queue.put(None)
# Wake up the processor thread to check stop flag
self.work_available.set()
def pause(self):
"""Pause post-processing"""
self.paused = True
logging.info("Pausing post-processing")
def resume(self):
"""Resume post-processing"""
self.paused = False
logging.info("Resuming post-processing")
# Wake up the processor thread
self.work_available.set()
def cancel_pp(self, nzo_ids: list[str]) -> Optional[bool]:
"""Abort Direct Unpack and change the status, so that the PP is canceled"""
@@ -265,27 +283,39 @@ class PostProcessor(Thread):
while not self.__stop:
self.__busy = False
if self.paused:
time.sleep(5)
continue
# Set NzbObject object to None so references from this thread do not keep the
# object alive until the next job is added to post-processing (see #1628)
nzo = None
# Wait for work to be available (no timeout!)
self.work_available.wait()
self.work_available.clear() # Reset for next iteration
# Check if we should stop
if self.__stop:
break
# If paused, loop back
if self.paused:
continue
# If queues are empty (spurious wake or race condition), loop back
if self.slow_queue.empty() and self.fast_queue.empty():
continue
# Something in the fast queue?
try:
# Every few fast-jobs we should check allow a
# Every few fast-jobs we should allow a
# slow job so that they don't wait forever
if self.__fast_job_count >= MAX_FAST_JOB_COUNT and self.slow_queue.qsize():
raise queue.Empty
nzo = self.fast_queue.get(timeout=2)
nzo = self.fast_queue.get_nowait()
self.__fast_job_count += 1
except queue.Empty:
# Try the slow queue
try:
nzo = self.slow_queue.get(timeout=2)
nzo = self.slow_queue.get_nowait()
# Reset fast-counter
self.__fast_job_count = 0
except queue.Empty:
@@ -296,10 +326,6 @@ class PostProcessor(Thread):
# No fast or slow jobs, better luck next loop!
continue
# Stop job
if not nzo:
continue
# Job was already deleted.
if not nzo.work_name:
check_eoq = True
@@ -328,7 +354,7 @@ class PostProcessor(Thread):
self.external_process = None
check_eoq = True
# Allow download to proceed
# Allow download to proceed if it was paused for post-processing
sabnzbd.Downloader.resume_from_postproc()

View File

@@ -337,7 +337,11 @@ class Scheduler:
sabnzbd.downloader.unpause_all()
sabnzbd.Downloader.set_paused_state(paused or paused_all)
sabnzbd.PostProcessor.paused = pause_post
# Handle pause_post state with proper notification
if pause_post and not sabnzbd.PostProcessor.paused:
sabnzbd.PostProcessor.pause()
elif not pause_post and sabnzbd.PostProcessor.paused:
sabnzbd.PostProcessor.resume()
if speedlimit is not None:
sabnzbd.Downloader.limit_speed(speedlimit)
@@ -506,11 +510,11 @@ def sort_schedules(all_events, now=None):
def pp_pause():
sabnzbd.PostProcessor.paused = True
sabnzbd.PostProcessor.pause()
def pp_resume():
sabnzbd.PostProcessor.paused = False
sabnzbd.PostProcessor.resume()
def enable_server(server):