mirror of
https://github.com/navidrome/navidrome.git
synced 2025-12-23 23:18:05 -05:00
feat(ui): implement new event stream connection logic
Added a new event stream connection method to enhance the handling of server events. This includes a reconnect mechanism for improved reliability in case of connection errors. The configuration now allows toggling the new event stream feature via `devNewEventStream`. Additionally, tests were added to ensure the new functionality works as expected, including reconnection behavior after an error. Signed-off-by: Deluan <deluan@navidrome.org>
This commit is contained in:
@@ -116,6 +116,7 @@ type configOptions struct {
|
||||
DevSidebarPlaylists bool
|
||||
DevShowArtistPage bool
|
||||
DevUIShowConfig bool
|
||||
DevNewEventStream bool
|
||||
DevOffsetOptimize int
|
||||
DevArtworkMaxRequests int
|
||||
DevArtworkThrottleBacklogLimit int
|
||||
@@ -586,6 +587,7 @@ func setViperDefaults() {
|
||||
viper.SetDefault("devsidebarplaylists", true)
|
||||
viper.SetDefault("devshowartistpage", true)
|
||||
viper.SetDefault("devuishowconfig", true)
|
||||
viper.SetDefault("devneweventstream", true)
|
||||
viper.SetDefault("devoffsetoptimize", 50000)
|
||||
viper.SetDefault("devartworkmaxrequests", max(2, runtime.NumCPU()/3))
|
||||
viper.SetDefault("devartworkthrottlebackloglimit", consts.RequestThrottleBacklogLimit)
|
||||
|
||||
@@ -67,6 +67,7 @@ func serveIndex(ds model.DataStore, fs fs.FS, shareInfo *model.Share) http.Handl
|
||||
"lastFMEnabled": conf.Server.LastFM.Enabled,
|
||||
"devShowArtistPage": conf.Server.DevShowArtistPage,
|
||||
"devUIShowConfig": conf.Server.DevUIShowConfig,
|
||||
"devNewEventStream": conf.Server.DevNewEventStream,
|
||||
"listenBrainzEnabled": conf.Server.ListenBrainz.Enabled,
|
||||
"enableExternalServices": conf.Server.EnableExternalServices,
|
||||
"enableReplayGain": conf.Server.EnableReplayGain,
|
||||
|
||||
@@ -102,6 +102,7 @@ var _ = Describe("serveIndex", func() {
|
||||
Entry("defaultDownsamplingFormat", func() { conf.Server.DefaultDownsamplingFormat = "mp3" }, "defaultDownsamplingFormat", "mp3"),
|
||||
Entry("enableUserEditing", func() { conf.Server.EnableUserEditing = false }, "enableUserEditing", false),
|
||||
Entry("enableSharing", func() { conf.Server.EnableSharing = true }, "enableSharing", true),
|
||||
Entry("devNewEventStream", func() { conf.Server.DevNewEventStream = true }, "devNewEventStream", true),
|
||||
)
|
||||
|
||||
DescribeTable("sets other UI configuration values",
|
||||
|
||||
@@ -32,6 +32,7 @@ const defaultConfig = {
|
||||
enableNowPlaying: true,
|
||||
devShowArtistPage: true,
|
||||
devUIShowConfig: true,
|
||||
devNewEventStream: false,
|
||||
enableReplayGain: true,
|
||||
defaultDownsamplingFormat: 'opus',
|
||||
publicBaseUrl: '/share',
|
||||
|
||||
@@ -12,6 +12,49 @@ const newEventStream = async () => {
|
||||
return new EventSource(url)
|
||||
}
|
||||
|
||||
let eventStream
|
||||
let reconnectTimer
|
||||
const RECONNECT_DELAY = 5000
|
||||
|
||||
const setupHandlers = (stream, dispatchFn) => {
|
||||
stream.addEventListener('serverStart', eventHandler(dispatchFn))
|
||||
stream.addEventListener('scanStatus', throttledEventHandler(dispatchFn))
|
||||
stream.addEventListener('refreshResource', eventHandler(dispatchFn))
|
||||
if (config.enableNowPlaying) {
|
||||
stream.addEventListener('nowPlayingCount', eventHandler(dispatchFn))
|
||||
}
|
||||
stream.addEventListener('keepAlive', eventHandler(dispatchFn))
|
||||
stream.onerror = (e) => {
|
||||
// eslint-disable-next-line no-console
|
||||
console.log('EventStream error', e)
|
||||
dispatchFn(serverDown())
|
||||
if (stream) stream.close()
|
||||
scheduleReconnect(dispatchFn)
|
||||
}
|
||||
}
|
||||
|
||||
const scheduleReconnect = (dispatchFn) => {
|
||||
if (!reconnectTimer) {
|
||||
reconnectTimer = setTimeout(() => {
|
||||
reconnectTimer = null
|
||||
connect(dispatchFn)
|
||||
}, RECONNECT_DELAY)
|
||||
}
|
||||
}
|
||||
|
||||
const connect = async (dispatchFn) => {
|
||||
try {
|
||||
const stream = await newEventStream()
|
||||
eventStream = stream
|
||||
setupHandlers(stream, dispatchFn)
|
||||
return stream
|
||||
} catch (e) {
|
||||
// eslint-disable-next-line no-console
|
||||
console.log(`Error connecting to server:`, e)
|
||||
scheduleReconnect(dispatchFn)
|
||||
}
|
||||
}
|
||||
|
||||
const eventHandler = (dispatchFn) => (event) => {
|
||||
const data = JSON.parse(event.data)
|
||||
if (event.type !== 'keepAlive') {
|
||||
@@ -22,10 +65,7 @@ const eventHandler = (dispatchFn) => (event) => {
|
||||
const throttledEventHandler = (dispatchFn) =>
|
||||
throttle(eventHandler(dispatchFn), 100, { trailing: true })
|
||||
|
||||
const startEventStream = async (dispatchFn) => {
|
||||
if (!localStorage.getItem('is-authenticated')) {
|
||||
return Promise.resolve()
|
||||
}
|
||||
const startEventStreamLegacy = async (dispatchFn) => {
|
||||
return newEventStream()
|
||||
.then((newStream) => {
|
||||
newStream.addEventListener('serverStart', eventHandler(dispatchFn))
|
||||
@@ -51,4 +91,22 @@ const startEventStream = async (dispatchFn) => {
|
||||
})
|
||||
}
|
||||
|
||||
const startEventStreamNew = async (dispatchFn) => {
|
||||
if (eventStream) {
|
||||
eventStream.close()
|
||||
eventStream = null
|
||||
}
|
||||
return connect(dispatchFn)
|
||||
}
|
||||
|
||||
const startEventStream = async (dispatchFn) => {
|
||||
if (!localStorage.getItem('is-authenticated')) {
|
||||
return Promise.resolve()
|
||||
}
|
||||
if (config.devNewEventStream) {
|
||||
return startEventStreamNew(dispatchFn)
|
||||
}
|
||||
return startEventStreamLegacy(dispatchFn)
|
||||
}
|
||||
|
||||
export { startEventStream }
|
||||
|
||||
49
ui/src/eventStream.test.js
Normal file
49
ui/src/eventStream.test.js
Normal file
@@ -0,0 +1,49 @@
|
||||
import { describe, it, beforeEach, vi, expect } from 'vitest'
|
||||
import { startEventStream } from './eventStream'
|
||||
import { serverDown } from './actions'
|
||||
import config from './config'
|
||||
|
||||
class MockEventSource {
|
||||
constructor(url) {
|
||||
this.url = url
|
||||
this.readyState = 1
|
||||
this.listeners = {}
|
||||
this.onerror = null
|
||||
}
|
||||
addEventListener(type, handler) {
|
||||
this.listeners[type] = handler
|
||||
}
|
||||
close() {
|
||||
this.readyState = 2
|
||||
}
|
||||
}
|
||||
|
||||
describe('startEventStream', () => {
|
||||
vi.useFakeTimers()
|
||||
let dispatch
|
||||
let instance
|
||||
|
||||
beforeEach(() => {
|
||||
dispatch = vi.fn()
|
||||
global.EventSource = vi.fn((url) => {
|
||||
instance = new MockEventSource(url)
|
||||
return instance
|
||||
})
|
||||
localStorage.setItem('is-authenticated', 'true')
|
||||
localStorage.setItem('token', 'abc')
|
||||
config.devNewEventStream = true
|
||||
})
|
||||
|
||||
afterEach(() => {
|
||||
config.devNewEventStream = false
|
||||
})
|
||||
|
||||
it('reconnects after an error', async () => {
|
||||
await startEventStream(dispatch)
|
||||
expect(global.EventSource).toHaveBeenCalledTimes(1)
|
||||
instance.onerror(new Event('error'))
|
||||
expect(dispatch).toHaveBeenCalledWith(serverDown())
|
||||
vi.advanceTimersByTime(5000)
|
||||
expect(global.EventSource).toHaveBeenCalledTimes(2)
|
||||
})
|
||||
})
|
||||
Reference in New Issue
Block a user