From 0faf744e32112fd0caef9e0c0f0b531081e86dfa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Deluan=20Quint=C3=A3o?= Date: Mon, 1 Dec 2025 22:21:54 -0500 Subject: [PATCH] refactor: make NowPlaying dispatch asynchronous with worker pool (#4757) * feat: make NowPlaying dispatch asynchronous with worker pool Implemented asynchronous NowPlaying dispatch using a queue worker pattern similar to cacheWarmer. Instead of dispatching NowPlaying updates synchronously during the HTTP request, they are now queued and processed by background workers at controlled intervals. Key changes: - Added nowPlayingEntry struct to represent queued entries - Added npQueue map (keyed by playerId), npMu mutex, and npSignal channel to playTracker - Implemented enqueueNowPlaying() to add entries to the queue - Implemented nowPlayingWorker() that polls every 100ms, drains queue, and processes entries - Changed NowPlaying() to queue dispatch instead of calling synchronously - Renamed dispatchNowPlaying() to dispatchNowPlayingAsync() and updated it to use background context Benefits: - HTTP handlers return immediately without waiting for scrobbler responses - Deduplication by key: rapid calls (seeking) only dispatch latest state - Fire-and-forget: one-shot attempts with logged failures - Backpressure-free: worker processes at its own pace - Tests updated to use Eventually() assertions for async dispatch Signed-off-by: Deluan * fix(play_tracker): increase timeout duration for signal handling Signed-off-by: Deluan * refactor(play_tracker): simplify queue processing by directly assigning entries Signed-off-by: Deluan --------- Signed-off-by: Deluan --- core/scrobbler/buffered_scrobbler_test.go | 6 +- core/scrobbler/play_tracker.go | 71 +++++++++++++- core/scrobbler/play_tracker_test.go | 111 ++++++++++++++-------- 3 files changed, 147 insertions(+), 41 deletions(-) diff --git a/core/scrobbler/buffered_scrobbler_test.go b/core/scrobbler/buffered_scrobbler_test.go index 063e0de8c..9fbca6f71 100644 --- a/core/scrobbler/buffered_scrobbler_test.go +++ b/core/scrobbler/buffered_scrobbler_test.go @@ -38,9 +38,9 @@ var _ = Describe("BufferedScrobbler", func() { It("forwards NowPlaying calls", func() { track := &model.MediaFile{ID: "123", Title: "Test Track"} Expect(bs.NowPlaying(ctx, "user1", track, 0)).To(Succeed()) - Expect(scr.NowPlayingCalled).To(BeTrue()) - Expect(scr.UserID).To(Equal("user1")) - Expect(scr.Track).To(Equal(track)) + Expect(scr.GetNowPlayingCalled()).To(BeTrue()) + Expect(scr.GetUserID()).To(Equal("user1")) + Expect(scr.GetTrack()).To(Equal(track)) }) It("enqueues scrobbles to buffer", func() { diff --git a/core/scrobbler/play_tracker.go b/core/scrobbler/play_tracker.go index 3b71a2100..d7ab0e6cf 100644 --- a/core/scrobbler/play_tracker.go +++ b/core/scrobbler/play_tracker.go @@ -31,6 +31,12 @@ type Submission struct { Timestamp time.Time } +type nowPlayingEntry struct { + userId string + track *model.MediaFile + position int +} + type PlayTracker interface { NowPlaying(ctx context.Context, playerId string, playerName string, trackId string, position int) error GetNowPlaying(ctx context.Context) ([]NowPlayingInfo, error) @@ -52,6 +58,11 @@ type playTracker struct { pluginScrobblers map[string]Scrobbler pluginLoader PluginLoader mu sync.RWMutex + npQueue map[string]nowPlayingEntry + npMu sync.Mutex + npSignal chan struct{} + shutdown chan struct{} + workerDone chan struct{} } func GetPlayTracker(ds model.DataStore, broker events.Broker, pluginManager PluginLoader) PlayTracker { @@ -71,6 +82,10 @@ func newPlayTracker(ds model.DataStore, broker events.Broker, pluginManager Plug builtinScrobblers: make(map[string]Scrobbler), pluginScrobblers: make(map[string]Scrobbler), pluginLoader: pluginManager, + npQueue: make(map[string]nowPlayingEntry), + npSignal: make(chan struct{}, 1), + shutdown: make(chan struct{}), + workerDone: make(chan struct{}), } if conf.Server.EnableNowPlaying { m.OnExpiration(func(_ string, _ NowPlayingInfo) { @@ -90,9 +105,16 @@ func newPlayTracker(ds model.DataStore, broker events.Broker, pluginManager Plug p.builtinScrobblers[name] = s } log.Debug("List of builtin scrobblers enabled", "names", enabled) + go p.nowPlayingWorker() return p } +// stopNowPlayingWorker stops the background worker. This is primarily for testing. +func (p *playTracker) stopNowPlayingWorker() { + close(p.shutdown) + <-p.workerDone // Wait for worker to finish +} + // pluginNamesMatchScrobblers returns true if the set of pluginNames matches the keys in pluginScrobblers func pluginNamesMatchScrobblers(pluginNames []string, scrobblers map[string]Scrobbler) bool { if len(pluginNames) != len(scrobblers) { @@ -198,11 +220,58 @@ func (p *playTracker) NowPlaying(ctx context.Context, playerId string, playerNam } player, _ := request.PlayerFrom(ctx) if player.ScrobbleEnabled { - p.dispatchNowPlaying(ctx, user.ID, mf, position) + p.enqueueNowPlaying(playerId, user.ID, mf, position) } return nil } +func (p *playTracker) enqueueNowPlaying(playerId string, userId string, track *model.MediaFile, position int) { + p.npMu.Lock() + defer p.npMu.Unlock() + p.npQueue[playerId] = nowPlayingEntry{ + userId: userId, + track: track, + position: position, + } + p.sendNowPlayingSignal() +} + +func (p *playTracker) sendNowPlayingSignal() { + // Don't block if the previous signal was not read yet + select { + case p.npSignal <- struct{}{}: + default: + } +} + +func (p *playTracker) nowPlayingWorker() { + defer close(p.workerDone) + for { + select { + case <-p.shutdown: + return + case <-time.After(time.Second): + case <-p.npSignal: + } + + p.npMu.Lock() + if len(p.npQueue) == 0 { + p.npMu.Unlock() + continue + } + + // Keep a copy of the entries to process and clear the queue + entries := p.npQueue + p.npQueue = make(map[string]nowPlayingEntry) + p.npMu.Unlock() + + // Process entries without holding lock + for _, entry := range entries { + p.dispatchNowPlaying(context.Background(), entry.userId, entry.track, entry.position) + } + } +} + func (p *playTracker) dispatchNowPlaying(ctx context.Context, userId string, t *model.MediaFile, position int) { if t.Artist == consts.UnknownArtist { log.Debug(ctx, "Ignoring external NowPlaying update for track with unknown artist", "track", t.Title, "artist", t.Artist) diff --git a/core/scrobbler/play_tracker_test.go b/core/scrobbler/play_tracker_test.go index 7b4785bb5..f300f7796 100644 --- a/core/scrobbler/play_tracker_test.go +++ b/core/scrobbler/play_tracker_test.go @@ -24,15 +24,26 @@ import ( // Moved to top-level scope to avoid linter issues type mockPluginLoader struct { + mu sync.RWMutex names []string scrobblers map[string]Scrobbler } func (m *mockPluginLoader) PluginNames(service string) []string { + m.mu.RLock() + defer m.mu.RUnlock() return m.names } +func (m *mockPluginLoader) SetNames(names []string) { + m.mu.Lock() + defer m.mu.Unlock() + m.names = names +} + func (m *mockPluginLoader) LoadScrobbler(name string) (Scrobbler, bool) { + m.mu.RLock() + defer m.mu.RUnlock() s, ok := m.scrobblers[name] return s, ok } @@ -46,7 +57,7 @@ var _ = Describe("PlayTracker", func() { var album model.Album var artist1 model.Artist var artist2 model.Artist - var fake fakeScrobbler + var fake *fakeScrobbler BeforeEach(func() { DeferCleanup(configtest.SetupConfig()) @@ -54,16 +65,16 @@ var _ = Describe("PlayTracker", func() { ctx = request.WithUser(ctx, model.User{ID: "u-1"}) ctx = request.WithPlayer(ctx, model.Player{ScrobbleEnabled: true}) ds = &tests.MockDataStore{} - fake = fakeScrobbler{Authorized: true} + fake = &fakeScrobbler{Authorized: true} Register("fake", func(model.DataStore) Scrobbler { - return &fake + return fake }) Register("disabled", func(model.DataStore) Scrobbler { return nil }) eventBroker = &fakeEventBroker{} tracker = newPlayTracker(ds, eventBroker, nil) - tracker.(*playTracker).builtinScrobblers["fake"] = &fake // Bypass buffering for tests + tracker.(*playTracker).builtinScrobblers["fake"] = fake // Bypass buffering for tests track = model.MediaFile{ ID: "123", @@ -86,6 +97,11 @@ var _ = Describe("PlayTracker", func() { _ = ds.Album(ctx).(*tests.MockAlbumRepo).Put(&album) }) + AfterEach(func() { + // Stop the worker goroutine to prevent data races between tests + tracker.(*playTracker).stopNowPlayingWorker() + }) + It("does not register disabled scrobblers", func() { Expect(tracker.(*playTracker).builtinScrobblers).To(HaveKey("fake")) Expect(tracker.(*playTracker).builtinScrobblers).ToNot(HaveKey("disabled")) @@ -95,10 +111,10 @@ var _ = Describe("PlayTracker", func() { It("sends track to agent", func() { err := tracker.NowPlaying(ctx, "player-1", "player-one", "123", 0) Expect(err).ToNot(HaveOccurred()) - Expect(fake.NowPlayingCalled).To(BeTrue()) - Expect(fake.UserID).To(Equal("u-1")) - Expect(fake.Track.ID).To(Equal("123")) - Expect(fake.Track.Participants).To(Equal(track.Participants)) + Eventually(func() bool { return fake.GetNowPlayingCalled() }).Should(BeTrue()) + Expect(fake.GetUserID()).To(Equal("u-1")) + Expect(fake.GetTrack().ID).To(Equal("123")) + Expect(fake.GetTrack().Participants).To(Equal(track.Participants)) }) It("does not send track to agent if user has not authorized", func() { fake.Authorized = false @@ -106,7 +122,7 @@ var _ = Describe("PlayTracker", func() { err := tracker.NowPlaying(ctx, "player-1", "player-one", "123", 0) Expect(err).ToNot(HaveOccurred()) - Expect(fake.NowPlayingCalled).To(BeFalse()) + Expect(fake.GetNowPlayingCalled()).To(BeFalse()) }) It("does not send track to agent if player is not enabled to send scrobbles", func() { ctx = request.WithPlayer(ctx, model.Player{ScrobbleEnabled: false}) @@ -114,7 +130,7 @@ var _ = Describe("PlayTracker", func() { err := tracker.NowPlaying(ctx, "player-1", "player-one", "123", 0) Expect(err).ToNot(HaveOccurred()) - Expect(fake.NowPlayingCalled).To(BeFalse()) + Expect(fake.GetNowPlayingCalled()).To(BeFalse()) }) It("does not send track to agent if artist is unknown", func() { track.Artist = consts.UnknownArtist @@ -122,7 +138,7 @@ var _ = Describe("PlayTracker", func() { err := tracker.NowPlaying(ctx, "player-1", "player-one", "123", 0) Expect(err).ToNot(HaveOccurred()) - Expect(fake.NowPlayingCalled).To(BeFalse()) + Expect(fake.GetNowPlayingCalled()).To(BeFalse()) }) It("stores position when greater than zero", func() { @@ -130,11 +146,12 @@ var _ = Describe("PlayTracker", func() { err := tracker.NowPlaying(ctx, "player-1", "player-one", "123", pos) Expect(err).ToNot(HaveOccurred()) + Eventually(func() int { return fake.GetPosition() }).Should(Equal(pos)) + playing, err := tracker.GetNowPlaying(ctx) Expect(err).ToNot(HaveOccurred()) Expect(playing).To(HaveLen(1)) Expect(playing[0].Position).To(Equal(pos)) - Expect(fake.Position).To(Equal(pos)) }) It("sends event with count", func() { @@ -210,7 +227,7 @@ var _ = Describe("PlayTracker", func() { Expect(err).ToNot(HaveOccurred()) Expect(fake.ScrobbleCalled.Load()).To(BeTrue()) - Expect(fake.UserID).To(Equal("u-1")) + Expect(fake.GetUserID()).To(Equal("u-1")) lastScrobble := fake.LastScrobble.Load() Expect(lastScrobble.TimeStamp).To(BeTemporally("~", ts, 1*time.Second)) Expect(lastScrobble.ID).To(Equal("123")) @@ -278,45 +295,46 @@ var _ = Describe("PlayTracker", func() { Describe("Plugin scrobbler logic", func() { var pluginLoader *mockPluginLoader - var pluginFake fakeScrobbler + var pluginFake *fakeScrobbler BeforeEach(func() { - pluginFake = fakeScrobbler{Authorized: true} + pluginFake = &fakeScrobbler{Authorized: true} pluginLoader = &mockPluginLoader{ names: []string{"plugin1"}, - scrobblers: map[string]Scrobbler{"plugin1": &pluginFake}, + scrobblers: map[string]Scrobbler{"plugin1": pluginFake}, } tracker = newPlayTracker(ds, events.GetBroker(), pluginLoader) // Bypass buffering for both built-in and plugin scrobblers - tracker.(*playTracker).builtinScrobblers["fake"] = &fake - tracker.(*playTracker).pluginScrobblers["plugin1"] = &pluginFake + tracker.(*playTracker).builtinScrobblers["fake"] = fake + tracker.(*playTracker).pluginScrobblers["plugin1"] = pluginFake }) It("registers and uses plugin scrobbler for NowPlaying", func() { err := tracker.NowPlaying(ctx, "player-1", "player-one", "123", 0) Expect(err).ToNot(HaveOccurred()) - Expect(pluginFake.NowPlayingCalled).To(BeTrue()) + Eventually(func() bool { return pluginFake.GetNowPlayingCalled() }).Should(BeTrue()) }) It("removes plugin scrobbler if not present anymore", func() { // First call: plugin present _ = tracker.NowPlaying(ctx, "player-1", "player-one", "123", 0) - Expect(pluginFake.NowPlayingCalled).To(BeTrue()) - pluginFake.NowPlayingCalled = false + Eventually(func() bool { return pluginFake.GetNowPlayingCalled() }).Should(BeTrue()) + pluginFake.nowPlayingCalled.Store(false) // Remove plugin - pluginLoader.names = []string{} + pluginLoader.SetNames([]string{}) _ = tracker.NowPlaying(ctx, "player-1", "player-one", "123", 0) - Expect(pluginFake.NowPlayingCalled).To(BeFalse()) + // Should not be called since plugin was removed + Consistently(func() bool { return pluginFake.GetNowPlayingCalled() }).Should(BeFalse()) }) It("calls both builtin and plugin scrobblers for NowPlaying", func() { - fake.NowPlayingCalled = false - pluginFake.NowPlayingCalled = false + fake.nowPlayingCalled.Store(false) + pluginFake.nowPlayingCalled.Store(false) err := tracker.NowPlaying(ctx, "player-1", "player-one", "123", 0) Expect(err).ToNot(HaveOccurred()) - Expect(fake.NowPlayingCalled).To(BeTrue()) - Expect(pluginFake.NowPlayingCalled).To(BeTrue()) + Eventually(func() bool { return fake.GetNowPlayingCalled() }).Should(BeTrue()) + Eventually(func() bool { return pluginFake.GetNowPlayingCalled() }).Should(BeTrue()) }) It("calls plugin scrobbler for Submit", func() { @@ -359,7 +377,7 @@ var _ = Describe("PlayTracker", func() { It("calls Stop on scrobblers when removing them", func() { // Change the plugin names to simulate a plugin being removed - mockPlugin.names = []string{} + mockPlugin.SetNames([]string{}) // Call refreshPluginScrobblers which should detect the removed plugin pTracker.refreshPluginScrobblers() @@ -375,32 +393,51 @@ var _ = Describe("PlayTracker", func() { type fakeScrobbler struct { Authorized bool - NowPlayingCalled bool + nowPlayingCalled atomic.Bool ScrobbleCalled atomic.Bool - UserID string - Track *model.MediaFile - Position int + userID atomic.Pointer[string] + track atomic.Pointer[model.MediaFile] + position atomic.Int32 LastScrobble atomic.Pointer[Scrobble] Error error } +func (f *fakeScrobbler) GetNowPlayingCalled() bool { + return f.nowPlayingCalled.Load() +} + +func (f *fakeScrobbler) GetUserID() string { + if p := f.userID.Load(); p != nil { + return *p + } + return "" +} + +func (f *fakeScrobbler) GetTrack() *model.MediaFile { + return f.track.Load() +} + +func (f *fakeScrobbler) GetPosition() int { + return int(f.position.Load()) +} + func (f *fakeScrobbler) IsAuthorized(ctx context.Context, userId string) bool { return f.Error == nil && f.Authorized } func (f *fakeScrobbler) NowPlaying(ctx context.Context, userId string, track *model.MediaFile, position int) error { - f.NowPlayingCalled = true + f.nowPlayingCalled.Store(true) if f.Error != nil { return f.Error } - f.UserID = userId - f.Track = track - f.Position = position + f.userID.Store(&userId) + f.track.Store(track) + f.position.Store(int32(position)) return nil } func (f *fakeScrobbler) Scrobble(ctx context.Context, userId string, s Scrobble) error { - f.UserID = userId + f.userID.Store(&userId) f.LastScrobble.Store(&s) f.ScrobbleCalled.Store(true) if f.Error != nil {