mirror of
https://github.com/navidrome/navidrome.git
synced 2026-04-17 04:59:37 -04:00
* refactor: move playlist business logic from repositories to core.Playlists service Move authorization, permission checks, and orchestration logic from playlist repositories to the core.Playlists service, following the existing pattern used by core.Share and core.Library. Changes: - Expand core.Playlists interface with read, mutation, track management, and REST adapter methods - Add playlistRepositoryWrapper for REST Save/Update/Delete with permission checks (follows Share/Library pattern) - Simplify persistence/playlist_repository.go: remove isWritable(), auth checks from Delete()/Put()/updatePlaylist() - Simplify persistence/playlist_track_repository.go: remove isTracksEditable() and permission checks from Add/Delete/Reorder - Update Subsonic API handlers to route through service - Update Native API handlers to accept core.Playlists instead of model.DataStore * test: add coverage for playlist service methods and REST wrapper Add 30 new tests covering the service methods added during the playlist refactoring: - Delete: owner, admin, denied, not found - Create: new playlist, replace tracks, admin bypass, denied, not found - AddTracks: owner, admin, denied, smart playlist, not found - RemoveTracks: owner, smart playlist denied, non-owner denied - ReorderTrack: owner, smart playlist denied - NewRepository wrapper: Save (owner assignment, ID clearing), Update (owner, admin, denied, ownership change, not found), Delete (delegation with permission checks) Expand mockedPlaylistRepo with Get, Delete, Tracks, GetWithTracks, and rest.Persistable methods. Add mockedPlaylistTrackRepo for track operation verification. * fix: add authorization check to playlist Update method Added ownership verification to the Subsonic Update endpoint in the playlist service layer. The authorization check was present in the old repository code but was not carried over during the refactoring to the service layer, allowing any authenticated user to modify playlists they don't own via the Subsonic API. Also added corresponding tests for the Update method's permission logic. * refactor: improve playlist permission checks and error handling, add e2e tests Signed-off-by: Deluan <deluan@navidrome.org> * refactor: rename core.Playlists to playlists package and update references Signed-off-by: Deluan <deluan@navidrome.org> * refactor: rename playlists_internal_test.go to parse_m3u_test.go and update tests; add new parse_nsp.go and rest_adapter.go files Signed-off-by: Deluan <deluan@navidrome.org> * fix: block track mutations on smart playlists in Create and Update Create now rejects replacing tracks on smart playlists (pre-existing gap). Update now uses checkTracksEditable instead of checkWritable when track changes are requested, restoring the protection that was removed from the repository layer during the refactoring. Metadata-only updates on smart playlists remain allowed. * test: add smart playlist protection tests to ensure readonly behavior and mutation restrictions * refactor: optimize track removal and renumbering in playlists Signed-off-by: Deluan <deluan@navidrome.org> * refactor: implement track reordering in playlists with SQL updates Signed-off-by: Deluan <deluan@navidrome.org> * refactor: wrap track deletion and reordering in transactions for consistency Signed-off-by: Deluan <deluan@navidrome.org> * refactor: remove unused getTracks method from playlistTrackRepository Signed-off-by: Deluan <deluan@navidrome.org> * refactor: optimize playlist track renumbering with CTE-based UPDATE Replace the DELETE + re-INSERT renumbering strategy with a two-step UPDATE approach using a materialized CTE and ROW_NUMBER() window function. The previous approach (SELECT all IDs, DELETE all tracks, re-INSERT in chunks of 200) required 13 SQL operations for a 2000-track playlist. The new approach uses just 2 UPDATEs: first negating all IDs to clear the positive space, then assigning sequential positions via UPDATE...FROM with a CTE. This avoids the UNIQUE constraint violations that affected the original correlated subquery while reducing per-delete request time from ~110ms to ~12ms on a 2000-track playlist. Signed-off-by: Deluan <deluan@navidrome.org> * refactor: rename New function to NewPlaylists for clarity Signed-off-by: Deluan <deluan@navidrome.org> * refactor: update mock playlist repository and tests for consistency Signed-off-by: Deluan <deluan@navidrome.org> --------- Signed-off-by: Deluan <deluan@navidrome.org>
377 lines
12 KiB
Go
377 lines
12 KiB
Go
package scanner
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"maps"
|
|
"slices"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
ppl "github.com/google/go-pipeline/pkg/pipeline"
|
|
"github.com/navidrome/navidrome/conf"
|
|
"github.com/navidrome/navidrome/consts"
|
|
"github.com/navidrome/navidrome/core/artwork"
|
|
"github.com/navidrome/navidrome/core/playlists"
|
|
"github.com/navidrome/navidrome/db"
|
|
"github.com/navidrome/navidrome/log"
|
|
"github.com/navidrome/navidrome/model"
|
|
"github.com/navidrome/navidrome/utils/run"
|
|
"github.com/navidrome/navidrome/utils/slice"
|
|
)
|
|
|
|
type scannerImpl struct {
|
|
ds model.DataStore
|
|
cw artwork.CacheWarmer
|
|
pls playlists.Playlists
|
|
}
|
|
|
|
// scanState holds the state of an in-progress scan, to be passed to the various phases
|
|
type scanState struct {
|
|
progress chan<- *ProgressInfo
|
|
fullScan bool
|
|
changesDetected atomic.Bool
|
|
libraries model.Libraries // Store libraries list for consistency across phases
|
|
targets map[int][]string // Optional: map[libraryID][]folderPaths for selective scans
|
|
totalLibraryCount int // Total number of libraries (unfiltered), for cross-library move detection
|
|
}
|
|
|
|
func (s *scanState) sendProgress(info *ProgressInfo) {
|
|
if s.progress != nil {
|
|
s.progress <- info
|
|
}
|
|
}
|
|
|
|
func (s *scanState) isSelectiveScan() bool {
|
|
return len(s.targets) > 0
|
|
}
|
|
|
|
func (s *scanState) sendWarning(msg string) {
|
|
s.sendProgress(&ProgressInfo{Warning: msg})
|
|
}
|
|
|
|
func (s *scanState) sendError(err error) {
|
|
s.sendProgress(&ProgressInfo{Error: err.Error()})
|
|
}
|
|
|
|
func (s *scannerImpl) scanFolders(ctx context.Context, fullScan bool, targets []model.ScanTarget, progress chan<- *ProgressInfo) {
|
|
startTime := time.Now()
|
|
|
|
state := scanState{
|
|
progress: progress,
|
|
fullScan: fullScan,
|
|
changesDetected: atomic.Bool{},
|
|
}
|
|
|
|
// Set changesDetected to true for full scans to ensure all maintenance operations run
|
|
if fullScan {
|
|
state.changesDetected.Store(true)
|
|
}
|
|
|
|
// Get libraries and optionally filter by targets
|
|
allLibs, err := s.ds.Library(ctx).GetAll()
|
|
if err != nil {
|
|
state.sendWarning(fmt.Sprintf("getting libraries: %s", err))
|
|
return
|
|
}
|
|
state.totalLibraryCount = len(allLibs)
|
|
|
|
if len(targets) > 0 {
|
|
// Selective scan: filter libraries and build targets map
|
|
state.targets = make(map[int][]string)
|
|
|
|
for _, target := range targets {
|
|
folderPath := target.FolderPath
|
|
if folderPath == "" {
|
|
folderPath = "."
|
|
}
|
|
state.targets[target.LibraryID] = append(state.targets[target.LibraryID], folderPath)
|
|
}
|
|
|
|
// Filter libraries to only those in targets
|
|
state.libraries = slice.Filter(allLibs, func(lib model.Library) bool {
|
|
return len(state.targets[lib.ID]) > 0
|
|
})
|
|
|
|
log.Info(ctx, "Scanner: Starting selective scan", "fullScan", state.fullScan, "numLibraries", len(state.libraries), "numTargets", len(targets))
|
|
} else {
|
|
// Full library scan
|
|
state.libraries = allLibs
|
|
log.Info(ctx, "Scanner: Starting scan", "fullScan", state.fullScan, "numLibraries", len(state.libraries))
|
|
}
|
|
|
|
// Store scan type and start time
|
|
scanType := "quick"
|
|
if state.fullScan {
|
|
scanType = "full"
|
|
}
|
|
if state.isSelectiveScan() {
|
|
scanType += "-selective"
|
|
}
|
|
_ = s.ds.Property(ctx).Put(consts.LastScanTypeKey, scanType)
|
|
_ = s.ds.Property(ctx).Put(consts.LastScanStartTimeKey, startTime.Format(time.RFC3339))
|
|
|
|
// if there was a full scan in progress, force a full scan
|
|
if !state.fullScan {
|
|
for _, lib := range state.libraries {
|
|
if lib.FullScanInProgress {
|
|
log.Info(ctx, "Scanner: Interrupted full scan detected", "lib", lib.Name)
|
|
state.fullScan = true
|
|
if state.isSelectiveScan() {
|
|
_ = s.ds.Property(ctx).Put(consts.LastScanTypeKey, "full-selective")
|
|
} else {
|
|
_ = s.ds.Property(ctx).Put(consts.LastScanTypeKey, "full")
|
|
}
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
// Prepare libraries for scanning (initialize LastScanStartedAt if needed)
|
|
err = s.prepareLibrariesForScan(ctx, &state)
|
|
if err != nil {
|
|
log.Error(ctx, "Scanner: Error preparing libraries for scan", err)
|
|
state.sendError(err)
|
|
return
|
|
}
|
|
|
|
err = run.Sequentially(
|
|
// Phase 1: Scan all libraries and import new/updated files
|
|
runPhase[*folderEntry](ctx, 1, createPhaseFolders(ctx, &state, s.ds, s.cw)),
|
|
|
|
// Phase 2: Process missing files, checking for moves
|
|
runPhase[*missingTracks](ctx, 2, createPhaseMissingTracks(ctx, &state, s.ds)),
|
|
|
|
// Phases 3 and 4 can be run in parallel
|
|
run.Parallel(
|
|
// Phase 3: Refresh all new/changed albums and update artists
|
|
runPhase[*model.Album](ctx, 3, createPhaseRefreshAlbums(ctx, &state, s.ds)),
|
|
|
|
// Phase 4: Import/update playlists
|
|
runPhase[*model.Folder](ctx, 4, createPhasePlaylists(ctx, &state, s.ds, s.pls, s.cw)),
|
|
),
|
|
|
|
// Final Steps (cannot be parallelized):
|
|
|
|
// Run GC if there were any changes (Remove dangling tracks, empty albums and artists, and orphan annotations)
|
|
s.runGC(ctx, &state),
|
|
|
|
// Refresh artist and tags stats
|
|
s.runRefreshStats(ctx, &state),
|
|
|
|
// Update last_scan_completed_at for all libraries
|
|
s.runUpdateLibraries(ctx, &state),
|
|
|
|
// Optimize DB
|
|
s.runOptimize(ctx),
|
|
)
|
|
if err != nil {
|
|
log.Error(ctx, "Scanner: Finished with error", "duration", time.Since(startTime), err)
|
|
_ = s.ds.Property(ctx).Put(consts.LastScanErrorKey, err.Error())
|
|
state.sendError(err)
|
|
return
|
|
}
|
|
|
|
_ = s.ds.Property(ctx).Put(consts.LastScanErrorKey, "")
|
|
|
|
if state.changesDetected.Load() {
|
|
state.sendProgress(&ProgressInfo{ChangesDetected: true})
|
|
}
|
|
|
|
if state.isSelectiveScan() {
|
|
log.Info(ctx, "Scanner: Finished scanning selected folders", "duration", time.Since(startTime), "numTargets", len(targets))
|
|
} else {
|
|
log.Info(ctx, "Scanner: Finished scanning all libraries", "duration", time.Since(startTime))
|
|
}
|
|
}
|
|
|
|
// prepareLibrariesForScan initializes the scan for all libraries in the state.
|
|
// It calls ScanBegin for libraries that haven't started scanning yet (LastScanStartedAt is zero),
|
|
// reloads them to get the updated state, and filters out any libraries that fail to initialize.
|
|
func (s *scannerImpl) prepareLibrariesForScan(ctx context.Context, state *scanState) error {
|
|
var successfulLibs []model.Library
|
|
|
|
for _, lib := range state.libraries {
|
|
if lib.LastScanStartedAt.IsZero() {
|
|
// This is a new scan - mark it as started
|
|
err := s.ds.Library(ctx).ScanBegin(lib.ID, state.fullScan)
|
|
if err != nil {
|
|
log.Error(ctx, "Scanner: Error marking scan start", "lib", lib.Name, err)
|
|
state.sendWarning(err.Error())
|
|
continue
|
|
}
|
|
|
|
// Reload library to get updated state (timestamps, etc.)
|
|
reloadedLib, err := s.ds.Library(ctx).Get(lib.ID)
|
|
if err != nil {
|
|
log.Error(ctx, "Scanner: Error reloading library", "lib", lib.Name, err)
|
|
state.sendWarning(err.Error())
|
|
continue
|
|
}
|
|
lib = *reloadedLib
|
|
} else {
|
|
// This is a resumed scan
|
|
log.Debug(ctx, "Scanner: Resuming previous scan", "lib", lib.Name,
|
|
"lastScanStartedAt", lib.LastScanStartedAt, "fullScan", lib.FullScanInProgress)
|
|
}
|
|
|
|
successfulLibs = append(successfulLibs, lib)
|
|
}
|
|
|
|
if len(successfulLibs) == 0 {
|
|
return fmt.Errorf("no libraries available for scanning")
|
|
}
|
|
|
|
// Update state with only successfully initialized libraries
|
|
state.libraries = successfulLibs
|
|
return nil
|
|
}
|
|
|
|
func (s *scannerImpl) runGC(ctx context.Context, state *scanState) func() error {
|
|
return func() error {
|
|
state.sendProgress(&ProgressInfo{ForceUpdate: true})
|
|
return s.ds.WithTx(func(tx model.DataStore) error {
|
|
if state.changesDetected.Load() {
|
|
start := time.Now()
|
|
|
|
// For selective scans, extract library IDs to scope GC operations
|
|
var libraryIDs []int
|
|
if state.isSelectiveScan() {
|
|
libraryIDs = slices.Collect(maps.Keys(state.targets))
|
|
log.Debug(ctx, "Scanner: Running selective GC", "libraryIDs", libraryIDs)
|
|
}
|
|
|
|
err := tx.GC(ctx, libraryIDs...)
|
|
if err != nil {
|
|
log.Error(ctx, "Scanner: Error running GC", err)
|
|
return fmt.Errorf("running GC: %w", err)
|
|
}
|
|
log.Debug(ctx, "Scanner: GC completed", "elapsed", time.Since(start))
|
|
} else {
|
|
log.Debug(ctx, "Scanner: No changes detected, skipping GC")
|
|
}
|
|
return nil
|
|
}, "scanner: GC")
|
|
}
|
|
}
|
|
|
|
func (s *scannerImpl) runRefreshStats(ctx context.Context, state *scanState) func() error {
|
|
return func() error {
|
|
if !state.changesDetected.Load() {
|
|
log.Debug(ctx, "Scanner: No changes detected, skipping refreshing stats")
|
|
return nil
|
|
}
|
|
start := time.Now()
|
|
stats, err := s.ds.Artist(ctx).RefreshStats(state.fullScan)
|
|
if err != nil {
|
|
log.Error(ctx, "Scanner: Error refreshing artists stats", err)
|
|
return fmt.Errorf("refreshing artists stats: %w", err)
|
|
}
|
|
log.Debug(ctx, "Scanner: Refreshed artist stats", "stats", stats, "elapsed", time.Since(start))
|
|
|
|
start = time.Now()
|
|
err = s.ds.Tag(ctx).UpdateCounts()
|
|
if err != nil {
|
|
log.Error(ctx, "Scanner: Error updating tag counts", err)
|
|
return fmt.Errorf("updating tag counts: %w", err)
|
|
}
|
|
log.Debug(ctx, "Scanner: Updated tag counts", "elapsed", time.Since(start))
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (s *scannerImpl) runOptimize(ctx context.Context) func() error {
|
|
return func() error {
|
|
start := time.Now()
|
|
db.Optimize(ctx)
|
|
log.Debug(ctx, "Scanner: Optimized DB", "elapsed", time.Since(start))
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (s *scannerImpl) runUpdateLibraries(ctx context.Context, state *scanState) func() error {
|
|
return func() error {
|
|
start := time.Now()
|
|
return s.ds.WithTx(func(tx model.DataStore) error {
|
|
for _, lib := range state.libraries {
|
|
err := tx.Library(ctx).ScanEnd(lib.ID)
|
|
if err != nil {
|
|
log.Error(ctx, "Scanner: Error updating last scan completed", "lib", lib.Name, err)
|
|
return fmt.Errorf("updating last scan completed: %w", err)
|
|
}
|
|
err = tx.Property(ctx).Put(consts.PIDTrackKey, conf.Server.PID.Track)
|
|
if err != nil {
|
|
log.Error(ctx, "Scanner: Error updating track PID conf", err)
|
|
return fmt.Errorf("updating track PID conf: %w", err)
|
|
}
|
|
err = tx.Property(ctx).Put(consts.PIDAlbumKey, conf.Server.PID.Album)
|
|
if err != nil {
|
|
log.Error(ctx, "Scanner: Error updating album PID conf", err)
|
|
return fmt.Errorf("updating album PID conf: %w", err)
|
|
}
|
|
if state.changesDetected.Load() {
|
|
log.Debug(ctx, "Scanner: Refreshing library stats", "lib", lib.Name)
|
|
if err := tx.Library(ctx).RefreshStats(lib.ID); err != nil {
|
|
log.Error(ctx, "Scanner: Error refreshing library stats", "lib", lib.Name, err)
|
|
return fmt.Errorf("refreshing library stats: %w", err)
|
|
}
|
|
} else {
|
|
log.Debug(ctx, "Scanner: No changes detected, skipping library stats refresh", "lib", lib.Name)
|
|
}
|
|
}
|
|
log.Debug(ctx, "Scanner: Updated libraries after scan", "elapsed", time.Since(start), "numLibraries", len(state.libraries))
|
|
return nil
|
|
}, "scanner: update libraries")
|
|
}
|
|
}
|
|
|
|
type phase[T any] interface {
|
|
producer() ppl.Producer[T]
|
|
stages() []ppl.Stage[T]
|
|
finalize(error) error
|
|
description() string
|
|
}
|
|
|
|
func runPhase[T any](ctx context.Context, phaseNum int, phase phase[T]) func() error {
|
|
return func() error {
|
|
log.Debug(ctx, fmt.Sprintf("Scanner: Starting phase %d: %s", phaseNum, phase.description()))
|
|
start := time.Now()
|
|
|
|
producer := phase.producer()
|
|
stages := phase.stages()
|
|
|
|
// Prepend a counter stage to the phase's pipeline
|
|
counter, countStageFn := countTasks[T]()
|
|
stages = append([]ppl.Stage[T]{ppl.NewStage(countStageFn, ppl.Name("count tasks"))}, stages...)
|
|
|
|
var err error
|
|
if log.IsGreaterOrEqualTo(log.LevelDebug) {
|
|
var m *ppl.Metrics
|
|
m, err = ppl.Measure(producer, stages...)
|
|
log.Info(ctx, "Scanner: "+m.String(), err)
|
|
} else {
|
|
err = ppl.Do(producer, stages...)
|
|
}
|
|
|
|
err = phase.finalize(err)
|
|
|
|
if err != nil {
|
|
log.Error(ctx, fmt.Sprintf("Scanner: Error processing libraries in phase %d", phaseNum), "elapsed", time.Since(start), err)
|
|
} else {
|
|
log.Debug(ctx, fmt.Sprintf("Scanner: Finished phase %d", phaseNum), "elapsed", time.Since(start), "totalTasks", counter.Load())
|
|
}
|
|
|
|
return err
|
|
}
|
|
}
|
|
|
|
func countTasks[T any]() (*atomic.Int64, func(T) (T, error)) {
|
|
counter := atomic.Int64{}
|
|
return &counter, func(in T) (T, error) {
|
|
counter.Add(1)
|
|
return in, nil
|
|
}
|
|
}
|
|
|
|
var _ scanner = (*scannerImpl)(nil)
|