refactor: make NowPlaying dispatch asynchronous with worker pool (#4757)

* feat: make NowPlaying dispatch asynchronous with worker pool

Implemented asynchronous NowPlaying dispatch using a queue worker pattern similar to cacheWarmer. Instead of dispatching NowPlaying updates synchronously during the HTTP request, they are now queued and processed by background workers at controlled intervals.

Key changes:
- Added nowPlayingEntry struct to represent queued entries
- Added npQueue map (keyed by playerId), npMu mutex, and npSignal channel to playTracker
- Implemented enqueueNowPlaying() to add entries to the queue
- Implemented nowPlayingWorker() that polls every 100ms, drains queue, and processes entries
- Changed NowPlaying() to queue dispatch instead of calling synchronously
- Renamed dispatchNowPlaying() to dispatchNowPlayingAsync() and updated it to use background context

Benefits:
- HTTP handlers return immediately without waiting for scrobbler responses
- Deduplication by key: rapid calls (seeking) only dispatch latest state
- Fire-and-forget: one-shot attempts with logged failures
- Backpressure-free: worker processes at its own pace
- Tests updated to use Eventually() assertions for async dispatch

Signed-off-by: Deluan <deluan@navidrome.org>

* fix(play_tracker): increase timeout duration for signal handling

Signed-off-by: Deluan <deluan@navidrome.org>

* refactor(play_tracker): simplify queue processing by directly assigning entries

Signed-off-by: Deluan <deluan@navidrome.org>

---------

Signed-off-by: Deluan <deluan@navidrome.org>
This commit is contained in:
Deluan Quintão
2025-12-01 22:21:54 -05:00
committed by GitHub
parent 33d9ce6ecc
commit 0faf744e32
3 changed files with 147 additions and 41 deletions

View File

@@ -38,9 +38,9 @@ var _ = Describe("BufferedScrobbler", func() {
It("forwards NowPlaying calls", func() { It("forwards NowPlaying calls", func() {
track := &model.MediaFile{ID: "123", Title: "Test Track"} track := &model.MediaFile{ID: "123", Title: "Test Track"}
Expect(bs.NowPlaying(ctx, "user1", track, 0)).To(Succeed()) Expect(bs.NowPlaying(ctx, "user1", track, 0)).To(Succeed())
Expect(scr.NowPlayingCalled).To(BeTrue()) Expect(scr.GetNowPlayingCalled()).To(BeTrue())
Expect(scr.UserID).To(Equal("user1")) Expect(scr.GetUserID()).To(Equal("user1"))
Expect(scr.Track).To(Equal(track)) Expect(scr.GetTrack()).To(Equal(track))
}) })
It("enqueues scrobbles to buffer", func() { It("enqueues scrobbles to buffer", func() {

View File

@@ -31,6 +31,12 @@ type Submission struct {
Timestamp time.Time Timestamp time.Time
} }
type nowPlayingEntry struct {
userId string
track *model.MediaFile
position int
}
type PlayTracker interface { type PlayTracker interface {
NowPlaying(ctx context.Context, playerId string, playerName string, trackId string, position int) error NowPlaying(ctx context.Context, playerId string, playerName string, trackId string, position int) error
GetNowPlaying(ctx context.Context) ([]NowPlayingInfo, error) GetNowPlaying(ctx context.Context) ([]NowPlayingInfo, error)
@@ -52,6 +58,11 @@ type playTracker struct {
pluginScrobblers map[string]Scrobbler pluginScrobblers map[string]Scrobbler
pluginLoader PluginLoader pluginLoader PluginLoader
mu sync.RWMutex mu sync.RWMutex
npQueue map[string]nowPlayingEntry
npMu sync.Mutex
npSignal chan struct{}
shutdown chan struct{}
workerDone chan struct{}
} }
func GetPlayTracker(ds model.DataStore, broker events.Broker, pluginManager PluginLoader) PlayTracker { func GetPlayTracker(ds model.DataStore, broker events.Broker, pluginManager PluginLoader) PlayTracker {
@@ -71,6 +82,10 @@ func newPlayTracker(ds model.DataStore, broker events.Broker, pluginManager Plug
builtinScrobblers: make(map[string]Scrobbler), builtinScrobblers: make(map[string]Scrobbler),
pluginScrobblers: make(map[string]Scrobbler), pluginScrobblers: make(map[string]Scrobbler),
pluginLoader: pluginManager, pluginLoader: pluginManager,
npQueue: make(map[string]nowPlayingEntry),
npSignal: make(chan struct{}, 1),
shutdown: make(chan struct{}),
workerDone: make(chan struct{}),
} }
if conf.Server.EnableNowPlaying { if conf.Server.EnableNowPlaying {
m.OnExpiration(func(_ string, _ NowPlayingInfo) { m.OnExpiration(func(_ string, _ NowPlayingInfo) {
@@ -90,9 +105,16 @@ func newPlayTracker(ds model.DataStore, broker events.Broker, pluginManager Plug
p.builtinScrobblers[name] = s p.builtinScrobblers[name] = s
} }
log.Debug("List of builtin scrobblers enabled", "names", enabled) log.Debug("List of builtin scrobblers enabled", "names", enabled)
go p.nowPlayingWorker()
return p return p
} }
// stopNowPlayingWorker stops the background worker. This is primarily for testing.
func (p *playTracker) stopNowPlayingWorker() {
close(p.shutdown)
<-p.workerDone // Wait for worker to finish
}
// pluginNamesMatchScrobblers returns true if the set of pluginNames matches the keys in pluginScrobblers // pluginNamesMatchScrobblers returns true if the set of pluginNames matches the keys in pluginScrobblers
func pluginNamesMatchScrobblers(pluginNames []string, scrobblers map[string]Scrobbler) bool { func pluginNamesMatchScrobblers(pluginNames []string, scrobblers map[string]Scrobbler) bool {
if len(pluginNames) != len(scrobblers) { if len(pluginNames) != len(scrobblers) {
@@ -198,11 +220,58 @@ func (p *playTracker) NowPlaying(ctx context.Context, playerId string, playerNam
} }
player, _ := request.PlayerFrom(ctx) player, _ := request.PlayerFrom(ctx)
if player.ScrobbleEnabled { if player.ScrobbleEnabled {
p.dispatchNowPlaying(ctx, user.ID, mf, position) p.enqueueNowPlaying(playerId, user.ID, mf, position)
} }
return nil return nil
} }
func (p *playTracker) enqueueNowPlaying(playerId string, userId string, track *model.MediaFile, position int) {
p.npMu.Lock()
defer p.npMu.Unlock()
p.npQueue[playerId] = nowPlayingEntry{
userId: userId,
track: track,
position: position,
}
p.sendNowPlayingSignal()
}
func (p *playTracker) sendNowPlayingSignal() {
// Don't block if the previous signal was not read yet
select {
case p.npSignal <- struct{}{}:
default:
}
}
func (p *playTracker) nowPlayingWorker() {
defer close(p.workerDone)
for {
select {
case <-p.shutdown:
return
case <-time.After(time.Second):
case <-p.npSignal:
}
p.npMu.Lock()
if len(p.npQueue) == 0 {
p.npMu.Unlock()
continue
}
// Keep a copy of the entries to process and clear the queue
entries := p.npQueue
p.npQueue = make(map[string]nowPlayingEntry)
p.npMu.Unlock()
// Process entries without holding lock
for _, entry := range entries {
p.dispatchNowPlaying(context.Background(), entry.userId, entry.track, entry.position)
}
}
}
func (p *playTracker) dispatchNowPlaying(ctx context.Context, userId string, t *model.MediaFile, position int) { func (p *playTracker) dispatchNowPlaying(ctx context.Context, userId string, t *model.MediaFile, position int) {
if t.Artist == consts.UnknownArtist { if t.Artist == consts.UnknownArtist {
log.Debug(ctx, "Ignoring external NowPlaying update for track with unknown artist", "track", t.Title, "artist", t.Artist) log.Debug(ctx, "Ignoring external NowPlaying update for track with unknown artist", "track", t.Title, "artist", t.Artist)

View File

@@ -24,15 +24,26 @@ import (
// Moved to top-level scope to avoid linter issues // Moved to top-level scope to avoid linter issues
type mockPluginLoader struct { type mockPluginLoader struct {
mu sync.RWMutex
names []string names []string
scrobblers map[string]Scrobbler scrobblers map[string]Scrobbler
} }
func (m *mockPluginLoader) PluginNames(service string) []string { func (m *mockPluginLoader) PluginNames(service string) []string {
m.mu.RLock()
defer m.mu.RUnlock()
return m.names return m.names
} }
func (m *mockPluginLoader) SetNames(names []string) {
m.mu.Lock()
defer m.mu.Unlock()
m.names = names
}
func (m *mockPluginLoader) LoadScrobbler(name string) (Scrobbler, bool) { func (m *mockPluginLoader) LoadScrobbler(name string) (Scrobbler, bool) {
m.mu.RLock()
defer m.mu.RUnlock()
s, ok := m.scrobblers[name] s, ok := m.scrobblers[name]
return s, ok return s, ok
} }
@@ -46,7 +57,7 @@ var _ = Describe("PlayTracker", func() {
var album model.Album var album model.Album
var artist1 model.Artist var artist1 model.Artist
var artist2 model.Artist var artist2 model.Artist
var fake fakeScrobbler var fake *fakeScrobbler
BeforeEach(func() { BeforeEach(func() {
DeferCleanup(configtest.SetupConfig()) DeferCleanup(configtest.SetupConfig())
@@ -54,16 +65,16 @@ var _ = Describe("PlayTracker", func() {
ctx = request.WithUser(ctx, model.User{ID: "u-1"}) ctx = request.WithUser(ctx, model.User{ID: "u-1"})
ctx = request.WithPlayer(ctx, model.Player{ScrobbleEnabled: true}) ctx = request.WithPlayer(ctx, model.Player{ScrobbleEnabled: true})
ds = &tests.MockDataStore{} ds = &tests.MockDataStore{}
fake = fakeScrobbler{Authorized: true} fake = &fakeScrobbler{Authorized: true}
Register("fake", func(model.DataStore) Scrobbler { Register("fake", func(model.DataStore) Scrobbler {
return &fake return fake
}) })
Register("disabled", func(model.DataStore) Scrobbler { Register("disabled", func(model.DataStore) Scrobbler {
return nil return nil
}) })
eventBroker = &fakeEventBroker{} eventBroker = &fakeEventBroker{}
tracker = newPlayTracker(ds, eventBroker, nil) tracker = newPlayTracker(ds, eventBroker, nil)
tracker.(*playTracker).builtinScrobblers["fake"] = &fake // Bypass buffering for tests tracker.(*playTracker).builtinScrobblers["fake"] = fake // Bypass buffering for tests
track = model.MediaFile{ track = model.MediaFile{
ID: "123", ID: "123",
@@ -86,6 +97,11 @@ var _ = Describe("PlayTracker", func() {
_ = ds.Album(ctx).(*tests.MockAlbumRepo).Put(&album) _ = ds.Album(ctx).(*tests.MockAlbumRepo).Put(&album)
}) })
AfterEach(func() {
// Stop the worker goroutine to prevent data races between tests
tracker.(*playTracker).stopNowPlayingWorker()
})
It("does not register disabled scrobblers", func() { It("does not register disabled scrobblers", func() {
Expect(tracker.(*playTracker).builtinScrobblers).To(HaveKey("fake")) Expect(tracker.(*playTracker).builtinScrobblers).To(HaveKey("fake"))
Expect(tracker.(*playTracker).builtinScrobblers).ToNot(HaveKey("disabled")) Expect(tracker.(*playTracker).builtinScrobblers).ToNot(HaveKey("disabled"))
@@ -95,10 +111,10 @@ var _ = Describe("PlayTracker", func() {
It("sends track to agent", func() { It("sends track to agent", func() {
err := tracker.NowPlaying(ctx, "player-1", "player-one", "123", 0) err := tracker.NowPlaying(ctx, "player-1", "player-one", "123", 0)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(fake.NowPlayingCalled).To(BeTrue()) Eventually(func() bool { return fake.GetNowPlayingCalled() }).Should(BeTrue())
Expect(fake.UserID).To(Equal("u-1")) Expect(fake.GetUserID()).To(Equal("u-1"))
Expect(fake.Track.ID).To(Equal("123")) Expect(fake.GetTrack().ID).To(Equal("123"))
Expect(fake.Track.Participants).To(Equal(track.Participants)) Expect(fake.GetTrack().Participants).To(Equal(track.Participants))
}) })
It("does not send track to agent if user has not authorized", func() { It("does not send track to agent if user has not authorized", func() {
fake.Authorized = false fake.Authorized = false
@@ -106,7 +122,7 @@ var _ = Describe("PlayTracker", func() {
err := tracker.NowPlaying(ctx, "player-1", "player-one", "123", 0) err := tracker.NowPlaying(ctx, "player-1", "player-one", "123", 0)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(fake.NowPlayingCalled).To(BeFalse()) Expect(fake.GetNowPlayingCalled()).To(BeFalse())
}) })
It("does not send track to agent if player is not enabled to send scrobbles", func() { It("does not send track to agent if player is not enabled to send scrobbles", func() {
ctx = request.WithPlayer(ctx, model.Player{ScrobbleEnabled: false}) ctx = request.WithPlayer(ctx, model.Player{ScrobbleEnabled: false})
@@ -114,7 +130,7 @@ var _ = Describe("PlayTracker", func() {
err := tracker.NowPlaying(ctx, "player-1", "player-one", "123", 0) err := tracker.NowPlaying(ctx, "player-1", "player-one", "123", 0)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(fake.NowPlayingCalled).To(BeFalse()) Expect(fake.GetNowPlayingCalled()).To(BeFalse())
}) })
It("does not send track to agent if artist is unknown", func() { It("does not send track to agent if artist is unknown", func() {
track.Artist = consts.UnknownArtist track.Artist = consts.UnknownArtist
@@ -122,7 +138,7 @@ var _ = Describe("PlayTracker", func() {
err := tracker.NowPlaying(ctx, "player-1", "player-one", "123", 0) err := tracker.NowPlaying(ctx, "player-1", "player-one", "123", 0)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(fake.NowPlayingCalled).To(BeFalse()) Expect(fake.GetNowPlayingCalled()).To(BeFalse())
}) })
It("stores position when greater than zero", func() { It("stores position when greater than zero", func() {
@@ -130,11 +146,12 @@ var _ = Describe("PlayTracker", func() {
err := tracker.NowPlaying(ctx, "player-1", "player-one", "123", pos) err := tracker.NowPlaying(ctx, "player-1", "player-one", "123", pos)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Eventually(func() int { return fake.GetPosition() }).Should(Equal(pos))
playing, err := tracker.GetNowPlaying(ctx) playing, err := tracker.GetNowPlaying(ctx)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(playing).To(HaveLen(1)) Expect(playing).To(HaveLen(1))
Expect(playing[0].Position).To(Equal(pos)) Expect(playing[0].Position).To(Equal(pos))
Expect(fake.Position).To(Equal(pos))
}) })
It("sends event with count", func() { It("sends event with count", func() {
@@ -210,7 +227,7 @@ var _ = Describe("PlayTracker", func() {
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(fake.ScrobbleCalled.Load()).To(BeTrue()) Expect(fake.ScrobbleCalled.Load()).To(BeTrue())
Expect(fake.UserID).To(Equal("u-1")) Expect(fake.GetUserID()).To(Equal("u-1"))
lastScrobble := fake.LastScrobble.Load() lastScrobble := fake.LastScrobble.Load()
Expect(lastScrobble.TimeStamp).To(BeTemporally("~", ts, 1*time.Second)) Expect(lastScrobble.TimeStamp).To(BeTemporally("~", ts, 1*time.Second))
Expect(lastScrobble.ID).To(Equal("123")) Expect(lastScrobble.ID).To(Equal("123"))
@@ -278,45 +295,46 @@ var _ = Describe("PlayTracker", func() {
Describe("Plugin scrobbler logic", func() { Describe("Plugin scrobbler logic", func() {
var pluginLoader *mockPluginLoader var pluginLoader *mockPluginLoader
var pluginFake fakeScrobbler var pluginFake *fakeScrobbler
BeforeEach(func() { BeforeEach(func() {
pluginFake = fakeScrobbler{Authorized: true} pluginFake = &fakeScrobbler{Authorized: true}
pluginLoader = &mockPluginLoader{ pluginLoader = &mockPluginLoader{
names: []string{"plugin1"}, names: []string{"plugin1"},
scrobblers: map[string]Scrobbler{"plugin1": &pluginFake}, scrobblers: map[string]Scrobbler{"plugin1": pluginFake},
} }
tracker = newPlayTracker(ds, events.GetBroker(), pluginLoader) tracker = newPlayTracker(ds, events.GetBroker(), pluginLoader)
// Bypass buffering for both built-in and plugin scrobblers // Bypass buffering for both built-in and plugin scrobblers
tracker.(*playTracker).builtinScrobblers["fake"] = &fake tracker.(*playTracker).builtinScrobblers["fake"] = fake
tracker.(*playTracker).pluginScrobblers["plugin1"] = &pluginFake tracker.(*playTracker).pluginScrobblers["plugin1"] = pluginFake
}) })
It("registers and uses plugin scrobbler for NowPlaying", func() { It("registers and uses plugin scrobbler for NowPlaying", func() {
err := tracker.NowPlaying(ctx, "player-1", "player-one", "123", 0) err := tracker.NowPlaying(ctx, "player-1", "player-one", "123", 0)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(pluginFake.NowPlayingCalled).To(BeTrue()) Eventually(func() bool { return pluginFake.GetNowPlayingCalled() }).Should(BeTrue())
}) })
It("removes plugin scrobbler if not present anymore", func() { It("removes plugin scrobbler if not present anymore", func() {
// First call: plugin present // First call: plugin present
_ = tracker.NowPlaying(ctx, "player-1", "player-one", "123", 0) _ = tracker.NowPlaying(ctx, "player-1", "player-one", "123", 0)
Expect(pluginFake.NowPlayingCalled).To(BeTrue()) Eventually(func() bool { return pluginFake.GetNowPlayingCalled() }).Should(BeTrue())
pluginFake.NowPlayingCalled = false pluginFake.nowPlayingCalled.Store(false)
// Remove plugin // Remove plugin
pluginLoader.names = []string{} pluginLoader.SetNames([]string{})
_ = tracker.NowPlaying(ctx, "player-1", "player-one", "123", 0) _ = tracker.NowPlaying(ctx, "player-1", "player-one", "123", 0)
Expect(pluginFake.NowPlayingCalled).To(BeFalse()) // Should not be called since plugin was removed
Consistently(func() bool { return pluginFake.GetNowPlayingCalled() }).Should(BeFalse())
}) })
It("calls both builtin and plugin scrobblers for NowPlaying", func() { It("calls both builtin and plugin scrobblers for NowPlaying", func() {
fake.NowPlayingCalled = false fake.nowPlayingCalled.Store(false)
pluginFake.NowPlayingCalled = false pluginFake.nowPlayingCalled.Store(false)
err := tracker.NowPlaying(ctx, "player-1", "player-one", "123", 0) err := tracker.NowPlaying(ctx, "player-1", "player-one", "123", 0)
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())
Expect(fake.NowPlayingCalled).To(BeTrue()) Eventually(func() bool { return fake.GetNowPlayingCalled() }).Should(BeTrue())
Expect(pluginFake.NowPlayingCalled).To(BeTrue()) Eventually(func() bool { return pluginFake.GetNowPlayingCalled() }).Should(BeTrue())
}) })
It("calls plugin scrobbler for Submit", func() { It("calls plugin scrobbler for Submit", func() {
@@ -359,7 +377,7 @@ var _ = Describe("PlayTracker", func() {
It("calls Stop on scrobblers when removing them", func() { It("calls Stop on scrobblers when removing them", func() {
// Change the plugin names to simulate a plugin being removed // Change the plugin names to simulate a plugin being removed
mockPlugin.names = []string{} mockPlugin.SetNames([]string{})
// Call refreshPluginScrobblers which should detect the removed plugin // Call refreshPluginScrobblers which should detect the removed plugin
pTracker.refreshPluginScrobblers() pTracker.refreshPluginScrobblers()
@@ -375,32 +393,51 @@ var _ = Describe("PlayTracker", func() {
type fakeScrobbler struct { type fakeScrobbler struct {
Authorized bool Authorized bool
NowPlayingCalled bool nowPlayingCalled atomic.Bool
ScrobbleCalled atomic.Bool ScrobbleCalled atomic.Bool
UserID string userID atomic.Pointer[string]
Track *model.MediaFile track atomic.Pointer[model.MediaFile]
Position int position atomic.Int32
LastScrobble atomic.Pointer[Scrobble] LastScrobble atomic.Pointer[Scrobble]
Error error Error error
} }
func (f *fakeScrobbler) GetNowPlayingCalled() bool {
return f.nowPlayingCalled.Load()
}
func (f *fakeScrobbler) GetUserID() string {
if p := f.userID.Load(); p != nil {
return *p
}
return ""
}
func (f *fakeScrobbler) GetTrack() *model.MediaFile {
return f.track.Load()
}
func (f *fakeScrobbler) GetPosition() int {
return int(f.position.Load())
}
func (f *fakeScrobbler) IsAuthorized(ctx context.Context, userId string) bool { func (f *fakeScrobbler) IsAuthorized(ctx context.Context, userId string) bool {
return f.Error == nil && f.Authorized return f.Error == nil && f.Authorized
} }
func (f *fakeScrobbler) NowPlaying(ctx context.Context, userId string, track *model.MediaFile, position int) error { func (f *fakeScrobbler) NowPlaying(ctx context.Context, userId string, track *model.MediaFile, position int) error {
f.NowPlayingCalled = true f.nowPlayingCalled.Store(true)
if f.Error != nil { if f.Error != nil {
return f.Error return f.Error
} }
f.UserID = userId f.userID.Store(&userId)
f.Track = track f.track.Store(track)
f.Position = position f.position.Store(int32(position))
return nil return nil
} }
func (f *fakeScrobbler) Scrobble(ctx context.Context, userId string, s Scrobble) error { func (f *fakeScrobbler) Scrobble(ctx context.Context, userId string, s Scrobble) error {
f.UserID = userId f.userID.Store(&userId)
f.LastScrobble.Store(&s) f.LastScrobble.Store(&s)
f.ScrobbleCalled.Store(true) f.ScrobbleCalled.Store(true)
if f.Error != nil { if f.Error != nil {