diff --git a/conf/configuration.go b/conf/configuration.go index dc7e75b7e..818c53c74 100644 --- a/conf/configuration.go +++ b/conf/configuration.go @@ -14,7 +14,7 @@ import ( "github.com/kr/pretty" "github.com/navidrome/navidrome/consts" "github.com/navidrome/navidrome/log" - "github.com/navidrome/navidrome/utils/chain" + "github.com/navidrome/navidrome/utils/run" "github.com/robfig/cron/v3" "github.com/spf13/viper" ) @@ -276,7 +276,7 @@ func Load(noConfigDump bool) { log.SetLogSourceLine(Server.DevLogSourceLine) log.SetRedacting(Server.EnableLogRedacting) - err = chain.RunSequentially( + err = run.Sequentially( validateScanSchedule, validateBackupSchedule, validatePlaylistsPath, diff --git a/db/migrations/20241026183640_support_new_scanner.go b/db/migrations/20241026183640_support_new_scanner.go index 251b27f63..fcbef7e4e 100644 --- a/db/migrations/20241026183640_support_new_scanner.go +++ b/db/migrations/20241026183640_support_new_scanner.go @@ -13,7 +13,7 @@ import ( "github.com/navidrome/navidrome/log" "github.com/navidrome/navidrome/model" - "github.com/navidrome/navidrome/utils/chain" + "github.com/navidrome/navidrome/utils/run" "github.com/pressly/goose/v3" ) @@ -25,7 +25,7 @@ func upSupportNewScanner(ctx context.Context, tx *sql.Tx) error { execute := createExecuteFunc(ctx, tx) addColumn := createAddColumnFunc(ctx, tx) - return chain.RunSequentially( + return run.Sequentially( upSupportNewScanner_CreateTableFolder(ctx, execute), upSupportNewScanner_PopulateTableFolder(ctx, tx), upSupportNewScanner_UpdateTableMediaFile(ctx, execute, addColumn), @@ -213,7 +213,7 @@ update media_file set path = replace(substr(path, %d), '\', '/');`, libPathLen+2 func upSupportNewScanner_UpdateTableMediaFile(_ context.Context, execute execStmtFunc, addColumn addColumnFunc) execFunc { return func() error { - return chain.RunSequentially( + return run.Sequentially( execute(` alter table media_file add column folder_id varchar default '' not null; @@ -288,7 +288,7 @@ create index if not exists album_mbz_release_group_id func upSupportNewScanner_UpdateTableArtist(_ context.Context, execute execStmtFunc, addColumn addColumnFunc) execFunc { return func() error { - return chain.RunSequentially( + return run.Sequentially( execute(` alter table artist drop column album_count; diff --git a/persistence/library_repository.go b/persistence/library_repository.go index 442f747c5..fdeccc953 100644 --- a/persistence/library_repository.go +++ b/persistence/library_repository.go @@ -9,7 +9,7 @@ import ( "github.com/navidrome/navidrome/conf" "github.com/navidrome/navidrome/log" "github.com/navidrome/navidrome/model" - "github.com/navidrome/navidrome/utils/chain" + "github.com/navidrome/navidrome/utils/run" "github.com/pocketbase/dbx" ) @@ -151,7 +151,7 @@ func (r *libraryRepository) RefreshStats(id int) error { var songsRes, albumsRes, artistsRes, foldersRes, filesRes, missingRes struct{ Count int64 } var sizeRes struct{ Sum int64 } - err := chain.RunParallel( + err := run.Parallel( func() error { return r.queryOne(Select("count(*) as count").From("media_file").Where(Eq{"library_id": id, "missing": false}), &songsRes) }, diff --git a/persistence/persistence.go b/persistence/persistence.go index 2536b9c35..ac607f85f 100644 --- a/persistence/persistence.go +++ b/persistence/persistence.go @@ -9,7 +9,7 @@ import ( "github.com/navidrome/navidrome/db" "github.com/navidrome/navidrome/log" "github.com/navidrome/navidrome/model" - "github.com/navidrome/navidrome/utils/chain" + "github.com/navidrome/navidrome/utils/run" "github.com/pocketbase/dbx" ) @@ -167,7 +167,7 @@ func (s *SQLStore) GC(ctx context.Context) error { } } - err := chain.RunSequentially( + err := run.Sequentially( trace(ctx, "purge empty albums", func() error { return s.Album(ctx).(*albumRepository).purgeEmpty() }), trace(ctx, "purge empty artists", func() error { return s.Artist(ctx).(*artistRepository).purgeEmpty() }), trace(ctx, "mark missing artists", func() error { return s.Artist(ctx).(*artistRepository).markMissing() }), diff --git a/scanner/scanner.go b/scanner/scanner.go index 2d17e5cc0..d84c58a3e 100644 --- a/scanner/scanner.go +++ b/scanner/scanner.go @@ -14,7 +14,7 @@ import ( "github.com/navidrome/navidrome/db" "github.com/navidrome/navidrome/log" "github.com/navidrome/navidrome/model" - "github.com/navidrome/navidrome/utils/chain" + "github.com/navidrome/navidrome/utils/run" ) type scannerImpl struct { @@ -75,7 +75,7 @@ func (s *scannerImpl) scanAll(ctx context.Context, fullScan bool, progress chan< } } - err = chain.RunSequentially( + err = run.Sequentially( // Phase 1: Scan all libraries and import new/updated files runPhase[*folderEntry](ctx, 1, createPhaseFolders(ctx, &state, s.ds, s.cw, libs)), @@ -83,7 +83,7 @@ func (s *scannerImpl) scanAll(ctx context.Context, fullScan bool, progress chan< runPhase[*missingTracks](ctx, 2, createPhaseMissingTracks(ctx, &state, s.ds)), // Phases 3 and 4 can be run in parallel - chain.RunParallel( + run.Parallel( // Phase 3: Refresh all new/changed albums and update artists runPhase[*model.Album](ctx, 3, createPhaseRefreshAlbums(ctx, &state, s.ds, libs)), diff --git a/utils/chain/chain_test.go b/utils/chain/chain_test.go deleted file mode 100644 index 1c6010fb3..000000000 --- a/utils/chain/chain_test.go +++ /dev/null @@ -1,51 +0,0 @@ -package chain_test - -import ( - "errors" - "testing" - - "github.com/navidrome/navidrome/utils/chain" - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" -) - -func TestChain(t *testing.T) { - RegisterFailHandler(Fail) - RunSpecs(t, "chain Suite") -} - -var _ = Describe("RunSequentially", func() { - It("should return nil if no functions are provided", func() { - err := chain.RunSequentially() - Expect(err).To(BeNil()) - }) - - It("should return nil if all functions succeed", func() { - err := chain.RunSequentially( - func() error { return nil }, - func() error { return nil }, - ) - Expect(err).To(BeNil()) - }) - - It("should return the error from the first failing function", func() { - expectedErr := errors.New("error in function 2") - err := chain.RunSequentially( - func() error { return nil }, - func() error { return expectedErr }, - func() error { return errors.New("error in function 3") }, - ) - Expect(err).To(Equal(expectedErr)) - }) - - It("should not run functions after the first failing function", func() { - expectedErr := errors.New("error in function 1") - var runCount int - err := chain.RunSequentially( - func() error { runCount++; return expectedErr }, - func() error { runCount++; return nil }, - ) - Expect(err).To(Equal(expectedErr)) - Expect(runCount).To(Equal(1)) - }) -}) diff --git a/utils/chain/chain.go b/utils/run/run.go similarity index 68% rename from utils/chain/chain.go rename to utils/run/run.go index b93dbd93d..182eec42c 100644 --- a/utils/chain/chain.go +++ b/utils/run/run.go @@ -1,11 +1,11 @@ -package chain +package run import "golang.org/x/sync/errgroup" -// RunSequentially runs the given functions sequentially, +// Sequentially runs the given functions sequentially, // If any function returns an error, it stops the execution and returns that error. // If all functions return nil, it returns nil. -func RunSequentially(fs ...func() error) error { +func Sequentially(fs ...func() error) error { for _, f := range fs { if err := f(); err != nil { return err @@ -14,9 +14,9 @@ func RunSequentially(fs ...func() error) error { return nil } -// RunParallel runs the given functions in parallel, +// Parallel runs the given functions in parallel, // It waits for all functions to finish and returns the first error encountered. -func RunParallel(fs ...func() error) func() error { +func Parallel(fs ...func() error) func() error { return func() error { g := errgroup.Group{} for _, f := range fs { diff --git a/utils/run/run_test.go b/utils/run/run_test.go new file mode 100644 index 000000000..07d2d3994 --- /dev/null +++ b/utils/run/run_test.go @@ -0,0 +1,171 @@ +package run_test + +import ( + "errors" + "sync/atomic" + "testing" + "time" + + "github.com/navidrome/navidrome/utils/run" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestRun(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Run Suite") +} + +var _ = Describe("Sequentially", func() { + It("should return nil if no functions are provided", func() { + err := run.Sequentially() + Expect(err).To(BeNil()) + }) + + It("should return nil if all functions succeed", func() { + err := run.Sequentially( + func() error { return nil }, + func() error { return nil }, + ) + Expect(err).To(BeNil()) + }) + + It("should return the error from the first failing function", func() { + expectedErr := errors.New("error in function 2") + err := run.Sequentially( + func() error { return nil }, + func() error { return expectedErr }, + func() error { return errors.New("error in function 3") }, + ) + Expect(err).To(Equal(expectedErr)) + }) + + It("should not run functions after the first failing function", func() { + expectedErr := errors.New("error in function 1") + var runCount int + err := run.Sequentially( + func() error { runCount++; return expectedErr }, + func() error { runCount++; return nil }, + ) + Expect(err).To(Equal(expectedErr)) + Expect(runCount).To(Equal(1)) + }) +}) + +var _ = Describe("Parallel", func() { + It("should return a function that returns nil if no functions are provided", func() { + parallelFunc := run.Parallel() + err := parallelFunc() + Expect(err).To(BeNil()) + }) + + It("should return a function that returns nil if all functions succeed", func() { + parallelFunc := run.Parallel( + func() error { return nil }, + func() error { return nil }, + func() error { return nil }, + ) + err := parallelFunc() + Expect(err).To(BeNil()) + }) + + It("should return the first error encountered when functions fail", func() { + expectedErr := errors.New("parallel error") + parallelFunc := run.Parallel( + func() error { return nil }, + func() error { return expectedErr }, + func() error { return errors.New("another error") }, + ) + err := parallelFunc() + Expect(err).To(HaveOccurred()) + // Note: We can't guarantee which error will be returned first in parallel execution + // but we can ensure an error is returned + }) + + It("should run all functions in parallel", func() { + var runCount atomic.Int32 + sync := make(chan struct{}) + + parallelFunc := run.Parallel( + func() error { + runCount.Add(1) + <-sync + runCount.Add(-1) + return nil + }, + func() error { + runCount.Add(1) + <-sync + runCount.Add(-1) + return nil + }, + func() error { + runCount.Add(1) + <-sync + runCount.Add(-1) + return nil + }, + ) + + // Run the parallel function in a goroutine + go func() { + Expect(parallelFunc()).To(Succeed()) + }() + + // Wait for all functions to start running + Eventually(func() int32 { return runCount.Load() }).Should(Equal(int32(3))) + + // Release the functions to complete + close(sync) + + // Wait for all functions to finish + Eventually(func() int32 { return runCount.Load() }).Should(Equal(int32(0))) + }) + + It("should wait for all functions to complete before returning", func() { + var completedCount atomic.Int32 + + parallelFunc := run.Parallel( + func() error { + completedCount.Add(1) + return nil + }, + func() error { + completedCount.Add(1) + return nil + }, + func() error { + completedCount.Add(1) + return nil + }, + ) + + Expect(parallelFunc()).To(Succeed()) + Expect(completedCount.Load()).To(Equal(int32(3))) + }) + + It("should return an error even if other functions are still running", func() { + expectedErr := errors.New("fast error") + var slowFunctionCompleted bool + + parallelFunc := run.Parallel( + func() error { + return expectedErr // Return error immediately + }, + func() error { + time.Sleep(50 * time.Millisecond) // Slow function + slowFunctionCompleted = true + return nil + }, + ) + + start := time.Now() + err := parallelFunc() + duration := time.Since(start) + + Expect(err).To(HaveOccurred()) + // Should wait for all functions to complete, even if one fails early + Expect(duration).To(BeNumerically(">=", 50*time.Millisecond)) + Expect(slowFunctionCompleted).To(BeTrue()) + }) +})