mirror of
https://github.com/navidrome/navidrome.git
synced 2025-12-23 23:18:05 -05:00
fix(plugins): add metrics on callbacks and improve plugin method calling (#4304)
* refactor: implement OnSchedulerCallback method in wasmSchedulerCallback Added the OnSchedulerCallback method to the wasmSchedulerCallback struct, enabling it to handle scheduler callback events. This method constructs a SchedulerCallbackRequest and invokes the corresponding plugin method, facilitating better integration with the scheduling system. The changes improve the plugin's ability to respond to scheduled events, enhancing overall functionality. Signed-off-by: Deluan <deluan@navidrome.org> * fix(plugins): update executeCallback method to use callMethod Modified the executeCallback method to accept an additional parameter, methodName, which specifies the callback method to be executed. This change ensures that the correct method is called for each WebSocket event, improving the accuracy of callback execution for plugins. Signed-off-by: Deluan <deluan@navidrome.org> * fix(plugins): capture OnInit metrics Signed-off-by: Deluan <deluan@navidrome.org> * fix(plugins): improve logging for metrics in callMethod Updated the logging statement in the callMethod function to include the elapsed time as a separate key in the log output. This change enhances the clarity of the logged metrics, making it easier to analyze the performance of plugin requests and troubleshoot any issues that may arise. Signed-off-by: Deluan <deluan@navidrome.org> * fix(plugins): enhance logging for schedule callback execution Signed-off-by: Deluan <deluan@navidrome.org> * refactor(server): streamline scrobbler stopping logic Refactored the logic for stopping scrobbler instances when they are removed. The new implementation introduces a `stoppableScrobbler` interface to simplify the type assertion process, allowing for a more concise and readable code structure. This change ensures that any scrobbler implementing the `Stop` method is properly stopped before removal, improving the overall reliability of the plugin management system. Signed-off-by: Deluan <deluan@navidrome.org> * fix(plugins): improve plugin lifecycle management and error handling Enhanced the plugin lifecycle management by implementing error handling in the OnInit method. The changes include the addition of specific error conditions that can be returned during plugin initialization, allowing for better management of plugin states. Additionally, the unregisterPlugin method was updated to ensure proper cleanup of plugins that fail to initialize, improving overall stability and reliability of the plugin system. Signed-off-by: Deluan <deluan@navidrome.org> * refactor(plugins): remove unused LoadAllPlugins and related methods Eliminated the LoadAllPlugins, LoadAllMediaAgents, and LoadAllScrobblers methods from the manager implementation as they were not utilized in the codebase. This cleanup reduces complexity and improves maintainability by removing redundant code, allowing for a more streamlined plugin management process. Signed-off-by: Deluan <deluan@navidrome.org> * fix(plugins): update logging configuration for plugins Configured logging for multiple plugins to remove timestamps and source file/line information, while adding specific prefixes for better identification. Signed-off-by: Deluan <deluan@navidrome.org> * fix(plugins): clear initialization state when unregistering a plugin Added functionality to clear the initialization state of a plugin in the lifecycle manager when it is unregistered. This change ensures that the lifecycle state is accurately maintained, preventing potential issues with plugins that may be re-registered after being unregistered. The new method `clearInitialized` was implemented to handle this state management. Signed-off-by: Deluan <deluan@navidrome.org> * test: add unit tests for convertError function, rename to checkErr Added comprehensive unit tests for the convertError function to ensure correct behavior across various scenarios, including handling nil responses, typed nils, and responses implementing errorResponse. These tests validate that the function returns the expected results without panicking and correctly wraps original errors when necessary. Signed-off-by: Deluan <deluan@navidrome.org> * fix(plugins): update plugin base implementation and method calls Refactored the plugin base implementation by renaming `wasmBasePlugin` to `baseCapability` across multiple files. Updated method calls in the `wasmMediaAgent`, `wasmSchedulerCallback`, and `wasmScrobblerPlugin` to align with the new base structure. These changes improve code clarity and maintainability by standardizing the plugin architecture, ensuring consistent usage of the base capabilities across different plugin types. Signed-off-by: Deluan <deluan@navidrome.org> * fix(discord): handle failed connections and improve heartbeat checks Added a new method to clean up failed connections, which cancels the heartbeat schedule, closes the WebSocket connection, and removes cache entries. Enhanced the heartbeat check to log failures and trigger the cleanup process on the first failure. These changes ensure better management of user connections and improve the overall reliability of the RPC system. Signed-off-by: Deluan <deluan@navidrome.org> --------- Signed-off-by: Deluan <deluan@navidrome.org>
This commit is contained in:
4
.github/workflows/validate-translations.sh
vendored
4
.github/workflows/validate-translations.sh
vendored
@@ -233,6 +233,4 @@ elif [[ "$VERBOSE" == "true" ]]; then
|
||||
echo -e "${GREEN}All translation files are structurally valid${NC}"
|
||||
fi
|
||||
|
||||
exit 0
|
||||
|
||||
# Contains AI-generated edits.
|
||||
exit 0
|
||||
@@ -138,23 +138,18 @@ func (p *playTracker) refreshPluginScrobblers() {
|
||||
}
|
||||
}
|
||||
|
||||
type stoppableScrobbler interface {
|
||||
Scrobbler
|
||||
Stop()
|
||||
}
|
||||
|
||||
// Process removals - remove plugins that no longer exist
|
||||
for name, scrobbler := range p.pluginScrobblers {
|
||||
if _, exists := current[name]; !exists {
|
||||
// Type assertion to access the Stop method
|
||||
// We need to ensure this works even with interface objects
|
||||
if bs, ok := scrobbler.(*bufferedScrobbler); ok {
|
||||
log.Debug("Stopping buffered scrobbler goroutine", "name", name)
|
||||
bs.Stop()
|
||||
} else {
|
||||
// For tests - try to see if this is a mock with a Stop method
|
||||
type stoppable interface {
|
||||
Stop()
|
||||
}
|
||||
if s, ok := scrobbler.(stoppable); ok {
|
||||
log.Debug("Stopping mock scrobbler", "name", name)
|
||||
s.Stop()
|
||||
}
|
||||
// If the scrobbler implements stoppableScrobbler, call Stop() before removing it
|
||||
if stoppable, ok := scrobbler.(stoppableScrobbler); ok {
|
||||
log.Debug("Stopping scrobbler", "name", name)
|
||||
stoppable.Stop()
|
||||
}
|
||||
delete(p.pluginScrobblers, name)
|
||||
}
|
||||
|
||||
@@ -17,7 +17,7 @@ func newWasmMediaAgent(wasmPath, pluginID string, m *managerImpl, runtime api.Wa
|
||||
return nil
|
||||
}
|
||||
return &wasmMediaAgent{
|
||||
wasmBasePlugin: newWasmBasePlugin[api.MetadataAgent, *api.MetadataAgentPlugin](
|
||||
baseCapability: newBaseCapability[api.MetadataAgent, *api.MetadataAgentPlugin](
|
||||
wasmPath,
|
||||
pluginID,
|
||||
CapabilityMetadataAgent,
|
||||
@@ -32,7 +32,7 @@ func newWasmMediaAgent(wasmPath, pluginID string, m *managerImpl, runtime api.Wa
|
||||
|
||||
// wasmMediaAgent adapts a MetadataAgent plugin to implement the agents.Interface
|
||||
type wasmMediaAgent struct {
|
||||
*wasmBasePlugin[api.MetadataAgent, *api.MetadataAgentPlugin]
|
||||
*baseCapability[api.MetadataAgent, *api.MetadataAgentPlugin]
|
||||
}
|
||||
|
||||
func (w *wasmMediaAgent) AgentName() string {
|
||||
@@ -49,108 +49,108 @@ func (w *wasmMediaAgent) mapError(err error) error {
|
||||
// Album-related methods
|
||||
|
||||
func (w *wasmMediaAgent) GetAlbumInfo(ctx context.Context, name, artist, mbid string) (*agents.AlbumInfo, error) {
|
||||
return callMethod(ctx, w, "GetAlbumInfo", func(inst api.MetadataAgent) (*agents.AlbumInfo, error) {
|
||||
res, err := inst.GetAlbumInfo(ctx, &api.AlbumInfoRequest{Name: name, Artist: artist, Mbid: mbid})
|
||||
if err != nil {
|
||||
return nil, w.mapError(err)
|
||||
}
|
||||
if res == nil || res.Info == nil {
|
||||
return nil, agents.ErrNotFound
|
||||
}
|
||||
info := res.Info
|
||||
return &agents.AlbumInfo{
|
||||
Name: info.Name,
|
||||
MBID: info.Mbid,
|
||||
Description: info.Description,
|
||||
URL: info.Url,
|
||||
}, nil
|
||||
res, err := callMethod(ctx, w, "GetAlbumInfo", func(inst api.MetadataAgent) (*api.AlbumInfoResponse, error) {
|
||||
return inst.GetAlbumInfo(ctx, &api.AlbumInfoRequest{Name: name, Artist: artist, Mbid: mbid})
|
||||
})
|
||||
if err != nil {
|
||||
return nil, w.mapError(err)
|
||||
}
|
||||
if res == nil || res.Info == nil {
|
||||
return nil, agents.ErrNotFound
|
||||
}
|
||||
info := res.Info
|
||||
return &agents.AlbumInfo{
|
||||
Name: info.Name,
|
||||
MBID: info.Mbid,
|
||||
Description: info.Description,
|
||||
URL: info.Url,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (w *wasmMediaAgent) GetAlbumImages(ctx context.Context, name, artist, mbid string) ([]agents.ExternalImage, error) {
|
||||
return callMethod(ctx, w, "GetAlbumImages", func(inst api.MetadataAgent) ([]agents.ExternalImage, error) {
|
||||
res, err := inst.GetAlbumImages(ctx, &api.AlbumImagesRequest{Name: name, Artist: artist, Mbid: mbid})
|
||||
if err != nil {
|
||||
return nil, w.mapError(err)
|
||||
}
|
||||
return convertExternalImages(res.Images), nil
|
||||
res, err := callMethod(ctx, w, "GetAlbumImages", func(inst api.MetadataAgent) (*api.AlbumImagesResponse, error) {
|
||||
return inst.GetAlbumImages(ctx, &api.AlbumImagesRequest{Name: name, Artist: artist, Mbid: mbid})
|
||||
})
|
||||
if err != nil {
|
||||
return nil, w.mapError(err)
|
||||
}
|
||||
return convertExternalImages(res.Images), nil
|
||||
}
|
||||
|
||||
// Artist-related methods
|
||||
|
||||
func (w *wasmMediaAgent) GetArtistMBID(ctx context.Context, id string, name string) (string, error) {
|
||||
return callMethod(ctx, w, "GetArtistMBID", func(inst api.MetadataAgent) (string, error) {
|
||||
res, err := inst.GetArtistMBID(ctx, &api.ArtistMBIDRequest{Id: id, Name: name})
|
||||
if err != nil {
|
||||
return "", w.mapError(err)
|
||||
}
|
||||
return res.GetMbid(), nil
|
||||
res, err := callMethod(ctx, w, "GetArtistMBID", func(inst api.MetadataAgent) (*api.ArtistMBIDResponse, error) {
|
||||
return inst.GetArtistMBID(ctx, &api.ArtistMBIDRequest{Id: id, Name: name})
|
||||
})
|
||||
if err != nil {
|
||||
return "", w.mapError(err)
|
||||
}
|
||||
return res.GetMbid(), nil
|
||||
}
|
||||
|
||||
func (w *wasmMediaAgent) GetArtistURL(ctx context.Context, id, name, mbid string) (string, error) {
|
||||
return callMethod(ctx, w, "GetArtistURL", func(inst api.MetadataAgent) (string, error) {
|
||||
res, err := inst.GetArtistURL(ctx, &api.ArtistURLRequest{Id: id, Name: name, Mbid: mbid})
|
||||
if err != nil {
|
||||
return "", w.mapError(err)
|
||||
}
|
||||
return res.GetUrl(), nil
|
||||
res, err := callMethod(ctx, w, "GetArtistURL", func(inst api.MetadataAgent) (*api.ArtistURLResponse, error) {
|
||||
return inst.GetArtistURL(ctx, &api.ArtistURLRequest{Id: id, Name: name, Mbid: mbid})
|
||||
})
|
||||
if err != nil {
|
||||
return "", w.mapError(err)
|
||||
}
|
||||
return res.GetUrl(), nil
|
||||
}
|
||||
|
||||
func (w *wasmMediaAgent) GetArtistBiography(ctx context.Context, id, name, mbid string) (string, error) {
|
||||
return callMethod(ctx, w, "GetArtistBiography", func(inst api.MetadataAgent) (string, error) {
|
||||
res, err := inst.GetArtistBiography(ctx, &api.ArtistBiographyRequest{Id: id, Name: name, Mbid: mbid})
|
||||
if err != nil {
|
||||
return "", w.mapError(err)
|
||||
}
|
||||
return res.GetBiography(), nil
|
||||
res, err := callMethod(ctx, w, "GetArtistBiography", func(inst api.MetadataAgent) (*api.ArtistBiographyResponse, error) {
|
||||
return inst.GetArtistBiography(ctx, &api.ArtistBiographyRequest{Id: id, Name: name, Mbid: mbid})
|
||||
})
|
||||
if err != nil {
|
||||
return "", w.mapError(err)
|
||||
}
|
||||
return res.GetBiography(), nil
|
||||
}
|
||||
|
||||
func (w *wasmMediaAgent) GetSimilarArtists(ctx context.Context, id, name, mbid string, limit int) ([]agents.Artist, error) {
|
||||
return callMethod(ctx, w, "GetSimilarArtists", func(inst api.MetadataAgent) ([]agents.Artist, error) {
|
||||
resp, err := inst.GetSimilarArtists(ctx, &api.ArtistSimilarRequest{Id: id, Name: name, Mbid: mbid, Limit: int32(limit)})
|
||||
if err != nil {
|
||||
return nil, w.mapError(err)
|
||||
}
|
||||
artists := make([]agents.Artist, 0, len(resp.GetArtists()))
|
||||
for _, a := range resp.GetArtists() {
|
||||
artists = append(artists, agents.Artist{
|
||||
Name: a.GetName(),
|
||||
MBID: a.GetMbid(),
|
||||
})
|
||||
}
|
||||
return artists, nil
|
||||
resp, err := callMethod(ctx, w, "GetSimilarArtists", func(inst api.MetadataAgent) (*api.ArtistSimilarResponse, error) {
|
||||
return inst.GetSimilarArtists(ctx, &api.ArtistSimilarRequest{Id: id, Name: name, Mbid: mbid, Limit: int32(limit)})
|
||||
})
|
||||
if err != nil {
|
||||
return nil, w.mapError(err)
|
||||
}
|
||||
artists := make([]agents.Artist, 0, len(resp.GetArtists()))
|
||||
for _, a := range resp.GetArtists() {
|
||||
artists = append(artists, agents.Artist{
|
||||
Name: a.GetName(),
|
||||
MBID: a.GetMbid(),
|
||||
})
|
||||
}
|
||||
return artists, nil
|
||||
}
|
||||
|
||||
func (w *wasmMediaAgent) GetArtistImages(ctx context.Context, id, name, mbid string) ([]agents.ExternalImage, error) {
|
||||
return callMethod(ctx, w, "GetArtistImages", func(inst api.MetadataAgent) ([]agents.ExternalImage, error) {
|
||||
res, err := inst.GetArtistImages(ctx, &api.ArtistImageRequest{Id: id, Name: name, Mbid: mbid})
|
||||
if err != nil {
|
||||
return nil, w.mapError(err)
|
||||
}
|
||||
return convertExternalImages(res.Images), nil
|
||||
resp, err := callMethod(ctx, w, "GetArtistImages", func(inst api.MetadataAgent) (*api.ArtistImageResponse, error) {
|
||||
return inst.GetArtistImages(ctx, &api.ArtistImageRequest{Id: id, Name: name, Mbid: mbid})
|
||||
})
|
||||
if err != nil {
|
||||
return nil, w.mapError(err)
|
||||
}
|
||||
return convertExternalImages(resp.Images), nil
|
||||
}
|
||||
|
||||
func (w *wasmMediaAgent) GetArtistTopSongs(ctx context.Context, id, artistName, mbid string, count int) ([]agents.Song, error) {
|
||||
return callMethod(ctx, w, "GetArtistTopSongs", func(inst api.MetadataAgent) ([]agents.Song, error) {
|
||||
resp, err := inst.GetArtistTopSongs(ctx, &api.ArtistTopSongsRequest{Id: id, ArtistName: artistName, Mbid: mbid, Count: int32(count)})
|
||||
if err != nil {
|
||||
return nil, w.mapError(err)
|
||||
}
|
||||
songs := make([]agents.Song, 0, len(resp.GetSongs()))
|
||||
for _, s := range resp.GetSongs() {
|
||||
songs = append(songs, agents.Song{
|
||||
Name: s.GetName(),
|
||||
MBID: s.GetMbid(),
|
||||
})
|
||||
}
|
||||
return songs, nil
|
||||
resp, err := callMethod(ctx, w, "GetArtistTopSongs", func(inst api.MetadataAgent) (*api.ArtistTopSongsResponse, error) {
|
||||
return inst.GetArtistTopSongs(ctx, &api.ArtistTopSongsRequest{Id: id, ArtistName: artistName, Mbid: mbid, Count: int32(count)})
|
||||
})
|
||||
if err != nil {
|
||||
return nil, w.mapError(err)
|
||||
}
|
||||
songs := make([]agents.Song, 0, len(resp.GetSongs()))
|
||||
for _, s := range resp.GetSongs() {
|
||||
songs = append(songs, agents.Song{
|
||||
Name: s.GetName(),
|
||||
MBID: s.GetMbid(),
|
||||
})
|
||||
}
|
||||
return songs, nil
|
||||
}
|
||||
|
||||
// Helper function to convert ExternalImage objects from the API to the agents package
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"github.com/navidrome/navidrome/conf"
|
||||
"github.com/navidrome/navidrome/conf/configtest"
|
||||
"github.com/navidrome/navidrome/core/agents"
|
||||
"github.com/navidrome/navidrome/core/metrics"
|
||||
"github.com/navidrome/navidrome/plugins/api"
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
@@ -23,7 +24,7 @@ var _ = Describe("Adapter Media Agent", func() {
|
||||
DeferCleanup(configtest.SetupConfig())
|
||||
conf.Server.Plugins.Folder = testDataDir
|
||||
|
||||
mgr = createManager(nil, nil)
|
||||
mgr = createManager(nil, metrics.NewNoopInstance())
|
||||
mgr.ScanPlugins()
|
||||
})
|
||||
|
||||
|
||||
@@ -16,7 +16,7 @@ func newWasmSchedulerCallback(wasmPath, pluginID string, m *managerImpl, runtime
|
||||
return nil
|
||||
}
|
||||
return &wasmSchedulerCallback{
|
||||
wasmBasePlugin: newWasmBasePlugin[api.SchedulerCallback, *api.SchedulerCallbackPlugin](
|
||||
baseCapability: newBaseCapability[api.SchedulerCallback, *api.SchedulerCallbackPlugin](
|
||||
wasmPath,
|
||||
pluginID,
|
||||
CapabilitySchedulerCallback,
|
||||
@@ -31,5 +31,16 @@ func newWasmSchedulerCallback(wasmPath, pluginID string, m *managerImpl, runtime
|
||||
|
||||
// wasmSchedulerCallback adapts a SchedulerCallback plugin
|
||||
type wasmSchedulerCallback struct {
|
||||
*wasmBasePlugin[api.SchedulerCallback, *api.SchedulerCallbackPlugin]
|
||||
*baseCapability[api.SchedulerCallback, *api.SchedulerCallbackPlugin]
|
||||
}
|
||||
|
||||
func (w *wasmSchedulerCallback) OnSchedulerCallback(ctx context.Context, scheduleID string, payload []byte, isRecurring bool) error {
|
||||
_, err := callMethod(ctx, w, "OnSchedulerCallback", func(inst api.SchedulerCallback) (*api.SchedulerCallbackResponse, error) {
|
||||
return inst.OnSchedulerCallback(ctx, &api.SchedulerCallbackRequest{
|
||||
ScheduleId: scheduleID,
|
||||
Payload: payload,
|
||||
IsRecurring: isRecurring,
|
||||
})
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -19,7 +19,7 @@ func newWasmScrobblerPlugin(wasmPath, pluginID string, m *managerImpl, runtime a
|
||||
return nil
|
||||
}
|
||||
return &wasmScrobblerPlugin{
|
||||
wasmBasePlugin: newWasmBasePlugin[api.Scrobbler, *api.ScrobblerPlugin](
|
||||
baseCapability: newBaseCapability[api.Scrobbler, *api.ScrobblerPlugin](
|
||||
wasmPath,
|
||||
pluginID,
|
||||
CapabilityScrobbler,
|
||||
@@ -33,7 +33,7 @@ func newWasmScrobblerPlugin(wasmPath, pluginID string, m *managerImpl, runtime a
|
||||
}
|
||||
|
||||
type wasmScrobblerPlugin struct {
|
||||
*wasmBasePlugin[api.Scrobbler, *api.ScrobblerPlugin]
|
||||
*baseCapability[api.Scrobbler, *api.ScrobblerPlugin]
|
||||
}
|
||||
|
||||
func (w *wasmScrobblerPlugin) IsAuthorized(ctx context.Context, userId string) bool {
|
||||
@@ -44,21 +44,16 @@ func (w *wasmScrobblerPlugin) IsAuthorized(ctx context.Context, userId string) b
|
||||
username = u.UserName
|
||||
}
|
||||
}
|
||||
|
||||
result, err := callMethod(ctx, w, "IsAuthorized", func(inst api.Scrobbler) (bool, error) {
|
||||
resp, err := inst.IsAuthorized(ctx, &api.ScrobblerIsAuthorizedRequest{
|
||||
resp, err := callMethod(ctx, w, "IsAuthorized", func(inst api.Scrobbler) (*api.ScrobblerIsAuthorizedResponse, error) {
|
||||
return inst.IsAuthorized(ctx, &api.ScrobblerIsAuthorizedRequest{
|
||||
UserId: userId,
|
||||
Username: username,
|
||||
})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if resp.Error != "" {
|
||||
return false, nil
|
||||
}
|
||||
return resp.Authorized, nil
|
||||
})
|
||||
return err == nil && result
|
||||
if err != nil {
|
||||
log.Warn("Error calling IsAuthorized", "userId", userId, "pluginID", w.id, err)
|
||||
}
|
||||
return err == nil && resp.Authorized
|
||||
}
|
||||
|
||||
func (w *wasmScrobblerPlugin) NowPlaying(ctx context.Context, userId string, track *model.MediaFile, position int) error {
|
||||
@@ -70,25 +65,7 @@ func (w *wasmScrobblerPlugin) NowPlaying(ctx context.Context, userId string, tra
|
||||
}
|
||||
}
|
||||
|
||||
artists := make([]*api.Artist, 0, len(track.Participants[model.RoleArtist]))
|
||||
for _, a := range track.Participants[model.RoleArtist] {
|
||||
artists = append(artists, &api.Artist{Name: a.Name, Mbid: a.MbzArtistID})
|
||||
}
|
||||
albumArtists := make([]*api.Artist, 0, len(track.Participants[model.RoleAlbumArtist]))
|
||||
for _, a := range track.Participants[model.RoleAlbumArtist] {
|
||||
albumArtists = append(albumArtists, &api.Artist{Name: a.Name, Mbid: a.MbzArtistID})
|
||||
}
|
||||
trackInfo := &api.TrackInfo{
|
||||
Id: track.ID,
|
||||
Mbid: track.MbzRecordingID,
|
||||
Name: track.Title,
|
||||
Album: track.Album,
|
||||
AlbumMbid: track.MbzAlbumID,
|
||||
Artists: artists,
|
||||
AlbumArtists: albumArtists,
|
||||
Length: int32(track.Duration),
|
||||
Position: int32(position),
|
||||
}
|
||||
trackInfo := w.toTrackInfo(track, position)
|
||||
_, err := callMethod(ctx, w, "NowPlaying", func(inst api.Scrobbler) (struct{}, error) {
|
||||
resp, err := inst.NowPlaying(ctx, &api.ScrobblerNowPlayingRequest{
|
||||
UserId: userId,
|
||||
@@ -115,26 +92,7 @@ func (w *wasmScrobblerPlugin) Scrobble(ctx context.Context, userId string, s scr
|
||||
username = u.UserName
|
||||
}
|
||||
}
|
||||
|
||||
track := &s.MediaFile
|
||||
artists := make([]*api.Artist, 0, len(track.Participants[model.RoleArtist]))
|
||||
for _, a := range track.Participants[model.RoleArtist] {
|
||||
artists = append(artists, &api.Artist{Name: a.Name, Mbid: a.MbzArtistID})
|
||||
}
|
||||
albumArtists := make([]*api.Artist, 0, len(track.Participants[model.RoleAlbumArtist]))
|
||||
for _, a := range track.Participants[model.RoleAlbumArtist] {
|
||||
albumArtists = append(albumArtists, &api.Artist{Name: a.Name, Mbid: a.MbzArtistID})
|
||||
}
|
||||
trackInfo := &api.TrackInfo{
|
||||
Id: track.ID,
|
||||
Mbid: track.MbzRecordingID,
|
||||
Name: track.Title,
|
||||
Album: track.Album,
|
||||
AlbumMbid: track.MbzAlbumID,
|
||||
Artists: artists,
|
||||
AlbumArtists: albumArtists,
|
||||
Length: int32(track.Duration),
|
||||
}
|
||||
trackInfo := w.toTrackInfo(&s.MediaFile, 0)
|
||||
_, err := callMethod(ctx, w, "Scrobble", func(inst api.Scrobbler) (struct{}, error) {
|
||||
resp, err := inst.Scrobble(ctx, &api.ScrobblerScrobbleRequest{
|
||||
UserId: userId,
|
||||
@@ -152,3 +110,27 @@ func (w *wasmScrobblerPlugin) Scrobble(ctx context.Context, userId string, s scr
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func (w *wasmScrobblerPlugin) toTrackInfo(track *model.MediaFile, position int) *api.TrackInfo {
|
||||
artists := make([]*api.Artist, 0, len(track.Participants[model.RoleArtist]))
|
||||
|
||||
for _, a := range track.Participants[model.RoleArtist] {
|
||||
artists = append(artists, &api.Artist{Name: a.Name, Mbid: a.MbzArtistID})
|
||||
}
|
||||
albumArtists := make([]*api.Artist, 0, len(track.Participants[model.RoleAlbumArtist]))
|
||||
for _, a := range track.Participants[model.RoleAlbumArtist] {
|
||||
albumArtists = append(albumArtists, &api.Artist{Name: a.Name, Mbid: a.MbzArtistID})
|
||||
}
|
||||
trackInfo := &api.TrackInfo{
|
||||
Id: track.ID,
|
||||
Mbid: track.MbzRecordingID,
|
||||
Name: track.Title,
|
||||
Album: track.Album,
|
||||
AlbumMbid: track.MbzAlbumID,
|
||||
Artists: artists,
|
||||
AlbumArtists: albumArtists,
|
||||
Length: int32(track.Duration),
|
||||
Position: int32(position),
|
||||
}
|
||||
return trackInfo
|
||||
}
|
||||
|
||||
@@ -16,7 +16,7 @@ func newWasmWebSocketCallback(wasmPath, pluginID string, m *managerImpl, runtime
|
||||
return nil
|
||||
}
|
||||
return &wasmWebSocketCallback{
|
||||
wasmBasePlugin: newWasmBasePlugin[api.WebSocketCallback, *api.WebSocketCallbackPlugin](
|
||||
baseCapability: newBaseCapability[api.WebSocketCallback, *api.WebSocketCallbackPlugin](
|
||||
wasmPath,
|
||||
pluginID,
|
||||
CapabilityWebSocketCallback,
|
||||
@@ -31,5 +31,5 @@ func newWasmWebSocketCallback(wasmPath, pluginID string, m *managerImpl, runtime
|
||||
|
||||
// wasmWebSocketCallback adapts a WebSocketCallback plugin
|
||||
type wasmWebSocketCallback struct {
|
||||
*wasmBasePlugin[api.WebSocketCallback, *api.WebSocketCallbackPlugin]
|
||||
*baseCapability[api.WebSocketCallback, *api.WebSocketCallbackPlugin]
|
||||
}
|
||||
|
||||
143
plugins/base_capability.go
Normal file
143
plugins/base_capability.go
Normal file
@@ -0,0 +1,143 @@
|
||||
package plugins
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/navidrome/navidrome/core/metrics"
|
||||
"github.com/navidrome/navidrome/log"
|
||||
"github.com/navidrome/navidrome/model/id"
|
||||
"github.com/navidrome/navidrome/plugins/api"
|
||||
)
|
||||
|
||||
// newBaseCapability creates a new instance of baseCapability with the required parameters.
|
||||
func newBaseCapability[S any, P any](wasmPath, id, capability string, m metrics.Metrics, loader P, loadFunc loaderFunc[S, P]) *baseCapability[S, P] {
|
||||
return &baseCapability[S, P]{
|
||||
wasmPath: wasmPath,
|
||||
id: id,
|
||||
capability: capability,
|
||||
loader: loader,
|
||||
loadFunc: loadFunc,
|
||||
metrics: m,
|
||||
}
|
||||
}
|
||||
|
||||
// LoaderFunc is a generic function type that loads a plugin instance.
|
||||
type loaderFunc[S any, P any] func(ctx context.Context, loader P, path string) (S, error)
|
||||
|
||||
// baseCapability is a generic base implementation for WASM plugins.
|
||||
// S is the capability interface type and P is the plugin loader type.
|
||||
type baseCapability[S any, P any] struct {
|
||||
wasmPath string
|
||||
id string
|
||||
capability string
|
||||
loader P
|
||||
loadFunc loaderFunc[S, P]
|
||||
metrics metrics.Metrics
|
||||
}
|
||||
|
||||
func (w *baseCapability[S, P]) PluginID() string {
|
||||
return w.id
|
||||
}
|
||||
|
||||
func (w *baseCapability[S, P]) serviceName() string {
|
||||
return w.id + "_" + w.capability
|
||||
}
|
||||
|
||||
func (w *baseCapability[S, P]) getMetrics() metrics.Metrics {
|
||||
return w.metrics
|
||||
}
|
||||
|
||||
// getInstance loads a new plugin instance and returns a cleanup function.
|
||||
func (w *baseCapability[S, P]) getInstance(ctx context.Context, methodName string) (S, func(), error) {
|
||||
start := time.Now()
|
||||
// Add context metadata for tracing
|
||||
ctx = log.NewContext(ctx, "capability", w.serviceName(), "method", methodName)
|
||||
|
||||
inst, err := w.loadFunc(ctx, w.loader, w.wasmPath)
|
||||
if err != nil {
|
||||
var zero S
|
||||
return zero, func() {}, fmt.Errorf("baseCapability: failed to load instance for %s: %w", w.serviceName(), err)
|
||||
}
|
||||
// Add context metadata for tracing
|
||||
ctx = log.NewContext(ctx, "instanceID", getInstanceID(inst))
|
||||
log.Trace(ctx, "baseCapability: loaded instance", "elapsed", time.Since(start))
|
||||
return inst, func() {
|
||||
log.Trace(ctx, "baseCapability: finished using instance", "elapsed", time.Since(start))
|
||||
if closer, ok := any(inst).(interface{ Close(context.Context) error }); ok {
|
||||
_ = closer.Close(ctx)
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
type wasmPlugin[S any] interface {
|
||||
PluginID() string
|
||||
getInstance(ctx context.Context, methodName string) (S, func(), error)
|
||||
getMetrics() metrics.Metrics
|
||||
}
|
||||
|
||||
type errorMapper interface {
|
||||
mapError(err error) error
|
||||
}
|
||||
|
||||
func callMethod[S any, R any](ctx context.Context, wp WasmPlugin, methodName string, fn func(inst S) (R, error)) (R, error) {
|
||||
// Add a unique call ID to the context for tracing
|
||||
ctx = log.NewContext(ctx, "callID", id.NewRandom())
|
||||
var r R
|
||||
|
||||
p, ok := wp.(wasmPlugin[S])
|
||||
if !ok {
|
||||
log.Error(ctx, "callMethod: not a wasm plugin", "method", methodName, "pluginID", wp.PluginID())
|
||||
return r, fmt.Errorf("wasm plugin: not a wasm plugin: %s", wp.PluginID())
|
||||
}
|
||||
|
||||
inst, done, err := p.getInstance(ctx, methodName)
|
||||
if err != nil {
|
||||
return r, err
|
||||
}
|
||||
start := time.Now()
|
||||
defer done()
|
||||
r, err = checkErr(fn(inst))
|
||||
elapsed := time.Since(start)
|
||||
|
||||
if em, ok := any(p).(errorMapper); ok {
|
||||
err = em.mapError(err)
|
||||
}
|
||||
|
||||
if !errors.Is(err, api.ErrNotImplemented) {
|
||||
id := p.PluginID()
|
||||
isOk := err == nil
|
||||
metrics := p.getMetrics()
|
||||
if metrics != nil {
|
||||
metrics.RecordPluginRequest(ctx, id, methodName, isOk, elapsed.Milliseconds())
|
||||
log.Trace(ctx, "callMethod: sending metrics", "plugin", id, "method", methodName, "ok", isOk, "elapsed", elapsed)
|
||||
}
|
||||
}
|
||||
|
||||
return r, err
|
||||
}
|
||||
|
||||
// errorResponse is an interface that defines a method to retrieve an error message.
|
||||
// It is automatically implemented (generated) by all plugin responses that have an Error field
|
||||
type errorResponse interface {
|
||||
GetError() string
|
||||
}
|
||||
|
||||
// checkErr returns an updated error if the response implements errorResponse and contains an error message.
|
||||
// If the response is nil, it returns the original error. Otherwise, it wraps or creates an error as needed.
|
||||
func checkErr[T any](resp T, err error) (T, error) {
|
||||
if any(resp) == nil {
|
||||
return resp, err
|
||||
}
|
||||
respErr, ok := any(resp).(errorResponse)
|
||||
if ok && respErr.GetError() != "" {
|
||||
if err == nil {
|
||||
err = errors.New(respErr.GetError())
|
||||
} else {
|
||||
err = fmt.Errorf("%s: %w", respErr.GetError(), err)
|
||||
}
|
||||
}
|
||||
return resp, err
|
||||
}
|
||||
188
plugins/base_capability_test.go
Normal file
188
plugins/base_capability_test.go
Normal file
@@ -0,0 +1,188 @@
|
||||
package plugins
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
type nilInstance struct{}
|
||||
|
||||
var _ = Describe("baseCapability", func() {
|
||||
var ctx = context.Background()
|
||||
|
||||
It("should load instance using loadFunc", func() {
|
||||
called := false
|
||||
plugin := &baseCapability[*nilInstance, any]{
|
||||
wasmPath: "",
|
||||
id: "test",
|
||||
capability: "test",
|
||||
loadFunc: func(ctx context.Context, _ any, path string) (*nilInstance, error) {
|
||||
called = true
|
||||
return &nilInstance{}, nil
|
||||
},
|
||||
}
|
||||
inst, done, err := plugin.getInstance(ctx, "test")
|
||||
defer done()
|
||||
Expect(err).To(BeNil())
|
||||
Expect(inst).ToNot(BeNil())
|
||||
Expect(called).To(BeTrue())
|
||||
})
|
||||
})
|
||||
|
||||
var _ = Describe("checkErr", func() {
|
||||
Context("when resp is nil", func() {
|
||||
It("should return the original error unchanged", func() {
|
||||
var resp *testErrorResponse
|
||||
originalErr := errors.New("original error")
|
||||
|
||||
result, err := checkErr(resp, originalErr)
|
||||
|
||||
Expect(result).To(BeNil())
|
||||
Expect(err).To(Equal(originalErr))
|
||||
})
|
||||
|
||||
It("should return nil error when both resp and err are nil", func() {
|
||||
var resp *testErrorResponse
|
||||
|
||||
result, err := checkErr(resp, nil)
|
||||
|
||||
Expect(result).To(BeNil())
|
||||
Expect(err).To(BeNil())
|
||||
})
|
||||
})
|
||||
|
||||
Context("when resp is a typed nil that implements errorResponse", func() {
|
||||
It("should not panic and return original error", func() {
|
||||
var resp *testErrorResponse // typed nil
|
||||
originalErr := errors.New("original error")
|
||||
|
||||
// This should not panic
|
||||
result, err := checkErr(resp, originalErr)
|
||||
|
||||
Expect(result).To(BeNil())
|
||||
Expect(err).To(Equal(originalErr))
|
||||
})
|
||||
|
||||
It("should handle typed nil with nil error gracefully", func() {
|
||||
var resp *testErrorResponse // typed nil
|
||||
|
||||
// This should not panic
|
||||
result, err := checkErr(resp, nil)
|
||||
|
||||
Expect(result).To(BeNil())
|
||||
Expect(err).To(BeNil())
|
||||
})
|
||||
})
|
||||
|
||||
Context("when resp implements errorResponse with non-empty error", func() {
|
||||
It("should create new error when original error is nil", func() {
|
||||
resp := &testErrorResponse{errorMsg: "plugin error"}
|
||||
|
||||
result, err := checkErr(resp, nil)
|
||||
|
||||
Expect(result).To(Equal(resp))
|
||||
Expect(err).To(MatchError("plugin error"))
|
||||
})
|
||||
|
||||
It("should wrap original error when both exist", func() {
|
||||
resp := &testErrorResponse{errorMsg: "plugin error"}
|
||||
originalErr := errors.New("original error")
|
||||
|
||||
result, err := checkErr(resp, originalErr)
|
||||
|
||||
Expect(result).To(Equal(resp))
|
||||
Expect(err).To(MatchError("plugin error: original error"))
|
||||
})
|
||||
})
|
||||
|
||||
Context("when resp implements errorResponse with empty error", func() {
|
||||
It("should return original error unchanged", func() {
|
||||
resp := &testErrorResponse{errorMsg: ""}
|
||||
originalErr := errors.New("original error")
|
||||
|
||||
result, err := checkErr(resp, originalErr)
|
||||
|
||||
Expect(result).To(Equal(resp))
|
||||
Expect(err).To(Equal(originalErr))
|
||||
})
|
||||
|
||||
It("should return nil error when both are empty/nil", func() {
|
||||
resp := &testErrorResponse{errorMsg: ""}
|
||||
|
||||
result, err := checkErr(resp, nil)
|
||||
|
||||
Expect(result).To(Equal(resp))
|
||||
Expect(err).To(BeNil())
|
||||
})
|
||||
})
|
||||
|
||||
Context("when resp does not implement errorResponse", func() {
|
||||
It("should return original error unchanged", func() {
|
||||
resp := &testNonErrorResponse{data: "some data"}
|
||||
originalErr := errors.New("original error")
|
||||
|
||||
result, err := checkErr(resp, originalErr)
|
||||
|
||||
Expect(result).To(Equal(resp))
|
||||
Expect(err).To(Equal(originalErr))
|
||||
})
|
||||
|
||||
It("should return nil error when original error is nil", func() {
|
||||
resp := &testNonErrorResponse{data: "some data"}
|
||||
|
||||
result, err := checkErr(resp, nil)
|
||||
|
||||
Expect(result).To(Equal(resp))
|
||||
Expect(err).To(BeNil())
|
||||
})
|
||||
})
|
||||
|
||||
Context("when resp is a value type (not pointer)", func() {
|
||||
It("should handle value types that implement errorResponse", func() {
|
||||
resp := testValueErrorResponse{errorMsg: "value error"}
|
||||
originalErr := errors.New("original error")
|
||||
|
||||
result, err := checkErr(resp, originalErr)
|
||||
|
||||
Expect(result).To(Equal(resp))
|
||||
Expect(err).To(MatchError("value error: original error"))
|
||||
})
|
||||
|
||||
It("should handle value types with empty error", func() {
|
||||
resp := testValueErrorResponse{errorMsg: ""}
|
||||
originalErr := errors.New("original error")
|
||||
|
||||
result, err := checkErr(resp, originalErr)
|
||||
|
||||
Expect(result).To(Equal(resp))
|
||||
Expect(err).To(Equal(originalErr))
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
// Test helper types
|
||||
type testErrorResponse struct {
|
||||
errorMsg string
|
||||
}
|
||||
|
||||
func (t *testErrorResponse) GetError() string {
|
||||
if t == nil {
|
||||
return "" // This is what would typically happen with a typed nil
|
||||
}
|
||||
return t.errorMsg
|
||||
}
|
||||
|
||||
type testNonErrorResponse struct {
|
||||
data string
|
||||
}
|
||||
|
||||
type testValueErrorResponse struct {
|
||||
errorMsg string
|
||||
}
|
||||
|
||||
func (t testValueErrorResponse) GetError() string {
|
||||
return t.errorMsg
|
||||
}
|
||||
@@ -4,11 +4,11 @@ This directory contains example plugins for Navidrome, intended for demonstratio
|
||||
|
||||
## Contents
|
||||
|
||||
- `wikimedia/`: Example plugin that retrieves artist information from Wikidata.
|
||||
- `coverartarchive/`: Example plugin that retrieves album cover images from the Cover Art Archive.
|
||||
- `crypto-ticker/`: Example plugin using websockets to log real-time cryptocurrency prices.
|
||||
- `discord-rich-presence/`: Example plugin that integrates with Discord Rich Presence to display currently playing tracks on Discord profiles.
|
||||
- `subsonicapi-demo/`: Example plugin that demonstrates how to interact with the Navidrome's Subsonic API from a plugin.
|
||||
- `wikimedia/`: Retrieves artist information from Wikidata.
|
||||
- `coverartarchive/`: Fetches album cover images from the Cover Art Archive.
|
||||
- `crypto-ticker/`: Uses websockets to log real-time cryptocurrency prices.
|
||||
- `discord-rich-presence/`: Integrates with Discord Rich Presence to display currently playing tracks on Discord profiles.
|
||||
- `subsonicapi-demo/`: Demonstrates interaction with Navidrome's Subsonic API from a plugin.
|
||||
|
||||
## Building
|
||||
|
||||
|
||||
@@ -143,5 +143,9 @@ func (CoverArtArchiveAgent) GetArtistTopSongs(ctx context.Context, req *api.Arti
|
||||
func main() {}
|
||||
|
||||
func init() {
|
||||
// Configure logging: No timestamps, no source file/line
|
||||
log.SetFlags(0)
|
||||
log.SetPrefix("[CAA] ")
|
||||
|
||||
api.RegisterMetadataAgent(CoverArtArchiveAgent{})
|
||||
}
|
||||
|
||||
@@ -15,7 +15,7 @@ This is a WebSocket-based WASM plugin for Navidrome that displays real-time cryp
|
||||
In your `navidrome.toml` file, add:
|
||||
|
||||
```toml
|
||||
[PluginSettings.crypto-ticker]
|
||||
[PluginConfig.crypto-ticker]
|
||||
tickers = "BTC,ETH,SOL,MATIC"
|
||||
```
|
||||
|
||||
|
||||
@@ -294,6 +294,10 @@ func calculatePercentChange(open, current string) string {
|
||||
func main() {}
|
||||
|
||||
func init() {
|
||||
// Configure logging: No timestamps, no source file/line, prepend [Crypto]
|
||||
log.SetFlags(0)
|
||||
log.SetPrefix("[Crypto] ")
|
||||
|
||||
api.RegisterWebSocketCallback(CryptoTickerPlugin{})
|
||||
api.RegisterLifecycleManagement(CryptoTickerPlugin{})
|
||||
api.RegisterSchedulerCallback(CryptoTickerPlugin{})
|
||||
|
||||
@@ -248,9 +248,37 @@ func (r *discordRPC) sendHeartbeat(ctx context.Context, username string) error {
|
||||
return r.sendMessage(ctx, username, heartbeatOpCode, resp.Value)
|
||||
}
|
||||
|
||||
func (r *discordRPC) cleanupFailedConnection(ctx context.Context, username string) {
|
||||
log.Printf("Cleaning up failed connection for user %s", username)
|
||||
|
||||
// Cancel the heartbeat schedule
|
||||
if resp, _ := r.sched.CancelSchedule(ctx, &scheduler.CancelRequest{ScheduleId: username}); resp.Error != "" {
|
||||
log.Printf("Failed to cancel heartbeat schedule for user %s: %s", username, resp.Error)
|
||||
}
|
||||
|
||||
// Close the WebSocket connection
|
||||
if resp, _ := r.ws.Close(ctx, &websocket.CloseRequest{
|
||||
ConnectionId: username,
|
||||
Code: 1000,
|
||||
Reason: "Connection lost",
|
||||
}); resp.Error != "" {
|
||||
log.Printf("Failed to close WebSocket connection for user %s: %s", username, resp.Error)
|
||||
}
|
||||
|
||||
// Clean up cache entries (just the sequence number, no failure tracking needed)
|
||||
_, _ = r.mem.Remove(ctx, &cache.RemoveRequest{Key: fmt.Sprintf("discord.seq.%s", username)})
|
||||
|
||||
log.Printf("Cleaned up connection for user %s", username)
|
||||
}
|
||||
|
||||
func (r *discordRPC) isConnected(ctx context.Context, username string) bool {
|
||||
// Try to send a heartbeat to test the connection
|
||||
err := r.sendHeartbeat(ctx, username)
|
||||
return err == nil
|
||||
if err != nil {
|
||||
log.Printf("Heartbeat test failed for user %s: %v", username, err)
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (r *discordRPC) connect(ctx context.Context, username string, token string) error {
|
||||
@@ -361,5 +389,14 @@ func (r *discordRPC) OnClose(_ context.Context, req *api.OnCloseRequest) (*api.O
|
||||
}
|
||||
|
||||
func (r *discordRPC) OnSchedulerCallback(ctx context.Context, req *api.SchedulerCallbackRequest) (*api.SchedulerCallbackResponse, error) {
|
||||
return nil, r.sendHeartbeat(ctx, req.ScheduleId)
|
||||
err := r.sendHeartbeat(ctx, req.ScheduleId)
|
||||
if err != nil {
|
||||
// On first heartbeat failure, immediately clean up the connection
|
||||
// The next NowPlaying call will reconnect if needed
|
||||
log.Printf("Heartbeat failed for user %s, cleaning up connection: %v", req.ScheduleId, err)
|
||||
r.cleanupFailedConnection(ctx, req.ScheduleId)
|
||||
return nil, fmt.Errorf("heartbeat failed, connection cleaned up: %w", err)
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
@@ -60,5 +60,9 @@ func (SubsonicAPIDemoPlugin) OnInit(ctx context.Context, req *api.InitRequest) (
|
||||
func main() {}
|
||||
|
||||
func init() {
|
||||
// Configure logging: No timestamps, no source file/line
|
||||
log.SetFlags(0)
|
||||
log.SetPrefix("[Subsonic Plugin] ")
|
||||
|
||||
api.RegisterLifecycleManagement(&SubsonicAPIDemoPlugin{})
|
||||
}
|
||||
|
||||
@@ -383,5 +383,9 @@ func (WikimediaAgent) GetAlbumImages(context.Context, *api.AlbumImagesRequest) (
|
||||
func main() {}
|
||||
|
||||
func init() {
|
||||
// Configure logging: No timestamps, no source file/line
|
||||
log.SetFlags(0)
|
||||
log.SetPrefix("[Wikimedia] ")
|
||||
|
||||
api.RegisterMetadataAgent(WikimediaAgent{})
|
||||
}
|
||||
|
||||
@@ -8,7 +8,6 @@ import (
|
||||
|
||||
gonanoid "github.com/matoous/go-nanoid/v2"
|
||||
"github.com/navidrome/navidrome/log"
|
||||
"github.com/navidrome/navidrome/plugins/api"
|
||||
"github.com/navidrome/navidrome/plugins/host/scheduler"
|
||||
navidsched "github.com/navidrome/navidrome/scheduler"
|
||||
)
|
||||
@@ -295,21 +294,10 @@ func (s *schedulerService) executeCallback(ctx context.Context, internalSchedule
|
||||
return
|
||||
}
|
||||
|
||||
callbackType := "one-time"
|
||||
if isRecurring {
|
||||
callbackType = "recurring"
|
||||
}
|
||||
|
||||
log.Debug("Executing schedule callback", "plugin", callback.PluginID, "scheduleID", callback.ID, "type", callbackType)
|
||||
ctx = log.NewContext(ctx, "plugin", callback.PluginID, "scheduleID", callback.ID, "type", callback.Type)
|
||||
log.Debug("Executing schedule callback")
|
||||
start := time.Now()
|
||||
|
||||
// Create a SchedulerCallbackRequest
|
||||
req := &api.SchedulerCallbackRequest{
|
||||
ScheduleId: callback.ID,
|
||||
Payload: callback.Payload,
|
||||
IsRecurring: isRecurring,
|
||||
}
|
||||
|
||||
// Get the plugin
|
||||
p := s.manager.LoadPlugin(callback.PluginID, CapabilitySchedulerCallback)
|
||||
if p == nil {
|
||||
@@ -317,31 +305,19 @@ func (s *schedulerService) executeCallback(ctx context.Context, internalSchedule
|
||||
return
|
||||
}
|
||||
|
||||
// Get instance
|
||||
inst, closeFn, err := p.Instantiate(ctx)
|
||||
if err != nil {
|
||||
log.Error("Error getting plugin instance for callback", "plugin", callback.PluginID, err)
|
||||
return
|
||||
}
|
||||
defer closeFn()
|
||||
|
||||
// Type-check the plugin
|
||||
plugin, ok := inst.(api.SchedulerCallback)
|
||||
plugin, ok := p.(*wasmSchedulerCallback)
|
||||
if !ok {
|
||||
log.Error("Plugin does not implement SchedulerCallback", "plugin", callback.PluginID)
|
||||
return
|
||||
}
|
||||
|
||||
// Call the plugin's OnSchedulerCallback method
|
||||
log.Trace(ctx, "Executing schedule callback", "plugin", callback.PluginID, "scheduleID", callback.ID, "type", callbackType)
|
||||
resp, err := plugin.OnSchedulerCallback(ctx, req)
|
||||
log.Trace(ctx, "Executing schedule callback")
|
||||
err := plugin.OnSchedulerCallback(ctx, callback.ID, callback.Payload, isRecurring)
|
||||
if err != nil {
|
||||
log.Error("Error executing schedule callback", "plugin", callback.PluginID, "elapsed", time.Since(start), err)
|
||||
log.Error("Error executing schedule callback", "elapsed", time.Since(start), err)
|
||||
return
|
||||
}
|
||||
log.Debug("Schedule callback executed", "plugin", callback.PluginID, "elapsed", time.Since(start))
|
||||
|
||||
if resp.Error != "" {
|
||||
log.Error("Plugin reported error in schedule callback", "plugin", callback.PluginID, resp.Error)
|
||||
}
|
||||
log.Debug("Schedule callback executed", "elapsed", time.Since(start))
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package plugins
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/navidrome/navidrome/core/metrics"
|
||||
"github.com/navidrome/navidrome/plugins/host/scheduler"
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
@@ -16,7 +17,7 @@ var _ = Describe("SchedulerService", func() {
|
||||
)
|
||||
|
||||
BeforeEach(func() {
|
||||
manager = createManager(nil, nil)
|
||||
manager = createManager(nil, metrics.NewNoopInstance())
|
||||
ss = manager.schedulerService
|
||||
})
|
||||
|
||||
|
||||
@@ -314,7 +314,7 @@ func (s *websocketService) handleMessages(internalID string, conn *WebSocketConn
|
||||
|
||||
// executeCallback is a common function that handles the plugin loading and execution
|
||||
// for all types of callbacks
|
||||
func (s *websocketService) executeCallback(ctx context.Context, pluginID string, fn func(context.Context, api.WebSocketCallback) error) {
|
||||
func (s *websocketService) executeCallback(ctx context.Context, pluginID, methodName string, fn func(context.Context, api.WebSocketCallback) error) {
|
||||
log.Debug(ctx, "WebSocket received")
|
||||
|
||||
start := time.Now()
|
||||
@@ -326,30 +326,16 @@ func (s *websocketService) executeCallback(ctx context.Context, pluginID string,
|
||||
return
|
||||
}
|
||||
|
||||
// Get instance
|
||||
inst, closeFn, err := p.Instantiate(ctx)
|
||||
if err != nil {
|
||||
log.Error(ctx, "Error getting plugin instance for WebSocket callback", err)
|
||||
return
|
||||
}
|
||||
defer closeFn()
|
||||
|
||||
// Type-check the plugin
|
||||
plugin, ok := inst.(api.WebSocketCallback)
|
||||
if !ok {
|
||||
log.Error(ctx, "Plugin does not implement WebSocketCallback")
|
||||
return
|
||||
}
|
||||
|
||||
// Call the appropriate callback function
|
||||
log.Trace(ctx, "Executing WebSocket callback")
|
||||
|
||||
if err = fn(ctx, plugin); err != nil {
|
||||
log.Error(ctx, "Error executing WebSocket callback", "elapsed", time.Since(start), err)
|
||||
return
|
||||
}
|
||||
|
||||
log.Debug(ctx, "WebSocket callback executed", "elapsed", time.Since(start))
|
||||
_, _ = callMethod(ctx, p, methodName, func(inst api.WebSocketCallback) (struct{}, error) {
|
||||
// Call the appropriate callback function
|
||||
log.Trace(ctx, "Executing WebSocket callback")
|
||||
if err := fn(ctx, inst); err != nil {
|
||||
log.Error(ctx, "Error executing WebSocket callback", "elapsed", time.Since(start), err)
|
||||
return struct{}{}, fmt.Errorf("error executing WebSocket callback: %w", err)
|
||||
}
|
||||
log.Debug(ctx, "WebSocket callback executed", "elapsed", time.Since(start))
|
||||
return struct{}{}, nil
|
||||
})
|
||||
}
|
||||
|
||||
// notifyTextCallback notifies the plugin of a text message
|
||||
@@ -361,8 +347,8 @@ func (s *websocketService) notifyTextCallback(ctx context.Context, connectionID
|
||||
|
||||
ctx = log.NewContext(ctx, "callback", "OnTextMessage", "size", len(message))
|
||||
|
||||
s.executeCallback(ctx, conn.PluginName, func(ctx context.Context, plugin api.WebSocketCallback) error {
|
||||
_, err := plugin.OnTextMessage(ctx, req)
|
||||
s.executeCallback(ctx, conn.PluginName, "OnTextMessage", func(ctx context.Context, plugin api.WebSocketCallback) error {
|
||||
_, err := checkErr(plugin.OnTextMessage(ctx, req))
|
||||
return err
|
||||
})
|
||||
}
|
||||
@@ -376,8 +362,8 @@ func (s *websocketService) notifyBinaryCallback(ctx context.Context, connectionI
|
||||
|
||||
ctx = log.NewContext(ctx, "callback", "OnBinaryMessage", "size", len(data))
|
||||
|
||||
s.executeCallback(ctx, conn.PluginName, func(ctx context.Context, plugin api.WebSocketCallback) error {
|
||||
_, err := plugin.OnBinaryMessage(ctx, req)
|
||||
s.executeCallback(ctx, conn.PluginName, "OnBinaryMessage", func(ctx context.Context, plugin api.WebSocketCallback) error {
|
||||
_, err := checkErr(plugin.OnBinaryMessage(ctx, req))
|
||||
return err
|
||||
})
|
||||
}
|
||||
@@ -391,8 +377,8 @@ func (s *websocketService) notifyErrorCallback(ctx context.Context, connectionID
|
||||
|
||||
ctx = log.NewContext(ctx, "callback", "OnError", "error", errorMsg)
|
||||
|
||||
s.executeCallback(ctx, conn.PluginName, func(ctx context.Context, plugin api.WebSocketCallback) error {
|
||||
_, err := plugin.OnError(ctx, req)
|
||||
s.executeCallback(ctx, conn.PluginName, "OnError", func(ctx context.Context, plugin api.WebSocketCallback) error {
|
||||
_, err := checkErr(plugin.OnError(ctx, req))
|
||||
return err
|
||||
})
|
||||
}
|
||||
@@ -407,8 +393,8 @@ func (s *websocketService) notifyCloseCallback(ctx context.Context, connectionID
|
||||
|
||||
ctx = log.NewContext(ctx, "callback", "OnClose", "code", code, "reason", reason)
|
||||
|
||||
s.executeCallback(ctx, conn.PluginName, func(ctx context.Context, plugin api.WebSocketCallback) error {
|
||||
_, err := plugin.OnClose(ctx, req)
|
||||
s.executeCallback(ctx, conn.PluginName, "OnClose", func(ctx context.Context, plugin api.WebSocketCallback) error {
|
||||
_, err := checkErr(plugin.OnClose(ctx, req))
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
@@ -6,9 +6,11 @@ import (
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
gorillaws "github.com/gorilla/websocket"
|
||||
"github.com/navidrome/navidrome/core/metrics"
|
||||
"github.com/navidrome/navidrome/plugins/host/websocket"
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
@@ -84,7 +86,7 @@ var _ = Describe("WebSocket Host Service", func() {
|
||||
DeferCleanup(server.Close)
|
||||
|
||||
// Create a new manager and websocket service
|
||||
manager = createManager(nil, nil)
|
||||
manager = createManager(nil, metrics.NewNoopInstance())
|
||||
wsService = newWebsocketService(manager)
|
||||
})
|
||||
|
||||
@@ -188,6 +190,10 @@ var _ = Describe("WebSocket Host Service", func() {
|
||||
})
|
||||
|
||||
It("handles connection errors gracefully", func() {
|
||||
if testing.Short() {
|
||||
GinkgoT().Skip("skipping test in short mode.")
|
||||
}
|
||||
|
||||
// Try to connect to an invalid URL
|
||||
req := &websocket.ConnectRequest{
|
||||
Url: "ws://invalid-url-that-does-not-exist",
|
||||
|
||||
@@ -10,10 +10,10 @@ package plugins
|
||||
//go:generate protoc --go-plugin_out=. --go-plugin_opt=paths=source_relative host/subsonicapi/subsonicapi.proto
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
"slices"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
@@ -53,8 +53,6 @@ var pluginCreators = map[string]pluginConstructor{
|
||||
type WasmPlugin interface {
|
||||
// PluginID returns the unique identifier of the plugin (folder name)
|
||||
PluginID() string
|
||||
// Instantiate creates a new instance of the plugin and returns it along with a cleanup function
|
||||
Instantiate(ctx context.Context) (any, func(), error)
|
||||
}
|
||||
|
||||
type plugin struct {
|
||||
@@ -91,11 +89,8 @@ type Manager interface {
|
||||
EnsureCompiled(name string) error
|
||||
PluginNames(serviceName string) []string
|
||||
LoadPlugin(name string, capability string) WasmPlugin
|
||||
LoadAllPlugins(capability string) []WasmPlugin
|
||||
LoadMediaAgent(name string) (agents.Interface, bool)
|
||||
LoadAllMediaAgents() []agents.Interface
|
||||
LoadScrobbler(name string) (scrobbler.Scrobbler, bool)
|
||||
LoadAllScrobblers() []scrobbler.Scrobbler
|
||||
ScanPlugins()
|
||||
}
|
||||
|
||||
@@ -126,7 +121,7 @@ func GetManager(ds model.DataStore, metrics metrics.Metrics) Manager {
|
||||
func createManager(ds model.DataStore, metrics metrics.Metrics) *managerImpl {
|
||||
m := &managerImpl{
|
||||
plugins: make(map[string]*plugin),
|
||||
lifecycle: newPluginLifecycleManager(),
|
||||
lifecycle: newPluginLifecycleManager(metrics),
|
||||
ds: ds,
|
||||
metrics: metrics,
|
||||
}
|
||||
@@ -170,16 +165,8 @@ func (m *managerImpl) registerPlugin(pluginID, pluginDir, wasmPath string, manif
|
||||
compilationReady: make(chan struct{}),
|
||||
}
|
||||
|
||||
// Start pre-compilation of WASM module in background
|
||||
go func() {
|
||||
precompilePlugin(p)
|
||||
// Check if this plugin implements InitService and hasn't been initialized yet
|
||||
m.initializePluginIfNeeded(p)
|
||||
}()
|
||||
|
||||
// Register the plugin
|
||||
// Register the plugin first
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
m.plugins[pluginID] = p
|
||||
|
||||
// Register one plugin adapter for each capability
|
||||
@@ -200,6 +187,14 @@ func (m *managerImpl) registerPlugin(pluginID, pluginDir, wasmPath string, manif
|
||||
}
|
||||
m.adapters[pluginID+"_"+capabilityStr] = adapter
|
||||
}
|
||||
m.mu.Unlock()
|
||||
|
||||
// Start pre-compilation of WASM module in background AFTER registration
|
||||
go func() {
|
||||
precompilePlugin(p)
|
||||
// Check if this plugin implements InitService and hasn't been initialized yet
|
||||
m.initializePluginIfNeeded(p)
|
||||
}()
|
||||
|
||||
log.Info("Discovered plugin", "folder", pluginID, "name", manifest.Name, "capabilities", manifest.Capabilities, "wasm", wasmPath, "dev_mode", isSymlink)
|
||||
return m.plugins[pluginID]
|
||||
@@ -213,15 +208,36 @@ func (m *managerImpl) initializePluginIfNeeded(plugin *plugin) {
|
||||
}
|
||||
|
||||
// Check if the plugin implements LifecycleManagement
|
||||
for _, capability := range plugin.Manifest.Capabilities {
|
||||
if capability == CapabilityLifecycleManagement {
|
||||
m.lifecycle.callOnInit(plugin)
|
||||
m.lifecycle.markInitialized(plugin)
|
||||
break
|
||||
if slices.Contains(plugin.Manifest.Capabilities, CapabilityLifecycleManagement) {
|
||||
if err := m.lifecycle.callOnInit(plugin); err != nil {
|
||||
m.unregisterPlugin(plugin.ID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// unregisterPlugin removes a plugin from the manager
|
||||
func (m *managerImpl) unregisterPlugin(pluginID string) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
plugin, ok := m.plugins[pluginID]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
// Clear initialization state from lifecycle manager
|
||||
m.lifecycle.clearInitialized(plugin)
|
||||
|
||||
// Unregister plugin adapters
|
||||
for _, capability := range plugin.Manifest.Capabilities {
|
||||
delete(m.adapters, pluginID+"_"+string(capability))
|
||||
}
|
||||
|
||||
// Unregister plugin
|
||||
delete(m.plugins, pluginID)
|
||||
log.Info("Unregistered plugin", "plugin", pluginID)
|
||||
}
|
||||
|
||||
// ScanPlugins scans the plugins directory, discovers all valid plugins, and registers them for use.
|
||||
func (m *managerImpl) ScanPlugins() {
|
||||
// Clear existing plugins
|
||||
@@ -344,23 +360,6 @@ func (m *managerImpl) EnsureCompiled(name string) error {
|
||||
return plugin.waitForCompilation()
|
||||
}
|
||||
|
||||
// LoadAllPlugins instantiates and returns all plugins that implement the specified capability
|
||||
func (m *managerImpl) LoadAllPlugins(capability string) []WasmPlugin {
|
||||
names := m.PluginNames(capability)
|
||||
if len(names) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
var plugins []WasmPlugin
|
||||
for _, name := range names {
|
||||
plugin := m.LoadPlugin(name, capability)
|
||||
if plugin != nil {
|
||||
plugins = append(plugins, plugin)
|
||||
}
|
||||
}
|
||||
return plugins
|
||||
}
|
||||
|
||||
// LoadMediaAgent instantiates and returns a media agent plugin by folder name
|
||||
func (m *managerImpl) LoadMediaAgent(name string) (agents.Interface, bool) {
|
||||
plugin := m.LoadPlugin(name, CapabilityMetadataAgent)
|
||||
@@ -371,15 +370,6 @@ func (m *managerImpl) LoadMediaAgent(name string) (agents.Interface, bool) {
|
||||
return agent, ok
|
||||
}
|
||||
|
||||
// LoadAllMediaAgents instantiates and returns all media agent plugins
|
||||
func (m *managerImpl) LoadAllMediaAgents() []agents.Interface {
|
||||
plugins := m.LoadAllPlugins(CapabilityMetadataAgent)
|
||||
|
||||
return slice.Map(plugins, func(p WasmPlugin) agents.Interface {
|
||||
return p.(agents.Interface)
|
||||
})
|
||||
}
|
||||
|
||||
// LoadScrobbler instantiates and returns a scrobbler plugin by folder name
|
||||
func (m *managerImpl) LoadScrobbler(name string) (scrobbler.Scrobbler, bool) {
|
||||
plugin := m.LoadPlugin(name, CapabilityScrobbler)
|
||||
@@ -390,15 +380,6 @@ func (m *managerImpl) LoadScrobbler(name string) (scrobbler.Scrobbler, bool) {
|
||||
return s, ok
|
||||
}
|
||||
|
||||
// LoadAllScrobblers instantiates and returns all scrobbler plugins
|
||||
func (m *managerImpl) LoadAllScrobblers() []scrobbler.Scrobbler {
|
||||
plugins := m.LoadAllPlugins(CapabilityScrobbler)
|
||||
|
||||
return slice.Map(plugins, func(p WasmPlugin) scrobbler.Scrobbler {
|
||||
return p.(scrobbler.Scrobbler)
|
||||
})
|
||||
}
|
||||
|
||||
type noopManager struct{}
|
||||
|
||||
func (n noopManager) SetSubsonicRouter(router SubsonicRouter) {}
|
||||
@@ -409,14 +390,8 @@ func (n noopManager) PluginNames(serviceName string) []string { return nil }
|
||||
|
||||
func (n noopManager) LoadPlugin(name string, capability string) WasmPlugin { return nil }
|
||||
|
||||
func (n noopManager) LoadAllPlugins(capability string) []WasmPlugin { return nil }
|
||||
|
||||
func (n noopManager) LoadMediaAgent(name string) (agents.Interface, bool) { return nil, false }
|
||||
|
||||
func (n noopManager) LoadAllMediaAgents() []agents.Interface { return nil }
|
||||
|
||||
func (n noopManager) LoadScrobbler(name string) (scrobbler.Scrobbler, bool) { return nil, false }
|
||||
|
||||
func (n noopManager) LoadAllScrobblers() []scrobbler.Scrobbler { return nil }
|
||||
|
||||
func (n noopManager) ScanPlugins() {}
|
||||
|
||||
@@ -7,6 +7,8 @@ import (
|
||||
|
||||
"github.com/navidrome/navidrome/conf"
|
||||
"github.com/navidrome/navidrome/core/agents"
|
||||
"github.com/navidrome/navidrome/core/metrics"
|
||||
"github.com/navidrome/navidrome/plugins/schema"
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
@@ -27,7 +29,7 @@ var _ = Describe("Plugin Manager", func() {
|
||||
conf.Server.Plugins.Folder = testDataDir
|
||||
|
||||
ctx = GinkgoT().Context()
|
||||
mgr = createManager(nil, nil)
|
||||
mgr = createManager(nil, metrics.NewNoopInstance())
|
||||
mgr.ScanPlugins()
|
||||
})
|
||||
|
||||
@@ -36,17 +38,21 @@ var _ = Describe("Plugin Manager", func() {
|
||||
|
||||
mediaAgentNames := mgr.PluginNames("MetadataAgent")
|
||||
Expect(mediaAgentNames).To(HaveLen(4))
|
||||
Expect(mediaAgentNames).To(ContainElement("fake_artist_agent"))
|
||||
Expect(mediaAgentNames).To(ContainElement("fake_album_agent"))
|
||||
Expect(mediaAgentNames).To(ContainElement("multi_plugin"))
|
||||
Expect(mediaAgentNames).To(ContainElement("unauthorized_plugin"))
|
||||
Expect(mediaAgentNames).To(ContainElements(
|
||||
"fake_artist_agent",
|
||||
"fake_album_agent",
|
||||
"multi_plugin",
|
||||
"unauthorized_plugin",
|
||||
))
|
||||
|
||||
scrobblerNames := mgr.PluginNames("Scrobbler")
|
||||
Expect(scrobblerNames).To(ContainElement("fake_scrobbler"))
|
||||
|
||||
initServiceNames := mgr.PluginNames("LifecycleManagement")
|
||||
Expect(initServiceNames).To(ContainElement("multi_plugin"))
|
||||
Expect(initServiceNames).To(ContainElement("fake_init_service"))
|
||||
Expect(initServiceNames).To(ContainElements("multi_plugin", "fake_init_service"))
|
||||
|
||||
schedulerCallbackNames := mgr.PluginNames("SchedulerCallback")
|
||||
Expect(schedulerCallbackNames).To(ContainElement("multi_plugin"))
|
||||
})
|
||||
|
||||
It("should load a MetadataAgent plugin and invoke artist-related methods", func() {
|
||||
@@ -65,13 +71,18 @@ var _ = Describe("Plugin Manager", func() {
|
||||
})
|
||||
|
||||
It("should load all MetadataAgent plugins", func() {
|
||||
agents := mgr.LoadAllMediaAgents()
|
||||
Expect(agents).To(HaveLen(4))
|
||||
var names []string
|
||||
for _, a := range agents {
|
||||
names = append(names, a.AgentName())
|
||||
mediaAgentNames := mgr.PluginNames("MetadataAgent")
|
||||
Expect(mediaAgentNames).To(HaveLen(4))
|
||||
|
||||
var agentNames []string
|
||||
for _, name := range mediaAgentNames {
|
||||
agent, ok := mgr.LoadMediaAgent(name)
|
||||
if ok {
|
||||
agentNames = append(agentNames, agent.AgentName())
|
||||
}
|
||||
}
|
||||
Expect(names).To(ContainElements("fake_artist_agent", "fake_album_agent", "multi_plugin", "unauthorized_plugin"))
|
||||
|
||||
Expect(agentNames).To(ContainElements("fake_artist_agent", "fake_album_agent", "multi_plugin", "unauthorized_plugin"))
|
||||
})
|
||||
|
||||
Describe("ScanPlugins", func() {
|
||||
@@ -85,7 +96,7 @@ var _ = Describe("Plugin Manager", func() {
|
||||
})
|
||||
|
||||
conf.Server.Plugins.Folder = tempPluginsDir
|
||||
m = createManager(nil, nil)
|
||||
m = createManager(nil, metrics.NewNoopInstance())
|
||||
})
|
||||
|
||||
// Helper to create a complete valid plugin for manager testing
|
||||
@@ -193,21 +204,8 @@ var _ = Describe("Plugin Manager", func() {
|
||||
|
||||
Describe("Invoke Methods", func() {
|
||||
It("should load all MetadataAgent plugins and invoke methods", func() {
|
||||
mediaAgentNames := mgr.PluginNames("MetadataAgent")
|
||||
Expect(mediaAgentNames).NotTo(BeEmpty())
|
||||
|
||||
plugins := mgr.LoadAllPlugins("MetadataAgent")
|
||||
Expect(plugins).To(HaveLen(len(mediaAgentNames)))
|
||||
|
||||
var fakeAlbumPlugin agents.Interface
|
||||
for _, p := range plugins {
|
||||
if agent, ok := p.(agents.Interface); ok {
|
||||
if agent.AgentName() == "fake_album_agent" {
|
||||
fakeAlbumPlugin = agent
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
fakeAlbumPlugin, isMediaAgent := mgr.LoadMediaAgent("fake_album_agent")
|
||||
Expect(isMediaAgent).To(BeTrue())
|
||||
|
||||
Expect(fakeAlbumPlugin).NotTo(BeNil(), "fake_album_agent should be loaded")
|
||||
|
||||
@@ -254,4 +252,95 @@ var _ = Describe("Plugin Manager", func() {
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
Describe("Plugin Initialization Lifecycle", func() {
|
||||
BeforeEach(func() {
|
||||
conf.Server.Plugins.Enabled = true
|
||||
conf.Server.Plugins.Folder = testDataDir
|
||||
})
|
||||
|
||||
Context("when OnInit is successful", func() {
|
||||
It("should register and initialize the plugin", func() {
|
||||
conf.Server.PluginConfig = nil
|
||||
mgr = createManager(nil, metrics.NewNoopInstance()) // Create manager after setting config
|
||||
mgr.ScanPlugins()
|
||||
|
||||
plugin := mgr.plugins["fake_init_service"]
|
||||
Expect(plugin).NotTo(BeNil())
|
||||
|
||||
Eventually(func() bool {
|
||||
return mgr.lifecycle.isInitialized(plugin)
|
||||
}).Should(BeTrue())
|
||||
|
||||
// Check that the plugin is still registered
|
||||
names := mgr.PluginNames(CapabilityLifecycleManagement)
|
||||
Expect(names).To(ContainElement("fake_init_service"))
|
||||
})
|
||||
})
|
||||
|
||||
Context("when OnInit fails", func() {
|
||||
It("should unregister the plugin if OnInit returns an error string", func() {
|
||||
conf.Server.PluginConfig = map[string]map[string]string{
|
||||
"fake_init_service": {
|
||||
"returnError": "response_error",
|
||||
},
|
||||
}
|
||||
mgr = createManager(nil, metrics.NewNoopInstance()) // Create manager after setting config
|
||||
mgr.ScanPlugins()
|
||||
|
||||
Eventually(func() []string {
|
||||
return mgr.PluginNames(CapabilityLifecycleManagement)
|
||||
}).ShouldNot(ContainElement("fake_init_service"))
|
||||
})
|
||||
|
||||
It("should unregister the plugin if OnInit returns a Go error", func() {
|
||||
conf.Server.PluginConfig = map[string]map[string]string{
|
||||
"fake_init_service": {
|
||||
"returnError": "go_error",
|
||||
},
|
||||
}
|
||||
mgr = createManager(nil, metrics.NewNoopInstance()) // Create manager after setting config
|
||||
mgr.ScanPlugins()
|
||||
|
||||
Eventually(func() []string {
|
||||
return mgr.PluginNames(CapabilityLifecycleManagement)
|
||||
}).ShouldNot(ContainElement("fake_init_service"))
|
||||
})
|
||||
})
|
||||
|
||||
It("should clear lifecycle state when unregistering a plugin", func() {
|
||||
// Create a manager and register a plugin
|
||||
mgr := createManager(nil, metrics.NewNoopInstance())
|
||||
|
||||
// Create a mock plugin with LifecycleManagement capability
|
||||
plugin := &plugin{
|
||||
ID: "test-plugin",
|
||||
Capabilities: []string{CapabilityLifecycleManagement},
|
||||
Manifest: &schema.PluginManifest{
|
||||
Version: "1.0.0",
|
||||
},
|
||||
}
|
||||
|
||||
// Register the plugin in the manager
|
||||
mgr.mu.Lock()
|
||||
mgr.plugins[plugin.ID] = plugin
|
||||
mgr.mu.Unlock()
|
||||
|
||||
// Mark the plugin as initialized in the lifecycle manager
|
||||
mgr.lifecycle.markInitialized(plugin)
|
||||
Expect(mgr.lifecycle.isInitialized(plugin)).To(BeTrue())
|
||||
|
||||
// Unregister the plugin
|
||||
mgr.unregisterPlugin(plugin.ID)
|
||||
|
||||
// Verify that the plugin is no longer in the manager
|
||||
mgr.mu.RLock()
|
||||
_, exists := mgr.plugins[plugin.ID]
|
||||
mgr.mu.RUnlock()
|
||||
Expect(exists).To(BeFalse())
|
||||
|
||||
// Verify that the lifecycle state has been cleared
|
||||
Expect(mgr.lifecycle.isInitialized(plugin)).To(BeFalse())
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
|
||||
"github.com/navidrome/navidrome/conf"
|
||||
"github.com/navidrome/navidrome/conf/configtest"
|
||||
"github.com/navidrome/navidrome/core/metrics"
|
||||
"github.com/navidrome/navidrome/plugins/schema"
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
@@ -55,7 +56,7 @@ var _ = Describe("Plugin Permissions", func() {
|
||||
BeforeEach(func() {
|
||||
DeferCleanup(configtest.SetupConfig())
|
||||
ctx = context.Background()
|
||||
mgr = createManager(nil, nil)
|
||||
mgr = createManager(nil, metrics.NewNoopInstance())
|
||||
tempDir = GinkgoT().TempDir()
|
||||
})
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
|
||||
"github.com/navidrome/navidrome/conf"
|
||||
"github.com/navidrome/navidrome/consts"
|
||||
"github.com/navidrome/navidrome/core/metrics"
|
||||
"github.com/navidrome/navidrome/log"
|
||||
"github.com/navidrome/navidrome/plugins/api"
|
||||
)
|
||||
@@ -16,13 +17,15 @@ import (
|
||||
type pluginLifecycleManager struct {
|
||||
plugins sync.Map // string -> bool
|
||||
config map[string]map[string]string
|
||||
metrics metrics.Metrics
|
||||
}
|
||||
|
||||
// newPluginLifecycleManager creates a new plugin lifecycle manager
|
||||
func newPluginLifecycleManager() *pluginLifecycleManager {
|
||||
func newPluginLifecycleManager(metrics metrics.Metrics) *pluginLifecycleManager {
|
||||
config := maps.Clone(conf.Server.PluginConfig)
|
||||
return &pluginLifecycleManager{
|
||||
config: config,
|
||||
config: config,
|
||||
metrics: metrics,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -39,8 +42,14 @@ func (m *pluginLifecycleManager) markInitialized(plugin *plugin) {
|
||||
m.plugins.Store(key, true)
|
||||
}
|
||||
|
||||
// clearInitialized removes the initialization state of a plugin
|
||||
func (m *pluginLifecycleManager) clearInitialized(plugin *plugin) {
|
||||
key := plugin.ID + consts.Zwsp + plugin.Manifest.Version
|
||||
m.plugins.Delete(key)
|
||||
}
|
||||
|
||||
// callOnInit calls the OnInit method on a plugin that implements LifecycleManagement
|
||||
func (m *pluginLifecycleManager) callOnInit(plugin *plugin) {
|
||||
func (m *pluginLifecycleManager) callOnInit(plugin *plugin) error {
|
||||
ctx := context.Background()
|
||||
log.Debug("Initializing plugin", "name", plugin.ID)
|
||||
start := time.Now()
|
||||
@@ -49,13 +58,13 @@ func (m *pluginLifecycleManager) callOnInit(plugin *plugin) {
|
||||
loader, err := api.NewLifecycleManagementPlugin(ctx, api.WazeroRuntime(plugin.Runtime), api.WazeroModuleConfig(plugin.ModConfig))
|
||||
if loader == nil || err != nil {
|
||||
log.Error("Error creating LifecycleManagement plugin", "plugin", plugin.ID, err)
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
initPlugin, err := loader.Load(ctx, plugin.WasmPath)
|
||||
if err != nil {
|
||||
log.Error("Error loading LifecycleManagement plugin", "plugin", plugin.ID, "path", plugin.WasmPath, err)
|
||||
return
|
||||
return err
|
||||
}
|
||||
defer initPlugin.Close(ctx)
|
||||
|
||||
@@ -71,16 +80,16 @@ func (m *pluginLifecycleManager) callOnInit(plugin *plugin) {
|
||||
}
|
||||
|
||||
// Call OnInit
|
||||
resp, err := initPlugin.OnInit(ctx, req)
|
||||
callStart := time.Now()
|
||||
_, err = checkErr(initPlugin.OnInit(ctx, req))
|
||||
m.metrics.RecordPluginRequest(ctx, plugin.ID, "OnInit", err != nil, time.Since(callStart).Milliseconds())
|
||||
if err != nil {
|
||||
log.Error("Error initializing plugin", "plugin", plugin.ID, "elapsed", time.Since(start), err)
|
||||
return
|
||||
}
|
||||
|
||||
if resp.Error != "" {
|
||||
log.Error("Plugin reported error during initialization", "plugin", plugin.ID, "error", resp.Error)
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
// Mark the plugin as initialized
|
||||
m.markInitialized(plugin)
|
||||
log.Debug("Plugin initialized successfully", "plugin", plugin.ID, "elapsed", time.Since(start))
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package plugins
|
||||
|
||||
import (
|
||||
"github.com/navidrome/navidrome/consts"
|
||||
"github.com/navidrome/navidrome/core/metrics"
|
||||
"github.com/navidrome/navidrome/plugins/schema"
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
@@ -18,11 +19,11 @@ func hasInitService(info *plugin) bool {
|
||||
}
|
||||
|
||||
var _ = Describe("LifecycleManagement", func() {
|
||||
Describe("Plugin Lifecycle managerImpl", func() {
|
||||
Describe("Plugin Lifecycle Manager", func() {
|
||||
var lifecycleManager *pluginLifecycleManager
|
||||
|
||||
BeforeEach(func() {
|
||||
lifecycleManager = newPluginLifecycleManager()
|
||||
lifecycleManager = newPluginLifecycleManager(metrics.NewNoopInstance())
|
||||
})
|
||||
|
||||
It("should track initialization state of plugins", func() {
|
||||
@@ -140,5 +141,26 @@ var _ = Describe("LifecycleManagement", func() {
|
||||
|
||||
Expect(actualKey).To(Equal(expectedKey))
|
||||
})
|
||||
|
||||
It("should clear initialization state when requested", func() {
|
||||
plugin := &plugin{
|
||||
ID: "test-plugin",
|
||||
Capabilities: []string{CapabilityLifecycleManagement},
|
||||
Manifest: &schema.PluginManifest{
|
||||
Version: "1.0.0",
|
||||
},
|
||||
}
|
||||
|
||||
// Initially not initialized
|
||||
Expect(lifecycleManager.isInitialized(plugin)).To(BeFalse())
|
||||
|
||||
// Mark as initialized
|
||||
lifecycleManager.markInitialized(plugin)
|
||||
Expect(lifecycleManager.isInitialized(plugin)).To(BeTrue())
|
||||
|
||||
// Clear initialization state
|
||||
lifecycleManager.clearInitialized(plugin)
|
||||
Expect(lifecycleManager.isInitialized(plugin)).To(BeFalse())
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/navidrome/navidrome/conf"
|
||||
"github.com/navidrome/navidrome/core/metrics"
|
||||
"github.com/navidrome/navidrome/plugins/schema"
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
@@ -40,7 +41,7 @@ var _ = Describe("CachingRuntime", func() {
|
||||
|
||||
BeforeEach(func() {
|
||||
ctx = GinkgoT().Context()
|
||||
mgr = createManager(nil, nil)
|
||||
mgr = createManager(nil, metrics.NewNoopInstance())
|
||||
// Add permissions for the test plugin using typed struct
|
||||
permissions := schema.PluginManifestPermissions{
|
||||
Http: &schema.PluginManifestPermissionsHttp{
|
||||
|
||||
17
plugins/testdata/fake_init_service/plugin.go
vendored
17
plugins/testdata/fake_init_service/plugin.go
vendored
@@ -4,6 +4,7 @@ package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"log"
|
||||
|
||||
"github.com/navidrome/navidrome/plugins/api"
|
||||
@@ -13,6 +14,22 @@ type initServicePlugin struct{}
|
||||
|
||||
func (p *initServicePlugin) OnInit(ctx context.Context, req *api.InitRequest) (*api.InitResponse, error) {
|
||||
log.Printf("OnInit called with %v", req)
|
||||
|
||||
// Check for specific error conditions in the config
|
||||
if req.Config != nil {
|
||||
if errorType, exists := req.Config["returnError"]; exists {
|
||||
switch errorType {
|
||||
case "go_error":
|
||||
return nil, errors.New("initialization failed with Go error")
|
||||
case "response_error":
|
||||
return &api.InitResponse{
|
||||
Error: "initialization failed with response error",
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Default: successful initialization
|
||||
return &api.InitResponse{}, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -1,118 +0,0 @@
|
||||
package plugins
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/navidrome/navidrome/core/metrics"
|
||||
"github.com/navidrome/navidrome/log"
|
||||
"github.com/navidrome/navidrome/model/id"
|
||||
"github.com/navidrome/navidrome/plugins/api"
|
||||
)
|
||||
|
||||
// newWasmBasePlugin creates a new instance of wasmBasePlugin with the required parameters.
|
||||
func newWasmBasePlugin[S any, P any](wasmPath, id, capability string, m metrics.Metrics, loader P, loadFunc loaderFunc[S, P]) *wasmBasePlugin[S, P] {
|
||||
return &wasmBasePlugin[S, P]{
|
||||
wasmPath: wasmPath,
|
||||
id: id,
|
||||
capability: capability,
|
||||
loader: loader,
|
||||
loadFunc: loadFunc,
|
||||
metrics: m,
|
||||
}
|
||||
}
|
||||
|
||||
// LoaderFunc is a generic function type that loads a plugin instance.
|
||||
type loaderFunc[S any, P any] func(ctx context.Context, loader P, path string) (S, error)
|
||||
|
||||
// wasmBasePlugin is a generic base implementation for WASM plugins.
|
||||
// S is the service interface type and P is the plugin loader type.
|
||||
type wasmBasePlugin[S any, P any] struct {
|
||||
wasmPath string
|
||||
id string
|
||||
capability string
|
||||
loader P
|
||||
loadFunc loaderFunc[S, P]
|
||||
metrics metrics.Metrics
|
||||
}
|
||||
|
||||
func (w *wasmBasePlugin[S, P]) PluginID() string {
|
||||
return w.id
|
||||
}
|
||||
|
||||
func (w *wasmBasePlugin[S, P]) Instantiate(ctx context.Context) (any, func(), error) {
|
||||
return w.getInstance(ctx, "<none>")
|
||||
}
|
||||
|
||||
func (w *wasmBasePlugin[S, P]) serviceName() string {
|
||||
return w.id + "_" + w.capability
|
||||
}
|
||||
|
||||
func (w *wasmBasePlugin[S, P]) getMetrics() metrics.Metrics {
|
||||
return w.metrics
|
||||
}
|
||||
|
||||
// getInstance loads a new plugin instance and returns a cleanup function.
|
||||
func (w *wasmBasePlugin[S, P]) getInstance(ctx context.Context, methodName string) (S, func(), error) {
|
||||
start := time.Now()
|
||||
// Add context metadata for tracing
|
||||
ctx = log.NewContext(ctx, "capability", w.serviceName(), "method", methodName)
|
||||
|
||||
inst, err := w.loadFunc(ctx, w.loader, w.wasmPath)
|
||||
if err != nil {
|
||||
var zero S
|
||||
return zero, func() {}, fmt.Errorf("wasmBasePlugin: failed to load instance for %s: %w", w.serviceName(), err)
|
||||
}
|
||||
// Add context metadata for tracing
|
||||
ctx = log.NewContext(ctx, "instanceID", getInstanceID(inst))
|
||||
log.Trace(ctx, "wasmBasePlugin: loaded instance", "elapsed", time.Since(start))
|
||||
return inst, func() {
|
||||
log.Trace(ctx, "wasmBasePlugin: finished using instance", "elapsed", time.Since(start))
|
||||
if closer, ok := any(inst).(interface{ Close(context.Context) error }); ok {
|
||||
_ = closer.Close(ctx)
|
||||
}
|
||||
}, nil
|
||||
}
|
||||
|
||||
type wasmPlugin[S any] interface {
|
||||
PluginID() string
|
||||
getInstance(ctx context.Context, methodName string) (S, func(), error)
|
||||
getMetrics() metrics.Metrics
|
||||
}
|
||||
|
||||
type errorMapper interface {
|
||||
mapError(err error) error
|
||||
}
|
||||
|
||||
func callMethod[S any, R any](ctx context.Context, w wasmPlugin[S], methodName string, fn func(inst S) (R, error)) (R, error) {
|
||||
// Add a unique call ID to the context for tracing
|
||||
ctx = log.NewContext(ctx, "callID", id.NewRandom())
|
||||
|
||||
inst, done, err := w.getInstance(ctx, methodName)
|
||||
var r R
|
||||
if err != nil {
|
||||
return r, err
|
||||
}
|
||||
start := time.Now()
|
||||
defer done()
|
||||
r, err = fn(inst)
|
||||
elapsed := time.Since(start)
|
||||
|
||||
if em, ok := any(w).(errorMapper); ok {
|
||||
err = em.mapError(err)
|
||||
}
|
||||
|
||||
if !errors.Is(err, api.ErrNotImplemented) {
|
||||
id := w.PluginID()
|
||||
isOk := err == nil
|
||||
metrics := w.getMetrics()
|
||||
if metrics != nil {
|
||||
metrics.RecordPluginRequest(ctx, id, methodName, isOk, elapsed.Milliseconds())
|
||||
log.Trace(ctx, "callMethod: sending metrics", "plugin", id, "method", methodName, "ok", isOk, elapsed)
|
||||
}
|
||||
}
|
||||
|
||||
return r, err
|
||||
}
|
||||
@@ -1,32 +0,0 @@
|
||||
package plugins
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
type nilInstance struct{}
|
||||
|
||||
var _ = Describe("wasmBasePlugin", func() {
|
||||
var ctx = context.Background()
|
||||
|
||||
It("should load instance using loadFunc", func() {
|
||||
called := false
|
||||
plugin := &wasmBasePlugin[*nilInstance, any]{
|
||||
wasmPath: "",
|
||||
id: "test",
|
||||
capability: "test",
|
||||
loadFunc: func(ctx context.Context, _ any, path string) (*nilInstance, error) {
|
||||
called = true
|
||||
return &nilInstance{}, nil
|
||||
},
|
||||
}
|
||||
inst, done, err := plugin.getInstance(ctx, "test")
|
||||
defer done()
|
||||
Expect(err).To(BeNil())
|
||||
Expect(inst).ToNot(BeNil())
|
||||
Expect(called).To(BeTrue())
|
||||
})
|
||||
})
|
||||
Reference in New Issue
Block a user