mirror of
https://github.com/calibrain/shelfmark.git
synced 2026-04-20 05:51:21 -04:00
## Changelog ### 🌐 Network Resilience - **Auto DNS rotation**: New `CUSTOM_DNS=auto` mode (now default) starts with system DNS and automatically rotates through Cloudflare, Google, Quad9, and OpenDNS when failures are detected. DNS results are cached to improve performance. - **Mirror failover**: Anna's Archive requests automatically fail over between mirrors (.org, .se, .li) when one is unreachable - **Round-robin source distribution**: Concurrent downloads are distributed across different AA partner servers to avoid rate limiting ### 📥 Download Reliability - **Much more reliable downloads**: Improved parsing of Anna's Archive pages, smarter source prioritization, and better retry logic with exponential backoff - **Download resume support**: Interrupted downloads can now resume from where they left off (if the server supports Range requests) - **Cookie sharing**: Cloudflare bypass cookies are extracted and shared with subsequent requests, often avoiding the need for re-bypass entirely - **Stall detection**: Downloads with no progress for 5 minutes are automatically cancelled and retried - **Staggered concurrent downloads**: Small delays between starting concurrent downloads to avoid hitting rate limits - **Source failure tracking**: After multiple failures from the same source type (e.g., Libgen), that source is temporarily skipped - **Lazy welib loading**: Welib sources are fetched as a fallback only when primary sources fail (unless `PRIORITIZE_WELIB` is enabled) ### 🛡️ Cloudflare & Protection Bypass - **DDOS-Guard support**: Internal bypasser now detects and handles DDOS-Guard challenges with dedicated bypass strategies - **Cancellation support**: Bypass operations can now be cancelled mid-operation when user cancels a download - **Smart warmup**: Chrome driver is pre-warmed when first client connects (controlled by `BYPASS_WARMUP_ON_CONNECT` env var) and shuts down after periods of inactivity ### 🔌 External Bypasser (FlareSolverr) - **Improved resilience**: 
Retry with exponential backoff, mirror/DNS rotation on failure, and proper timeout handling - **Cancellation support**: External bypasser operations respect cancellation flags ### 🖥️ Web UI Improvements - **Simplified download status**: Removed intermediate states (bypassing, verifying, ingesting) — now just shows Queued → Resolving → Downloading → Complete - **Status messages**: Downloads show detailed status like "Trying Anna's Archive (Server 3)" or "Server busy, trying next...", or live waitlist countdowns. - **Improved download sidebar**: - Downloads sorted by add time (newest first) - X button moved to top-right corner for better UX - Wave animation on in-progress items - Error messages shown directly on failed items - X button on completed/errored items clears them from the list ### ⚙️ Configuration Changes - **`CUSTOM_DNS=auto`** is now the default (previously empty/system DNS) - **`DOWNLOAD_PROGRESS_UPDATE_INTERVAL`** default changed from 5s to 1s for smoother progress - **`BYPASS_WARMUP_ON_CONNECT`** (default: true) — warm up Chrome when first client connects ### 🐛 Bug Fixes - **Download cancellation actually works**: Fixed issue where cancelling downloads didn't properly stop in-progress operations - **WELIB prioritization**: Fixed `PRIORITIZE_WELIB` not being respected - **File exists handling**: Downloads to same filename now get `_1`, `_2` suffix instead of overwriting - **Empty search results**: "No books found" now returns empty list instead of throwing exception - **Search unavailable error**: Network/mirror failures during search now return proper 503 error to client
514 lines
19 KiB
Python
514 lines
19 KiB
Python
"""Backend logic for the book download application."""
|
|
|
|
import os
|
|
import random
|
|
import shutil
|
|
import subprocess
|
|
import threading
|
|
import time
|
|
from concurrent.futures import Future, ThreadPoolExecutor
|
|
from pathlib import Path
|
|
from threading import Event, Lock
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
import book_manager
|
|
from book_manager import SearchUnavailable
|
|
from config import CUSTOM_SCRIPT
|
|
from env import (
|
|
DOWNLOAD_PATHS, DOWNLOAD_PROGRESS_UPDATE_INTERVAL, INGEST_DIR,
|
|
MAIN_LOOP_SLEEP_TIME, MAX_CONCURRENT_DOWNLOADS, TMP_DIR, USE_BOOK_TITLE,
|
|
)
|
|
from logger import setup_logger
|
|
from models import BookInfo, QueueStatus, SearchFilters, book_queue
|
|
|
|
logger = setup_logger(__name__)
|
|
|
|
# Optional WebSocket manager (initialized by app.py). If the module cannot
# be imported, ws_manager stays None and every broadcast is skipped.
try:
    from websocket_manager import ws_manager
except ImportError:
    ws_manager = None

# Broadcast throttling state. Holds two kinds of entries per book:
#   "<book_id>"          -> time of the last progress broadcast
#   "<book_id>_progress" -> progress value sent in that broadcast
_progress_last_broadcast: Dict[str, float] = dict()
# Guards _progress_last_broadcast and _last_activity.
_progress_lock = Lock()

# Stall detection: time of the latest progress/status update per download.
_last_activity: Dict[str, float] = dict()
# Seconds without a progress/status update before a download counts as stalled (5 minutes).
STALL_TIMEOUT = 5 * 60
|
|
|
|
def search_books(query: str, filters: SearchFilters) -> List[Dict[str, Any]]:
    """Run a book search and return the hits as plain dictionaries.

    Args:
        query: Search term
        filters: Search filters object

    Returns:
        List[Dict]: One dictionary per matching book. An empty list is
        returned when an unexpected error occurs.

    Raises:
        SearchUnavailable: Logged as a warning and re-raised unchanged.
    """
    try:
        found = book_manager.search_books(query, filters)
        return list(map(_book_info_to_dict, found))
    except SearchUnavailable as unavailable:
        logger.warning(f"Search unavailable: {unavailable}")
        raise
    except Exception as err:
        logger.error_trace(f"Error searching books: {err}")
        return []
|
|
|
|
def get_book_info(book_id: str) -> Optional[Dict[str, Any]]:
    """Look up one book and return its details as a dictionary.

    Args:
        book_id: Book identifier

    Returns:
        Optional[Dict]: The book's information, or None when the lookup or
        the conversion raises (the error is logged).
    """
    try:
        return _book_info_to_dict(book_manager.get_book_info(book_id))
    except Exception as err:
        logger.error_trace(f"Error getting book info: {err}")
        return None
|
|
|
|
def queue_book(book_id: str, priority: int = 0) -> bool:
    """Fetch a book's details and put it on the download queue.

    Args:
        book_id: Book identifier
        priority: Priority level (lower number = higher priority)

    Returns:
        bool: True once the book is queued; False if any step raised
        (the error is logged).
    """
    try:
        info = book_manager.get_book_info(book_id)
        book_queue.add(book_id, info, priority)
        logger.info(f"Book queued with priority {priority}: {info.title}")

        # Let connected clients know the queue changed
        if ws_manager:
            ws_manager.broadcast_status_update(queue_status())
    except Exception as err:
        logger.error_trace(f"Error queueing book: {err}")
        return False
    return True
|
|
|
|
def queue_status() -> Dict[str, Dict[str, Any]]:
    """Return a JSON-friendly snapshot of the download queue.

    Any book whose recorded download path no longer exists on disk has that
    path reset to None before the snapshot is serialized.

    Returns:
        Dict: Maps each status value to a dict of book id -> serialized book.
    """
    snapshot = book_queue.get_status()

    # Forget download paths that point at files which are gone
    for books in snapshot.values():
        for info in books.values():
            if info.download_path and not os.path.exists(info.download_path):
                info.download_path = None

    # Enum keys become their string values, BookInfo objects become dicts
    serialized: Dict[str, Dict[str, Any]] = {}
    for status_type, books in snapshot.items():
        serialized[status_type.value] = {
            book_id: _book_info_to_dict(info) for book_id, info in books.items()
        }
    return serialized
|
|
|
|
def get_book_data(book_id: str) -> Tuple[Optional[bytes], BookInfo]:
    """Read the downloaded file for a book and return it with the book's info.

    Args:
        book_id: Book identifier

    Returns:
        Tuple[Optional[bytes], BookInfo]: The file contents and the tracked
        BookInfo. On any failure the contents are None, the tracked book's
        download_path is reset to None, and, when the book is not tracked
        at all, a placeholder BookInfo titled "Unknown" is returned.
    """
    # Bind before the try block: if the queue lookup itself fails, the
    # handler below must still be able to inspect book_info.
    book_info: Optional[BookInfo] = None
    try:
        book_info = book_queue._book_data[book_id]
        path = book_info.download_path
        with open(path, "rb") as f:
            return f.read(), book_info
    except Exception as e:
        logger.error_trace(f"Error getting book data: {e}")
        if book_info:
            # The recorded path is unusable; stop advertising it
            book_info.download_path = None
        return None, (book_info if book_info else BookInfo(id=book_id, title="Unknown"))
|
|
|
|
def _book_info_to_dict(book: BookInfo) -> Dict[str, Any]:
    """Serialize a BookInfo's attributes to a dict, omitting those set to None."""
    serialized: Dict[str, Any] = {}
    for name, attr in book.__dict__.items():
        if attr is None:
            continue
        serialized[name] = attr
    return serialized
|
|
|
|
def _prepare_download_folder(book_info: BookInfo) -> Path:
    """Pick the destination directory for a book and make sure it exists.

    The book's content type selects an entry from DOWNLOAD_PATHS; when the
    content type is empty or has no entry, INGEST_DIR is used.
    """
    content = book_info.content
    if content and content in DOWNLOAD_PATHS:
        target_dir = DOWNLOAD_PATHS.get(content)
    else:
        target_dir = INGEST_DIR
    os.makedirs(target_dir, exist_ok=True)
    return target_dir
|
|
|
|
def _move_with_fallback(source: Path, destination: Path) -> None:
    """Move source to destination, falling back to progressively plainer copies."""
    try:
        shutil.move(source, destination)
        return
    except Exception as e:
        logger.debug(f"Error moving book: {e}, will try copying instead")

    try:
        # Copies file data and permission bits
        shutil.copy(source, destination)
    except Exception as e:
        logger.debug(f"Error copying book: {e}, will try copying without permissions instead")
        # Copies file data only
        shutil.copyfile(source, destination)

    # A copy succeeded. Remove the source unless the failed move already did.
    if os.path.exists(source):
        os.remove(source)


def _download_book_with_cancellation(book_id: str, cancel_flag: Event) -> Optional[str]:
    """Download and process a book with cancellation support.

    The book is downloaded into TMP_DIR, optionally handed to CUSTOM_SCRIPT,
    moved into the destination directory under a temporary
    "<book_id>.crdownload" name, and finally renamed to its real filename.
    The cancel flag is checked between these steps; on cancellation any
    partial file created so far is removed.

    Args:
        book_id: Book identifier
        cancel_flag: Threading event to signal cancellation

    Returns:
        str: Path to the downloaded book if successful, None otherwise
        (cancelled or failed; failures are logged, not raised).
    """
    try:
        # Check for cancellation before starting
        if cancel_flag.is_set():
            logger.info(f"Download cancelled before starting: {book_id}")
            return None

        book_info = book_queue._book_data[book_id]
        logger.info(f"Starting download: {book_info.title}")

        if not book_info.download_urls:
            raise ValueError(f"No download URLs available for {book_id}")

        # get_filename() resolves format as side effect
        full_name = book_info.get_filename()
        book_name = full_name if USE_BOOK_TITLE else f"{book_id}.{book_info.format or 'bin'}"
        book_path = TMP_DIR / book_name

        # Check cancellation before download
        if cancel_flag.is_set():
            logger.info(f"Download cancelled before book manager call: {book_id}")
            return None

        def progress_callback(progress):
            update_download_progress(book_id, progress)

        def status_callback(status, message=None):
            update_download_status(book_id, status, message)

        # Set status to resolving immediately when processing starts
        update_download_status(book_id, "resolving")

        success_download_url = book_manager.download_book(
            book_info, book_path, progress_callback, cancel_flag, status_callback
        )

        # Brief pause for progress thread cleanup (returns early if cancelled)
        cancel_flag.wait(0.1)

        if cancel_flag.is_set():
            logger.info(f"Download cancelled during download: {book_id}")
            # Clean up partial download
            if book_path.exists():
                book_path.unlink()
            return None

        if not success_download_url:
            raise Exception("Unknown error downloading book")

        # Check cancellation before post-processing
        if cancel_flag.is_set():
            logger.info(f"Download cancelled before post-processing: {book_id}")
            if book_path.exists():
                book_path.unlink()
            return None

        logger.debug(f"Post-processing download: {book_info.title}")

        if CUSTOM_SCRIPT:
            logger.info(f"Running custom script: {CUSTOM_SCRIPT}")
            script_result = subprocess.run([CUSTOM_SCRIPT, book_path])
            if script_result.returncode != 0:
                # Best effort: a failing script is reported but does not abort the download
                logger.warning(f"Custom script exited with code {script_result.returncode}")

        # Regenerate filename with fallback to successful download URL for format
        full_name = book_info.get_filename(success_download_url)
        book_name = full_name if USE_BOOK_TITLE else f"{book_id}.{book_info.format or 'bin'}"

        final_dir = _prepare_download_folder(book_info)
        intermediate_path = final_dir / f"{book_id}.crdownload"
        final_path = final_dir / book_name

        # Handle file already exists - add suffix to avoid overwrite
        if final_path.exists():
            base = final_path.stem
            ext = final_path.suffix
            counter = 1
            while final_path.exists():
                final_path = final_dir / f"{base}_{counter}{ext}"
                counter += 1
            logger.info(f"File already exists, saving as: {final_path.name}")

        if not os.path.exists(book_path):
            # Without the temp file there is nothing to publish. Fail here rather
            # than let the rename below pick up a leftover .crdownload file.
            raise FileNotFoundError(f"Downloaded file not found: {book_path}")

        logger.info(f"Moving book to ingest directory: {book_path} -> {final_path}")
        _move_with_fallback(book_path, intermediate_path)

        # Final cancellation check before completing
        if cancel_flag.is_set():
            logger.info(f"Download cancelled before final rename: {book_id}")
            if intermediate_path.exists():
                intermediate_path.unlink()
            return None

        os.rename(intermediate_path, final_path)
        logger.info(f"Download completed successfully: {book_info.title}")

        return str(final_path)
    except Exception as e:
        if cancel_flag.is_set():
            logger.info(f"Download cancelled during error handling: {book_id}")
        else:
            logger.error_trace(f"Error downloading book: {e}")
        return None
|
|
|
|
def update_download_progress(book_id: str, progress: float) -> None:
    """Record download progress and broadcast it to clients, throttled.

    The progress is always written to the queue and counted as activity for
    stall detection. A WebSocket broadcast is sent only when one of these
    holds:
    - progress is at the start (<= 1) or near completion (>= 99)
    - at least DOWNLOAD_PROGRESS_UPDATE_INTERVAL seconds passed since the
      last broadcast for this book
    - progress advanced by 10 or more since the last broadcast
    """
    book_queue.update_progress(book_id, progress)

    # Track activity for stall detection
    with _progress_lock:
        _last_activity[book_id] = time.time()

    # Nothing to broadcast to
    if not ws_manager:
        return

    now = time.time()
    progress_key = f"{book_id}_progress"

    with _progress_lock:
        previous_time = _progress_last_broadcast.get(book_id, 0)
        previous_progress = _progress_last_broadcast.get(progress_key, 0)

        should_broadcast = (
            progress <= 1
            or progress >= 99
            or now - previous_time >= DOWNLOAD_PROGRESS_UPDATE_INTERVAL
            or progress - previous_progress >= 10
        )

        if should_broadcast:
            _progress_last_broadcast[book_id] = now
            _progress_last_broadcast[progress_key] = progress

    # Send outside the lock so the broadcast never blocks other updates
    if should_broadcast:
        ws_manager.broadcast_download_progress(book_id, progress, 'downloading')
|
|
|
|
def update_download_status(book_id: str, status: str, message: Optional[str] = None) -> None:
    """Apply a status change (and optional message) to a queued download.

    Args:
        book_id: Book identifier
        status: Status string (e.g., 'resolving', 'downloading'); matched
            case-insensitively, unrecognized values leave the status untouched
        message: Optional detailed status message for UI display; an empty
            string clears the message, None leaves it unchanged
    """
    # Each recognized status string maps to the QueueStatus member of the same name
    known_statuses = {
        name: getattr(QueueStatus, name.upper())
        for name in (
            'queued',
            'resolving',
            'downloading',
            'complete',
            'available',
            'error',
            'done',
            'cancelled',
        )
    }

    mapped_status = known_statuses.get(status.lower())
    if mapped_status:
        book_queue.update_status(book_id, mapped_status)

    # Any status update counts as activity for stall detection
    with _progress_lock:
        _last_activity[book_id] = time.time()

    if message is not None:
        book_queue.update_status_message(book_id, message)

    # Push the refreshed queue to connected clients
    if ws_manager:
        ws_manager.broadcast_status_update(queue_status())
|
|
|
|
def cancel_download(book_id: str) -> bool:
    """Ask the queue to cancel a download and notify clients on success.

    Args:
        book_id: Book identifier to cancel

    Returns:
        bool: The queue's result; True if cancellation was successful
    """
    cancelled = book_queue.cancel_download(book_id)
    if not cancelled:
        return cancelled

    # Only broadcast when a WebSocket manager exists and reports itself enabled
    if ws_manager and ws_manager.is_enabled():
        ws_manager.broadcast_status_update(queue_status())
    return cancelled
|
|
|
|
def set_book_priority(book_id: str, priority: int) -> bool:
    """Change the priority of a queued book (delegates to the queue).

    Args:
        book_id: Book identifier
        priority: New priority level (lower = higher priority)

    Returns:
        bool: True if priority was successfully changed
    """
    changed = book_queue.set_priority(book_id, priority)
    return changed
|
|
|
|
def reorder_queue(book_priorities: Dict[str, int]) -> bool:
    """Apply new priorities to several queued books at once (delegates to the queue).

    Args:
        book_priorities: Dict mapping book_id to new priority

    Returns:
        bool: True if reordering was successful
    """
    reordered = book_queue.reorder_queue(book_priorities)
    return reordered
|
|
|
|
def get_queue_order() -> List[Dict[str, Any]]:
    """Get current queue order for display.

    Returns:
        List[Dict]: Whatever book_queue.get_queue_order() reports, unchanged.
    """
    # The annotation uses typing.Any; the builtin any() is a function, not a type
    return book_queue.get_queue_order()
|
|
|
|
def get_active_downloads() -> List[str]:
    """Return the queue's list of currently active downloads."""
    active = book_queue.get_active_downloads()
    return active
|
|
|
|
def clear_completed() -> int:
    """Drop all completed downloads from queue tracking and return the queue's result."""
    cleared = book_queue.clear_completed()
    return cleared
|
|
|
|
def _cleanup_progress_tracking(book_id: str) -> None:
    """Forget throttling and stall-detection state for a finished download."""
    with _progress_lock:
        # Both the broadcast timestamp and the last broadcast progress value
        for key in (book_id, f"{book_id}_progress"):
            _progress_last_broadcast.pop(key, None)
        _last_activity.pop(book_id, None)
|
|
|
|
def _process_single_download(book_id: str, cancel_flag: Event) -> None:
    """Run one download job and record its final status in the queue.

    The final status is CANCELLED when the cancel flag is set, COMPLETE when
    a download path was produced, and ERROR otherwise. Progress tracking is
    cleaned up and the queue is broadcast to clients in every case.
    """
    try:
        # Intermediate statuses (resolving -> downloading) are reported
        # through callbacks while the download runs
        download_path = _download_book_with_cancellation(book_id, cancel_flag)

        _cleanup_progress_tracking(book_id)

        if cancel_flag.is_set():
            final_status = QueueStatus.CANCELLED
        elif download_path:
            book_queue.update_download_path(book_id, download_path)
            final_status = QueueStatus.COMPLETE
        else:
            final_status = QueueStatus.ERROR

        book_queue.update_status(book_id, final_status)

        # Broadcast the final status (cancelled, completed or error)
        if ws_manager:
            ws_manager.broadcast_status_update(queue_status())

    except Exception as err:
        # Clean up progress tracking even on error
        _cleanup_progress_tracking(book_id)

        if cancel_flag.is_set():
            logger.info(f"Download cancelled: {book_id}")
            book_queue.update_status(book_id, QueueStatus.CANCELLED)
        else:
            logger.error_trace(f"Error in download processing: {err}")
            book_queue.update_status(book_id, QueueStatus.ERROR)
            # Only fill in a message when none has been recorded yet
            tracked = book_queue._book_data
            if book_id in tracked and not tracked[book_id].status_message:
                book_queue.update_status_message(book_id, f"Download failed: {type(err).__name__}: {str(err)}")

        # Broadcast error/cancelled status
        if ws_manager:
            ws_manager.broadcast_status_update(queue_status())
|
|
|
|
def concurrent_download_loop() -> None:
    """Main download coordinator using ThreadPoolExecutor for concurrent downloads.

    Runs forever. Each iteration:
    1. reaps finished futures and logs any exception they raised,
    2. cancels downloads with no recorded activity for STALL_TIMEOUT seconds
       (once per download),
    3. starts queued downloads while fewer than MAX_CONCURRENT_DOWNLOADS are
       active, staggering starts when others are already running,
    4. sleeps MAIN_LOOP_SLEEP_TIME seconds.
    """
    logger.info(f"Starting concurrent download loop with {MAX_CONCURRENT_DOWNLOADS} workers")

    with ThreadPoolExecutor(max_workers=MAX_CONCURRENT_DOWNLOADS, thread_name_prefix="BookDownload") as executor:
        active_futures: Dict[Future, str] = {}  # Track active download futures
        # Book ids already cancelled for stalling whose worker has not exited yet.
        # Without this, a stalled download would be re-cancelled (and re-logged)
        # on every loop iteration until its worker notices the cancel flag.
        stall_cancelled = set()

        while True:
            # Clean up completed futures
            completed_futures = [f for f in active_futures if f.done()]
            for future in completed_futures:
                book_id = active_futures.pop(future)
                stall_cancelled.discard(book_id)
                try:
                    future.result()  # This will raise any exceptions from the worker
                except Exception as e:
                    logger.error_trace(f"Future exception for {book_id}: {e}")

            # Check for stalled downloads (no activity in STALL_TIMEOUT seconds).
            # Only the activity timestamps are read under the lock; the queue
            # is updated after releasing it.
            current_time = time.time()
            with _progress_lock:
                stalled_ids = [
                    book_id
                    for book_id in active_futures.values()
                    if book_id not in stall_cancelled
                    and current_time - _last_activity.get(book_id, current_time) > STALL_TIMEOUT
                ]

            for book_id in stalled_ids:
                logger.warning(f"Download stalled for {book_id}, cancelling")
                book_queue.cancel_download(book_id)
                book_queue.update_status_message(book_id, f"Download stalled (no activity for {STALL_TIMEOUT}s)")
                stall_cancelled.add(book_id)

            # Start new downloads if we have capacity
            while len(active_futures) < MAX_CONCURRENT_DOWNLOADS:
                next_download = book_queue.get_next()
                if not next_download:
                    break

                # Stagger concurrent downloads to avoid rate limiting on shared download servers
                # Only delay if other downloads are already active
                if active_futures:
                    stagger_delay = random.uniform(2, 5)
                    logger.debug(f"Staggering download start by {stagger_delay:.1f}s")
                    time.sleep(stagger_delay)

                book_id, cancel_flag = next_download

                # Submit download job to thread pool
                future = executor.submit(_process_single_download, book_id, cancel_flag)
                active_futures[future] = book_id

            # Brief sleep to prevent busy waiting
            time.sleep(MAIN_LOOP_SLEEP_TIME)
|
|
|
|
# Launch the download coordinator in the background as soon as this module
# is imported. It is a daemon thread, so it does not keep the process alive.
download_coordinator_thread = threading.Thread(
    name="DownloadCoordinator",
    target=concurrent_download_loop,
    daemon=True,
)
download_coordinator_thread.start()

logger.info(f"Download system initialized with {MAX_CONCURRENT_DOWNLOADS} concurrent workers")
|