Compare commits

..

18 Commits

Author SHA1 Message Date
Deluan
91be7ef20e feat(plugins): add ClearQueue function to remove pending tasks from a specified queue
Signed-off-by: Deluan <deluan@navidrome.org>
2026-02-28 17:27:19 -05:00
Deluan Quintão
e5604004f9 Merge branch 'master' into feat/plugin-taskqueue-host-service 2026-02-28 11:36:57 -05:00
Deluan
c2e8b39392 feat(plugins): update TaskWorker interface to return status messages and refactor task queue service
Signed-off-by: Deluan <deluan@navidrome.org>
2026-02-27 20:25:52 -05:00
Deluan
1974d1276e refactor(plugins): simplify goroutine management in task queue service
Signed-off-by: Deluan <deluan@navidrome.org>
2026-02-27 19:32:06 -05:00
Deluan
d7ace6f95f feat(plugins): increase maxConcurrency for task queue and handle budget exhaustion
Signed-off-by: Deluan <deluan@navidrome.org>
2026-02-27 19:32:06 -05:00
Deluan
a196ec9a59 refactor(plugins): streamline task queue configuration and error handling
Signed-off-by: Deluan <deluan@navidrome.org>
2026-02-27 19:32:06 -05:00
Deluan
132928abb6 fix(plugins): use context-aware database execution in TaskQueue host service
Signed-off-by: Deluan <deluan@navidrome.org>
2026-02-27 19:32:06 -05:00
Deluan
173aa9b979 refactor(plugins): remove capability check for TaskWorker in TaskQueue host service
Signed-off-by: Deluan <deluan@navidrome.org>
2026-02-27 19:32:06 -05:00
Deluan
3b2133c134 fix(plugins): harden TaskQueue host service with validation and safety improvements
Add input validation (queue name length, payload size limits), extract
status string constants to eliminate raw SQL literals, make CreateQueue
idempotent via upsert for crash recovery, fix RetentionMs default check
for negative values, cap exponential backoff at 1 hour to prevent
overflow, and replace manual mutex-based delay enforcement with
rate.Limiter from golang.org/x/time/rate for correct concurrent worker
serialization.
2026-02-27 19:32:06 -05:00
Deluan
74bacf6879 docs: document TaskQueue module for persistent task queues
Signed-off-by: Deluan <deluan@navidrome.org>
2026-02-27 19:32:06 -05:00
Deluan
55ef58da83 feat(plugins): add integration tests for TaskQueue host service 2026-02-27 19:32:06 -05:00
Deluan
2bfbe6fde1 feat(plugins): add test-taskqueue plugin for integration testing 2026-02-27 19:32:06 -05:00
Deluan
03cce614fd feat(plugins): register TaskQueue host service in manager 2026-02-27 19:32:06 -05:00
Deluan
36a8cb37ca feat(plugins): require TaskWorker capability for taskqueue permission 2026-02-27 19:32:06 -05:00
Deluan
11d2b3b51c feat(plugins): implement TaskQueue service with SQLite persistence and workers
Per-plugin SQLite database with queues and tasks tables. Worker goroutines
dequeue tasks and invoke nd_task_execute callback. Exponential backoff
retries, rate limiting via delayMs, automatic cleanup of terminal tasks.
2026-02-27 19:32:06 -05:00
Deluan
b308c71f38 feat(plugins): add taskqueue permission to manifest schema
Add TaskQueuePermission with maxConcurrency option.
2026-02-27 19:32:06 -05:00
Deluan
591f3a333b feat(plugins): define TaskWorker capability for task execution callbacks 2026-02-27 19:32:06 -05:00
Deluan
36b58a9a10 feat(plugins): define TaskQueue host service interface
Add the TaskQueueService interface with CreateQueue, Enqueue,
GetTaskStatus, and CancelTask methods plus QueueConfig struct.
2026-02-27 19:32:06 -05:00
49 changed files with 3491 additions and 852 deletions

View File

@@ -1,6 +1,6 @@
// Code generated by Wire. DO NOT EDIT.
//go:generate go run -mod=mod github.com/google/wire/cmd/wire gen -tags "netgo sqlite_fts5"
//go:generate go run -mod=mod github.com/google/wire/cmd/wire gen -tags "netgo"
//go:build !wireinject
// +build !wireinject
@@ -16,7 +16,6 @@ import (
"github.com/navidrome/navidrome/core/artwork"
"github.com/navidrome/navidrome/core/external"
"github.com/navidrome/navidrome/core/ffmpeg"
"github.com/navidrome/navidrome/core/lyrics"
"github.com/navidrome/navidrome/core/metrics"
"github.com/navidrome/navidrome/core/playback"
"github.com/navidrome/navidrome/core/playlists"
@@ -104,8 +103,7 @@ func CreateSubsonicAPIRouter(ctx context.Context) *subsonic.Router {
modelScanner := scanner.New(ctx, dataStore, cacheWarmer, broker, playlistsPlaylists, metricsMetrics)
playTracker := scrobbler.GetPlayTracker(dataStore, broker, manager)
playbackServer := playback.GetInstance(dataStore)
lyricsLyrics := lyrics.NewLyrics(manager)
router := subsonic.New(dataStore, artworkArtwork, mediaStreamer, archiver, players, provider, modelScanner, broker, playlistsPlaylists, playTracker, share, playbackServer, metricsMetrics, lyricsLyrics)
router := subsonic.New(dataStore, artworkArtwork, mediaStreamer, archiver, players, provider, modelScanner, broker, playlistsPlaylists, playTracker, share, playbackServer, metricsMetrics)
return router
}
@@ -209,7 +207,7 @@ func getPluginManager() *plugins.Manager {
// wire_injectors.go:
var allProviders = wire.NewSet(core.Set, artwork.Set, server.New, subsonic.New, nativeapi.New, public.New, persistence.New, lastfm.NewRouter, listenbrainz.NewRouter, events.GetBroker, scanner.New, scanner.GetWatcher, metrics.GetPrometheusInstance, db.Db, plugins.GetManager, wire.Bind(new(agents.PluginLoader), new(*plugins.Manager)), wire.Bind(new(scrobbler.PluginLoader), new(*plugins.Manager)), wire.Bind(new(lyrics.PluginLoader), new(*plugins.Manager)), wire.Bind(new(nativeapi.PluginManager), new(*plugins.Manager)), wire.Bind(new(core.PluginUnloader), new(*plugins.Manager)), wire.Bind(new(plugins.PluginMetricsRecorder), new(metrics.Metrics)), wire.Bind(new(core.Watcher), new(scanner.Watcher)))
var allProviders = wire.NewSet(core.Set, artwork.Set, server.New, subsonic.New, nativeapi.New, public.New, persistence.New, lastfm.NewRouter, listenbrainz.NewRouter, events.GetBroker, scanner.New, scanner.GetWatcher, metrics.GetPrometheusInstance, db.Db, plugins.GetManager, wire.Bind(new(agents.PluginLoader), new(*plugins.Manager)), wire.Bind(new(scrobbler.PluginLoader), new(*plugins.Manager)), wire.Bind(new(nativeapi.PluginManager), new(*plugins.Manager)), wire.Bind(new(core.PluginUnloader), new(*plugins.Manager)), wire.Bind(new(plugins.PluginMetricsRecorder), new(metrics.Metrics)), wire.Bind(new(core.Watcher), new(scanner.Watcher)))
func GetPluginManager(ctx context.Context) *plugins.Manager {
manager := getPluginManager()

View File

@@ -11,7 +11,6 @@ import (
"github.com/navidrome/navidrome/core"
"github.com/navidrome/navidrome/core/agents"
"github.com/navidrome/navidrome/core/artwork"
"github.com/navidrome/navidrome/core/lyrics"
"github.com/navidrome/navidrome/core/metrics"
"github.com/navidrome/navidrome/core/playback"
"github.com/navidrome/navidrome/core/scrobbler"
@@ -45,7 +44,6 @@ var allProviders = wire.NewSet(
plugins.GetManager,
wire.Bind(new(agents.PluginLoader), new(*plugins.Manager)),
wire.Bind(new(scrobbler.PluginLoader), new(*plugins.Manager)),
wire.Bind(new(lyrics.PluginLoader), new(*plugins.Manager)),
wire.Bind(new(nativeapi.PluginManager), new(*plugins.Manager)),
wire.Bind(new(core.PluginUnloader), new(*plugins.Manager)),
wire.Bind(new(plugins.PluginMetricsRecorder), new(metrics.Metrics)),

View File

@@ -9,29 +9,7 @@ import (
"github.com/navidrome/navidrome/model"
)
// Lyrics can fetch lyrics for a media file.
type Lyrics interface {
GetLyrics(ctx context.Context, mf *model.MediaFile) (model.LyricList, error)
}
// PluginLoader discovers and loads lyrics provider plugins.
type PluginLoader interface {
LoadLyricsProvider(name string) (Lyrics, bool)
}
type lyricsService struct {
pluginLoader PluginLoader
}
// NewLyrics creates a new lyrics service. pluginLoader may be nil if no plugin
// system is available.
func NewLyrics(pluginLoader PluginLoader) Lyrics {
return &lyricsService{pluginLoader: pluginLoader}
}
// GetLyrics returns lyrics for the given media file, trying sources in the
// order specified by conf.Server.LyricsPriority.
func (l *lyricsService) GetLyrics(ctx context.Context, mf *model.MediaFile) (model.LyricList, error) {
func GetLyrics(ctx context.Context, mf *model.MediaFile) (model.LyricList, error) {
var lyricsList model.LyricList
var err error
@@ -43,11 +21,11 @@ func (l *lyricsService) GetLyrics(ctx context.Context, mf *model.MediaFile) (mod
case strings.HasPrefix(pattern, "."):
lyricsList, err = fromExternalFile(ctx, mf, pattern)
default:
lyricsList, err = l.fromPlugin(ctx, mf, pattern)
log.Error(ctx, "Invalid lyric pattern", "pattern", pattern)
}
if err != nil {
log.Error(ctx, "error getting lyrics", "source", pattern, err)
log.Error(ctx, "error parsing lyrics", "source", pattern, err)
}
if len(lyricsList) > 0 {

View File

@@ -3,7 +3,6 @@ package lyrics_test
import (
"context"
"encoding/json"
"fmt"
"os"
"github.com/navidrome/navidrome/conf"
@@ -73,8 +72,7 @@ var _ = Describe("sources", func() {
DescribeTable("Lyrics Priority", func(priority string, expected model.LyricList) {
conf.Server.LyricsPriority = priority
svc := lyrics.NewLyrics(nil)
list, err := svc.GetLyrics(ctx, &mf)
list, err := lyrics.GetLyrics(ctx, &mf)
Expect(err).To(BeNil())
Expect(list).To(Equal(expected))
},
@@ -109,8 +107,7 @@ var _ = Describe("sources", func() {
It("should fallback to embedded if an error happens when parsing file", func() {
conf.Server.LyricsPriority = ".mp3,embedded"
svc := lyrics.NewLyrics(nil)
list, err := svc.GetLyrics(ctx, &mf)
list, err := lyrics.GetLyrics(ctx, &mf)
Expect(err).To(BeNil())
Expect(list).To(Equal(embeddedLyrics))
})
@@ -118,94 +115,10 @@ var _ = Describe("sources", func() {
It("should return nothing if error happens when trying to parse file", func() {
conf.Server.LyricsPriority = ".mp3"
svc := lyrics.NewLyrics(nil)
list, err := svc.GetLyrics(ctx, &mf)
list, err := lyrics.GetLyrics(ctx, &mf)
Expect(err).To(BeNil())
Expect(list).To(BeEmpty())
})
})
})
Context("plugin sources", func() {
var mockLoader *mockPluginLoader
BeforeEach(func() {
mockLoader = &mockPluginLoader{}
})
It("should return lyrics from a plugin", func() {
conf.Server.LyricsPriority = "test-lyrics-plugin"
mockLoader.lyrics = unsyncedLyrics
svc := lyrics.NewLyrics(mockLoader)
list, err := svc.GetLyrics(ctx, &mf)
Expect(err).To(BeNil())
Expect(list).To(Equal(unsyncedLyrics))
})
It("should try plugin after embedded returns nothing", func() {
conf.Server.LyricsPriority = "embedded,test-lyrics-plugin"
mf.Lyrics = "" // No embedded lyrics
mockLoader.lyrics = unsyncedLyrics
svc := lyrics.NewLyrics(mockLoader)
list, err := svc.GetLyrics(ctx, &mf)
Expect(err).To(BeNil())
Expect(list).To(Equal(unsyncedLyrics))
})
It("should skip plugin if embedded has lyrics", func() {
conf.Server.LyricsPriority = "embedded,test-lyrics-plugin"
mockLoader.lyrics = unsyncedLyrics
svc := lyrics.NewLyrics(mockLoader)
list, err := svc.GetLyrics(ctx, &mf)
Expect(err).To(BeNil())
Expect(list).To(Equal(embeddedLyrics)) // embedded wins
})
It("should skip unknown plugin names gracefully", func() {
conf.Server.LyricsPriority = "nonexistent-plugin,embedded"
mockLoader.notFound = true
svc := lyrics.NewLyrics(mockLoader)
list, err := svc.GetLyrics(ctx, &mf)
Expect(err).To(BeNil())
Expect(list).To(Equal(embeddedLyrics)) // falls through to embedded
})
It("should handle plugin error gracefully", func() {
conf.Server.LyricsPriority = "test-lyrics-plugin,embedded"
mockLoader.err = fmt.Errorf("plugin error")
svc := lyrics.NewLyrics(mockLoader)
list, err := svc.GetLyrics(ctx, &mf)
Expect(err).To(BeNil())
Expect(list).To(Equal(embeddedLyrics)) // falls through to embedded
})
})
})
type mockPluginLoader struct {
lyrics model.LyricList
err error
notFound bool
}
func (m *mockPluginLoader) PluginNames(_ string) []string {
if m.notFound {
return nil
}
return []string{"test-lyrics-plugin"}
}
func (m *mockPluginLoader) LoadLyricsProvider(_ string) (lyrics.Lyrics, bool) {
if m.notFound {
return nil, false
}
return &mockLyricsProvider{lyrics: m.lyrics, err: m.err}, true
}
type mockLyricsProvider struct {
lyrics model.LyricList
err error
}
func (m *mockLyricsProvider) GetLyrics(_ context.Context, _ *model.MediaFile) (model.LyricList, error) {
return m.lyrics, m.err
}

View File

@@ -49,27 +49,3 @@ func fromExternalFile(ctx context.Context, mf *model.MediaFile, suffix string) (
return model.LyricList{*lyrics}, nil
}
// fromPlugin attempts to load lyrics from a plugin with the given name.
func (l *lyricsService) fromPlugin(ctx context.Context, mf *model.MediaFile, pluginName string) (model.LyricList, error) {
if l.pluginLoader == nil {
log.Debug(ctx, "Invalid lyric source", "source", pluginName)
return nil, nil
}
provider, ok := l.pluginLoader.LoadLyricsProvider(pluginName)
if !ok {
log.Warn(ctx, "Lyrics plugin not found", "plugin", pluginName)
return nil, nil
}
lyricsList, err := provider.GetLyrics(ctx, mf)
if err != nil {
return nil, err
}
if len(lyricsList) > 0 {
log.Trace(ctx, "Retrieved lyrics from plugin", "plugin", pluginName, "count", len(lyricsList))
}
return lyricsList, nil
}

View File

@@ -5,7 +5,6 @@ import (
"github.com/navidrome/navidrome/core/agents"
"github.com/navidrome/navidrome/core/external"
"github.com/navidrome/navidrome/core/ffmpeg"
"github.com/navidrome/navidrome/core/lyrics"
"github.com/navidrome/navidrome/core/metrics"
"github.com/navidrome/navidrome/core/playback"
"github.com/navidrome/navidrome/core/playlists"
@@ -29,5 +28,4 @@ var Set = wire.NewSet(
scrobbler.GetPlayTracker,
playback.GetInstance,
metrics.GetInstance,
lyrics.NewLyrics,
)

View File

@@ -1,26 +0,0 @@
package capabilities
// Lyrics provides lyrics for a given track from external sources.
//
//nd:capability name=lyrics required=true
type Lyrics interface {
//nd:export name=nd_lyrics_get_lyrics
GetLyrics(GetLyricsRequest) (GetLyricsResponse, error)
}
// GetLyricsRequest contains the track information for lyrics lookup.
type GetLyricsRequest struct {
Track TrackInfo `json:"track"`
}
// GetLyricsResponse contains the lyrics returned by the plugin.
type GetLyricsResponse struct {
Lyrics []LyricsText `json:"lyrics"`
}
// LyricsText represents a single set of lyrics in raw text format.
// Text can be plain text or LRC format — Navidrome will parse it.
type LyricsText struct {
Lang string `json:"lang,omitempty"`
Text string `json:"text"`
}

View File

@@ -1,115 +0,0 @@
version: v1-draft
exports:
nd_lyrics_get_lyrics:
input:
$ref: '#/components/schemas/GetLyricsRequest'
contentType: application/json
output:
$ref: '#/components/schemas/GetLyricsResponse'
contentType: application/json
components:
schemas:
ArtistRef:
description: ArtistRef is a reference to an artist with name and optional MBID.
properties:
id:
type: string
description: ID is the internal Navidrome artist ID (if known).
name:
type: string
description: Name is the artist name.
mbid:
type: string
description: MBID is the MusicBrainz ID for the artist.
required:
- name
GetLyricsRequest:
description: GetLyricsRequest contains the track information for lyrics lookup.
properties:
track:
$ref: '#/components/schemas/TrackInfo'
required:
- track
GetLyricsResponse:
description: GetLyricsResponse contains the lyrics returned by the plugin.
properties:
lyrics:
type: array
items:
$ref: '#/components/schemas/LyricsText'
required:
- lyrics
LyricsText:
description: |-
LyricsText represents a single set of lyrics in raw text format.
Text can be plain text or LRC format — Navidrome will parse it.
properties:
lang:
type: string
text:
type: string
required:
- text
TrackInfo:
description: TrackInfo contains track metadata for scrobbling.
properties:
id:
type: string
description: ID is the internal Navidrome track ID.
title:
type: string
description: Title is the track title.
album:
type: string
description: Album is the album name.
artist:
type: string
description: Artist is the formatted artist name for display (e.g., "Artist1 • Artist2").
albumArtist:
type: string
description: AlbumArtist is the formatted album artist name for display.
artists:
type: array
description: Artists is the list of track artists.
items:
$ref: '#/components/schemas/ArtistRef'
albumArtists:
type: array
description: AlbumArtists is the list of album artists.
items:
$ref: '#/components/schemas/ArtistRef'
duration:
type: number
format: float
description: Duration is the track duration in seconds.
trackNumber:
type: integer
format: int32
description: TrackNumber is the track number on the album.
discNumber:
type: integer
format: int32
description: DiscNumber is the disc number.
mbzRecordingId:
type: string
description: MBZRecordingID is the MusicBrainz recording ID.
mbzAlbumId:
type: string
description: MBZAlbumID is the MusicBrainz album/release ID.
mbzReleaseGroupId:
type: string
description: MBZReleaseGroupID is the MusicBrainz release group ID.
mbzReleaseTrackId:
type: string
description: MBZReleaseTrackID is the MusicBrainz release track ID.
required:
- id
- title
- album
- artist
- albumArtist
- artists
- albumArtists
- duration
- trackNumber
- discNumber

View File

@@ -0,0 +1,27 @@
package capabilities
// TaskWorker provides task execution handling.
// This capability allows plugins to receive callbacks when their queued tasks
// are ready to execute. Plugins that use the taskqueue host service must
// implement this capability.
//
//nd:capability name=taskworker
type TaskWorker interface {
// OnTaskExecute is called when a queued task is ready to run.
// The returned string is a status/result message stored in the tasks table.
// Return an error to trigger retry (if retries are configured).
//nd:export name=nd_task_execute
OnTaskExecute(TaskExecuteRequest) (string, error)
}
// TaskExecuteRequest is the request provided when a task is ready to execute.
type TaskExecuteRequest struct {
// QueueName is the name of the queue this task belongs to.
QueueName string `json:"queueName"`
// TaskID is the unique identifier for this task.
TaskID string `json:"taskId"`
// Payload is the opaque data provided when the task was enqueued.
Payload []byte `json:"payload"`
// Attempt is the current attempt number (1-based: first attempt = 1).
Attempt int32 `json:"attempt"`
}

View File

@@ -0,0 +1,38 @@
version: v1-draft
exports:
nd_task_execute:
description: |-
OnTaskExecute is called when a queued task is ready to run.
The returned string is a status/result message stored in the tasks table.
Return an error to trigger retry (if retries are configured).
input:
$ref: '#/components/schemas/TaskExecuteRequest'
contentType: application/json
output:
type: string
contentType: application/json
components:
schemas:
TaskExecuteRequest:
description: TaskExecuteRequest is the request provided when a task is ready to execute.
properties:
queueName:
type: string
description: QueueName is the name of the queue this task belongs to.
taskId:
type: string
description: TaskID is the unique identifier for this task.
payload:
type: array
description: Payload is the opaque data provided when the task was enqueued.
items:
type: object
attempt:
type: integer
format: int32
description: 'Attempt is the current attempt number (1-based: first attempt = 1).'
required:
- queueName
- taskId
- payload
- attempt

73
plugins/host/task.go Normal file
View File

@@ -0,0 +1,73 @@
package host
import "context"
// TaskInfo holds the current state of a task.
type TaskInfo struct {
// Status is the current task status: "pending", "running",
// "completed", "failed", or "cancelled".
Status string `json:"status"`
// Message is the status/result message returned by the plugin callback.
Message string `json:"message"`
// Attempt is the current or last attempt number (1-based).
Attempt int32 `json:"attempt"`
}
// QueueConfig holds configuration for a task queue.
type QueueConfig struct {
// Concurrency is the max number of parallel workers. Default: 1.
// Capped by the plugin's manifest maxConcurrency.
Concurrency int32 `json:"concurrency"`
// MaxRetries is the number of times to retry a failed task. Default: 0.
MaxRetries int32 `json:"maxRetries"`
// BackoffMs is the initial backoff between retries in milliseconds.
// Doubles each retry (exponential: backoffMs * 2^(attempt-1)). Default: 1000.
BackoffMs int64 `json:"backoffMs"`
// DelayMs is the minimum delay between starting consecutive tasks
// in milliseconds. Useful for rate limiting. Default: 0.
DelayMs int64 `json:"delayMs"`
// RetentionMs is how long completed/failed/cancelled tasks are kept
// in milliseconds. Default: 3600000 (1h). Min: 60000 (1m). Max: 604800000 (1w).
RetentionMs int64 `json:"retentionMs"`
}
// TaskService provides persistent task queues for plugins.
//
// This service allows plugins to create named queues with configurable concurrency,
// retry policies, and rate limiting. Tasks are persisted to SQLite and survive
// server restarts. When a task is ready to execute, the host calls the plugin's
// nd_task_execute callback function.
//
//nd:hostservice name=Task permission=taskqueue
type TaskService interface {
// CreateQueue creates a named task queue with the given configuration.
// Zero-value fields in config use sensible defaults.
// If a queue with the same name already exists, returns an error.
// On startup, this also recovers any stale "running" tasks from a previous crash.
//nd:hostfunc
CreateQueue(ctx context.Context, name string, config QueueConfig) error
// Enqueue adds a task to the named queue. Returns the task ID.
// payload is opaque bytes passed back to the plugin on execution.
//nd:hostfunc
Enqueue(ctx context.Context, queueName string, payload []byte) (string, error)
// Get returns the current state of a task including its status,
// message, and attempt count.
//nd:hostfunc
Get(ctx context.Context, taskID string) (*TaskInfo, error)
// Cancel cancels a pending task. Returns error if already
// running, completed, or failed.
//nd:hostfunc
Cancel(ctx context.Context, taskID string) error
// ClearQueue removes all pending tasks from the named queue.
// Running tasks are not affected. Returns the number of tasks removed.
//nd:hostfunc
ClearQueue(ctx context.Context, queueName string) (int64, error)
}

266
plugins/host/task_gen.go Normal file
View File

@@ -0,0 +1,266 @@
// Code generated by ndpgen. DO NOT EDIT.
package host
import (
"context"
"encoding/json"
extism "github.com/extism/go-sdk"
)
// TaskCreateQueueRequest is the request type for Task.CreateQueue.
type TaskCreateQueueRequest struct {
Name string `json:"name"`
Config QueueConfig `json:"config"`
}
// TaskCreateQueueResponse is the response type for Task.CreateQueue.
type TaskCreateQueueResponse struct {
Error string `json:"error,omitempty"`
}
// TaskEnqueueRequest is the request type for Task.Enqueue.
type TaskEnqueueRequest struct {
QueueName string `json:"queueName"`
Payload []byte `json:"payload"`
}
// TaskEnqueueResponse is the response type for Task.Enqueue.
type TaskEnqueueResponse struct {
Result string `json:"result,omitempty"`
Error string `json:"error,omitempty"`
}
// TaskGetRequest is the request type for Task.Get.
type TaskGetRequest struct {
TaskID string `json:"taskId"`
}
// TaskGetResponse is the response type for Task.Get.
type TaskGetResponse struct {
Result *TaskInfo `json:"result,omitempty"`
Error string `json:"error,omitempty"`
}
// TaskCancelRequest is the request type for Task.Cancel.
type TaskCancelRequest struct {
TaskID string `json:"taskId"`
}
// TaskCancelResponse is the response type for Task.Cancel.
type TaskCancelResponse struct {
Error string `json:"error,omitempty"`
}
// TaskClearQueueRequest is the request type for Task.ClearQueue.
type TaskClearQueueRequest struct {
QueueName string `json:"queueName"`
}
// TaskClearQueueResponse is the response type for Task.ClearQueue.
type TaskClearQueueResponse struct {
Result int64 `json:"result,omitempty"`
Error string `json:"error,omitempty"`
}
// RegisterTaskHostFunctions registers Task service host functions.
// The returned host functions should be added to the plugin's configuration.
func RegisterTaskHostFunctions(service TaskService) []extism.HostFunction {
return []extism.HostFunction{
newTaskCreateQueueHostFunction(service),
newTaskEnqueueHostFunction(service),
newTaskGetHostFunction(service),
newTaskCancelHostFunction(service),
newTaskClearQueueHostFunction(service),
}
}
func newTaskCreateQueueHostFunction(service TaskService) extism.HostFunction {
return extism.NewHostFunctionWithStack(
"task_createqueue",
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
// Read JSON request from plugin memory
reqBytes, err := p.ReadBytes(stack[0])
if err != nil {
taskWriteError(p, stack, err)
return
}
var req TaskCreateQueueRequest
if err := json.Unmarshal(reqBytes, &req); err != nil {
taskWriteError(p, stack, err)
return
}
// Call the service method
if svcErr := service.CreateQueue(ctx, req.Name, req.Config); svcErr != nil {
taskWriteError(p, stack, svcErr)
return
}
// Write JSON response to plugin memory
resp := TaskCreateQueueResponse{}
taskWriteResponse(p, stack, resp)
},
[]extism.ValueType{extism.ValueTypePTR},
[]extism.ValueType{extism.ValueTypePTR},
)
}
func newTaskEnqueueHostFunction(service TaskService) extism.HostFunction {
return extism.NewHostFunctionWithStack(
"task_enqueue",
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
// Read JSON request from plugin memory
reqBytes, err := p.ReadBytes(stack[0])
if err != nil {
taskWriteError(p, stack, err)
return
}
var req TaskEnqueueRequest
if err := json.Unmarshal(reqBytes, &req); err != nil {
taskWriteError(p, stack, err)
return
}
// Call the service method
result, svcErr := service.Enqueue(ctx, req.QueueName, req.Payload)
if svcErr != nil {
taskWriteError(p, stack, svcErr)
return
}
// Write JSON response to plugin memory
resp := TaskEnqueueResponse{
Result: result,
}
taskWriteResponse(p, stack, resp)
},
[]extism.ValueType{extism.ValueTypePTR},
[]extism.ValueType{extism.ValueTypePTR},
)
}
func newTaskGetHostFunction(service TaskService) extism.HostFunction {
return extism.NewHostFunctionWithStack(
"task_get",
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
// Read JSON request from plugin memory
reqBytes, err := p.ReadBytes(stack[0])
if err != nil {
taskWriteError(p, stack, err)
return
}
var req TaskGetRequest
if err := json.Unmarshal(reqBytes, &req); err != nil {
taskWriteError(p, stack, err)
return
}
// Call the service method
result, svcErr := service.Get(ctx, req.TaskID)
if svcErr != nil {
taskWriteError(p, stack, svcErr)
return
}
// Write JSON response to plugin memory
resp := TaskGetResponse{
Result: result,
}
taskWriteResponse(p, stack, resp)
},
[]extism.ValueType{extism.ValueTypePTR},
[]extism.ValueType{extism.ValueTypePTR},
)
}
func newTaskCancelHostFunction(service TaskService) extism.HostFunction {
return extism.NewHostFunctionWithStack(
"task_cancel",
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
// Read JSON request from plugin memory
reqBytes, err := p.ReadBytes(stack[0])
if err != nil {
taskWriteError(p, stack, err)
return
}
var req TaskCancelRequest
if err := json.Unmarshal(reqBytes, &req); err != nil {
taskWriteError(p, stack, err)
return
}
// Call the service method
if svcErr := service.Cancel(ctx, req.TaskID); svcErr != nil {
taskWriteError(p, stack, svcErr)
return
}
// Write JSON response to plugin memory
resp := TaskCancelResponse{}
taskWriteResponse(p, stack, resp)
},
[]extism.ValueType{extism.ValueTypePTR},
[]extism.ValueType{extism.ValueTypePTR},
)
}
func newTaskClearQueueHostFunction(service TaskService) extism.HostFunction {
return extism.NewHostFunctionWithStack(
"task_clearqueue",
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
// Read JSON request from plugin memory
reqBytes, err := p.ReadBytes(stack[0])
if err != nil {
taskWriteError(p, stack, err)
return
}
var req TaskClearQueueRequest
if err := json.Unmarshal(reqBytes, &req); err != nil {
taskWriteError(p, stack, err)
return
}
// Call the service method
result, svcErr := service.ClearQueue(ctx, req.QueueName)
if svcErr != nil {
taskWriteError(p, stack, svcErr)
return
}
// Write JSON response to plugin memory
resp := TaskClearQueueResponse{
Result: result,
}
taskWriteResponse(p, stack, resp)
},
[]extism.ValueType{extism.ValueTypePTR},
[]extism.ValueType{extism.ValueTypePTR},
)
}
// taskWriteResponse writes a JSON response to plugin memory.
func taskWriteResponse(p *extism.CurrentPlugin, stack []uint64, resp any) {
respBytes, err := json.Marshal(resp)
if err != nil {
taskWriteError(p, stack, err)
return
}
respPtr, err := p.WriteBytes(respBytes)
if err != nil {
stack[0] = 0
return
}
stack[0] = respPtr
}
// taskWriteError writes an error response to plugin memory.
func taskWriteError(p *extism.CurrentPlugin, stack []uint64, err error) {
errResp := struct {
Error string `json:"error"`
}{Error: err.Error()}
respBytes, _ := json.Marshal(errResp)
respPtr, _ := p.WriteBytes(respBytes)
stack[0] = respPtr
}

596
plugins/host_taskqueue.go Normal file
View File

@@ -0,0 +1,596 @@
package plugins
import (
"context"
"database/sql"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"sync"
"time"
_ "github.com/mattn/go-sqlite3"
"github.com/navidrome/navidrome/conf"
"github.com/navidrome/navidrome/log"
"github.com/navidrome/navidrome/model/id"
"github.com/navidrome/navidrome/plugins/capabilities"
"github.com/navidrome/navidrome/plugins/host"
"golang.org/x/time/rate"
)
const (
defaultConcurrency int32 = 1
defaultBackoffMs int64 = 1000
defaultRetentionMs int64 = 3_600_000 // 1 hour
minRetentionMs int64 = 60_000 // 1 minute
maxRetentionMs int64 = 604_800_000 // 1 week
maxQueueNameLength = 128
maxPayloadSize = 1 * 1024 * 1024 // 1MB
maxBackoffMs int64 = 3_600_000 // 1 hour
cleanupInterval = 5 * time.Minute
pollInterval = 5 * time.Second
shutdownTimeout = 10 * time.Second
taskStatusPending = "pending"
taskStatusRunning = "running"
taskStatusCompleted = "completed"
taskStatusFailed = "failed"
taskStatusCancelled = "cancelled"
)
// CapabilityTaskWorker indicates the plugin can receive task execution callbacks.
const CapabilityTaskWorker Capability = "TaskWorker"
const FuncTaskWorkerCallback = "nd_task_execute"
func init() {
registerCapability(CapabilityTaskWorker, FuncTaskWorkerCallback)
}
type queueState struct {
config host.QueueConfig
signal chan struct{}
limiter *rate.Limiter
}
// notifyWorkers sends a non-blocking signal to wake up queue workers.
func (qs *queueState) notifyWorkers() {
select {
case qs.signal <- struct{}{}:
default:
}
}
// taskQueueServiceImpl implements host.TaskQueueService with SQLite persistence
// and background worker goroutines for task execution.
type taskQueueServiceImpl struct {
pluginName string
manager *Manager
maxConcurrency int32
db *sql.DB
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
mu sync.Mutex
queues map[string]*queueState
// For testing: override how callbacks are invoked
invokeCallbackFn func(ctx context.Context, queueName, taskID string, payload []byte, attempt int32) (string, error)
}
// newTaskQueueService creates a new taskQueueServiceImpl with its own SQLite database.
func newTaskQueueService(pluginName string, manager *Manager, maxConcurrency int32) (*taskQueueServiceImpl, error) {
dataDir := filepath.Join(conf.Server.DataFolder, "plugins", pluginName)
if err := os.MkdirAll(dataDir, 0700); err != nil {
return nil, fmt.Errorf("creating plugin data directory: %w", err)
}
dbPath := filepath.Join(dataDir, "taskqueue.db")
db, err := sql.Open("sqlite3", dbPath+"?_busy_timeout=5000&_journal_mode=WAL&_foreign_keys=off")
if err != nil {
return nil, fmt.Errorf("opening taskqueue database: %w", err)
}
db.SetMaxOpenConns(3)
db.SetMaxIdleConns(1)
if err := createTaskQueueSchema(db); err != nil {
db.Close()
return nil, fmt.Errorf("creating taskqueue schema: %w", err)
}
ctx, cancel := context.WithCancel(manager.ctx)
s := &taskQueueServiceImpl{
pluginName: pluginName,
manager: manager,
maxConcurrency: maxConcurrency,
db: db,
ctx: ctx,
cancel: cancel,
queues: make(map[string]*queueState),
}
s.invokeCallbackFn = s.defaultInvokeCallback
s.wg.Go(s.cleanupLoop)
log.Debug("Initialized plugin taskqueue", "plugin", pluginName, "path", dbPath, "maxConcurrency", maxConcurrency)
return s, nil
}
func createTaskQueueSchema(db *sql.DB) error {
_, err := db.Exec(`
CREATE TABLE IF NOT EXISTS queues (
name TEXT PRIMARY KEY,
concurrency INTEGER NOT NULL DEFAULT 1,
max_retries INTEGER NOT NULL DEFAULT 0,
backoff_ms INTEGER NOT NULL DEFAULT 1000,
delay_ms INTEGER NOT NULL DEFAULT 0,
retention_ms INTEGER NOT NULL DEFAULT 3600000
);
CREATE TABLE IF NOT EXISTS tasks (
id TEXT PRIMARY KEY,
queue_name TEXT NOT NULL REFERENCES queues(name),
payload BLOB NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
attempt INTEGER NOT NULL DEFAULT 0,
max_retries INTEGER NOT NULL,
next_run_at INTEGER NOT NULL,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
message TEXT NOT NULL DEFAULT ''
);
CREATE INDEX IF NOT EXISTS idx_tasks_dequeue ON tasks(queue_name, status, next_run_at);
`)
return err
}
// applyConfigDefaults fills zero-value config fields with sensible defaults
// and clamps values to valid ranges, logging warnings for clamped values.
func (s *taskQueueServiceImpl) applyConfigDefaults(ctx context.Context, name string, config *host.QueueConfig) {
if config.Concurrency <= 0 {
config.Concurrency = defaultConcurrency
}
if config.BackoffMs <= 0 {
config.BackoffMs = defaultBackoffMs
}
if config.RetentionMs <= 0 {
config.RetentionMs = defaultRetentionMs
}
if config.RetentionMs < minRetentionMs {
log.Warn(ctx, "TaskQueue retention clamped to minimum", "plugin", s.pluginName, "queue", name,
"requested", config.RetentionMs, "min", minRetentionMs)
config.RetentionMs = minRetentionMs
}
if config.RetentionMs > maxRetentionMs {
log.Warn(ctx, "TaskQueue retention clamped to maximum", "plugin", s.pluginName, "queue", name,
"requested", config.RetentionMs, "max", maxRetentionMs)
config.RetentionMs = maxRetentionMs
}
}
// clampConcurrency reduces config.Concurrency if it exceeds the remaining budget.
// Returns an error when the concurrency budget is fully exhausted.
// Must be called with s.mu held.
func (s *taskQueueServiceImpl) clampConcurrency(ctx context.Context, name string, config *host.QueueConfig) error {
var allocated int32
for _, qs := range s.queues {
allocated += qs.config.Concurrency
}
available := s.maxConcurrency - allocated
if available <= 0 {
log.Warn(ctx, "TaskQueue concurrency budget exhausted", "plugin", s.pluginName, "queue", name,
"allocated", allocated, "maxConcurrency", s.maxConcurrency)
return fmt.Errorf("concurrency budget exhausted (%d/%d allocated)", allocated, s.maxConcurrency)
}
if config.Concurrency > available {
log.Warn(ctx, "TaskQueue concurrency clamped", "plugin", s.pluginName, "queue", name,
"requested", config.Concurrency, "available", available, "maxConcurrency", s.maxConcurrency)
config.Concurrency = available
}
return nil
}
func (s *taskQueueServiceImpl) CreateQueue(ctx context.Context, name string, config host.QueueConfig) error {
if len(name) == 0 {
return fmt.Errorf("queue name cannot be empty")
}
if len(name) > maxQueueNameLength {
return fmt.Errorf("queue name exceeds maximum length of %d bytes", maxQueueNameLength)
}
s.applyConfigDefaults(ctx, name, &config)
s.mu.Lock()
defer s.mu.Unlock()
if err := s.clampConcurrency(ctx, name, &config); err != nil {
return err
}
if _, exists := s.queues[name]; exists {
return fmt.Errorf("queue %q already exists", name)
}
// Upsert into queues table (idempotent across restarts)
_, err := s.db.ExecContext(ctx, `
INSERT INTO queues (name, concurrency, max_retries, backoff_ms, delay_ms, retention_ms)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(name) DO UPDATE SET
concurrency = excluded.concurrency,
max_retries = excluded.max_retries,
backoff_ms = excluded.backoff_ms,
delay_ms = excluded.delay_ms,
retention_ms = excluded.retention_ms
`, name, config.Concurrency, config.MaxRetries, config.BackoffMs, config.DelayMs, config.RetentionMs)
if err != nil {
return fmt.Errorf("creating queue: %w", err)
}
// Reset stale running tasks from previous crash
now := time.Now().UnixMilli()
_, err = s.db.ExecContext(ctx, `
UPDATE tasks SET status = ?, updated_at = ? WHERE queue_name = ? AND status = ?
`, taskStatusPending, now, name, taskStatusRunning)
if err != nil {
return fmt.Errorf("resetting stale tasks: %w", err)
}
qs := &queueState{
config: config,
signal: make(chan struct{}, 1),
}
if config.DelayMs > 0 {
// Rate limit dispatches to enforce delay between tasks.
// Burst of 1 allows one immediate dispatch, then enforces the delay interval.
qs.limiter = rate.NewLimiter(rate.Every(time.Duration(config.DelayMs)*time.Millisecond), 1)
}
s.queues[name] = qs
for i := int32(0); i < config.Concurrency; i++ {
s.wg.Go(func() { s.worker(name, qs) })
}
log.Debug(ctx, "Created task queue", "plugin", s.pluginName, "queue", name,
"concurrency", config.Concurrency, "maxRetries", config.MaxRetries,
"backoffMs", config.BackoffMs, "delayMs", config.DelayMs, "retentionMs", config.RetentionMs)
return nil
}
func (s *taskQueueServiceImpl) Enqueue(ctx context.Context, queueName string, payload []byte) (string, error) {
s.mu.Lock()
qs, exists := s.queues[queueName]
s.mu.Unlock()
if !exists {
return "", fmt.Errorf("queue %q does not exist", queueName)
}
if len(payload) > maxPayloadSize {
return "", fmt.Errorf("payload size %d exceeds maximum of %d bytes", len(payload), maxPayloadSize)
}
taskID := id.NewRandom()
now := time.Now().UnixMilli()
_, err := s.db.ExecContext(ctx, `
INSERT INTO tasks (id, queue_name, payload, status, attempt, max_retries, next_run_at, created_at, updated_at)
VALUES (?, ?, ?, ?, 0, ?, ?, ?, ?)
`, taskID, queueName, payload, taskStatusPending, qs.config.MaxRetries, now, now, now)
if err != nil {
return "", fmt.Errorf("enqueuing task: %w", err)
}
qs.notifyWorkers()
log.Trace(ctx, "Enqueued task", "plugin", s.pluginName, "queue", queueName, "taskID", taskID)
return taskID, nil
}
// Get returns the current state of a task.
func (s *taskQueueServiceImpl) Get(ctx context.Context, taskID string) (*host.TaskInfo, error) {
var info host.TaskInfo
err := s.db.QueryRowContext(ctx, `SELECT status, message, attempt FROM tasks WHERE id = ?`, taskID).
Scan(&info.Status, &info.Message, &info.Attempt)
if errors.Is(err, sql.ErrNoRows) {
return nil, fmt.Errorf("task %q not found", taskID)
}
if err != nil {
return nil, fmt.Errorf("getting task info: %w", err)
}
return &info, nil
}
// Cancel cancels a pending task.
func (s *taskQueueServiceImpl) Cancel(ctx context.Context, taskID string) error {
now := time.Now().UnixMilli()
result, err := s.db.ExecContext(ctx, `
UPDATE tasks SET status = ?, updated_at = ? WHERE id = ? AND status = ?
`, taskStatusCancelled, now, taskID, taskStatusPending)
if err != nil {
return fmt.Errorf("cancelling task: %w", err)
}
rowsAffected, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("checking cancel result: %w", err)
}
if rowsAffected == 0 {
// Check if task exists at all
var status string
err := s.db.QueryRowContext(ctx, `SELECT status FROM tasks WHERE id = ?`, taskID).Scan(&status)
if errors.Is(err, sql.ErrNoRows) {
return fmt.Errorf("task %q not found", taskID)
}
if err != nil {
return fmt.Errorf("checking task existence: %w", err)
}
return fmt.Errorf("task %q cannot be cancelled (status: %s)", taskID, status)
}
log.Trace(ctx, "Cancelled task", "plugin", s.pluginName, "taskID", taskID)
return nil
}
// ClearQueue removes all pending tasks from the named queue.
// Running tasks are not affected. Returns the number of tasks removed.
func (s *taskQueueServiceImpl) ClearQueue(ctx context.Context, queueName string) (int64, error) {
s.mu.Lock()
_, exists := s.queues[queueName]
s.mu.Unlock()
if !exists {
return 0, fmt.Errorf("queue %q does not exist", queueName)
}
now := time.Now().UnixMilli()
result, err := s.db.ExecContext(ctx, `
UPDATE tasks SET status = ?, updated_at = ? WHERE queue_name = ? AND status = ?
`, taskStatusCancelled, now, queueName, taskStatusPending)
if err != nil {
return 0, fmt.Errorf("clearing queue: %w", err)
}
cleared, err := result.RowsAffected()
if err != nil {
return 0, fmt.Errorf("checking clear result: %w", err)
}
if cleared > 0 {
log.Debug(ctx, "Cleared pending tasks from queue", "plugin", s.pluginName, "queue", queueName, "cleared", cleared)
}
return cleared, nil
}
// worker is the main loop for a single worker goroutine.
func (s *taskQueueServiceImpl) worker(queueName string, qs *queueState) {
// Process any existing pending tasks immediately on startup
s.drainQueue(queueName, qs)
ticker := time.NewTicker(pollInterval)
defer ticker.Stop()
for {
select {
case <-s.ctx.Done():
return
case <-qs.signal:
s.drainQueue(queueName, qs)
case <-ticker.C:
s.drainQueue(queueName, qs)
}
}
}
func (s *taskQueueServiceImpl) drainQueue(queueName string, qs *queueState) {
for s.ctx.Err() == nil && s.processTask(queueName, qs) {
}
}
// processTask dequeues and processes a single task. Returns true if a task was processed.
func (s *taskQueueServiceImpl) processTask(queueName string, qs *queueState) bool {
now := time.Now().UnixMilli()
// Atomically dequeue a task
var taskID string
var payload []byte
var attempt, maxRetries int32
err := s.db.QueryRowContext(s.ctx, `
UPDATE tasks SET status = ?, attempt = attempt + 1, updated_at = ?
WHERE id = (
SELECT id FROM tasks
WHERE queue_name = ? AND status = ? AND next_run_at <= ?
ORDER BY next_run_at, created_at LIMIT 1
)
RETURNING id, payload, attempt, max_retries
`, taskStatusRunning, now, queueName, taskStatusPending, now).Scan(&taskID, &payload, &attempt, &maxRetries)
if errors.Is(err, sql.ErrNoRows) {
return false
}
if err != nil {
log.Error(s.ctx, "Failed to dequeue task", "plugin", s.pluginName, "queue", queueName, err)
return false
}
// Enforce delay between task dispatches using a rate limiter.
// This is done after dequeue so that empty polls don't consume rate tokens.
if qs.limiter != nil {
if err := qs.limiter.Wait(s.ctx); err != nil {
// Context cancelled during wait — revert task to pending for recovery
s.revertTaskToPending(taskID)
return false
}
}
// Invoke callback
log.Debug(s.ctx, "Executing task", "plugin", s.pluginName, "queue", queueName, "taskID", taskID, "attempt", attempt)
message, callbackErr := s.invokeCallbackFn(s.ctx, queueName, taskID, payload, attempt)
// If context was cancelled (shutdown), revert task to pending for recovery
if s.ctx.Err() != nil {
s.revertTaskToPending(taskID)
return false
}
if callbackErr == nil {
s.completeTask(queueName, taskID, message)
} else {
s.handleTaskFailure(queueName, taskID, attempt, maxRetries, qs, callbackErr, message)
}
return true
}
func (s *taskQueueServiceImpl) completeTask(queueName, taskID, message string) {
now := time.Now().UnixMilli()
if _, err := s.db.ExecContext(s.ctx, `UPDATE tasks SET status = ?, message = ?, updated_at = ? WHERE id = ?`, taskStatusCompleted, message, now, taskID); err != nil {
log.Error(s.ctx, "Failed to mark task as completed", "plugin", s.pluginName, "taskID", taskID, err)
}
log.Debug(s.ctx, "Task completed", "plugin", s.pluginName, "queue", queueName, "taskID", taskID)
}
func (s *taskQueueServiceImpl) handleTaskFailure(queueName, taskID string, attempt, maxRetries int32, qs *queueState, callbackErr error, message string) {
log.Warn(s.ctx, "Task execution failed", "plugin", s.pluginName, "queue", queueName,
"taskID", taskID, "attempt", attempt, "maxRetries", maxRetries, "err", callbackErr)
// Use error message as fallback if no message was provided
if message == "" {
message = callbackErr.Error()
}
now := time.Now().UnixMilli()
if attempt > maxRetries {
if _, err := s.db.ExecContext(s.ctx, `UPDATE tasks SET status = ?, message = ?, updated_at = ? WHERE id = ?`, taskStatusFailed, message, now, taskID); err != nil {
log.Error(s.ctx, "Failed to mark task as failed", "plugin", s.pluginName, "taskID", taskID, err)
}
log.Warn(s.ctx, "Task failed after all retries", "plugin", s.pluginName, "queue", queueName, "taskID", taskID)
return
}
// Exponential backoff: backoffMs * 2^(attempt-1)
backoff := qs.config.BackoffMs << (attempt - 1)
if backoff <= 0 || backoff > maxBackoffMs {
backoff = maxBackoffMs
}
nextRunAt := now + backoff
if _, err := s.db.ExecContext(s.ctx, `
UPDATE tasks SET status = ?, next_run_at = ?, updated_at = ? WHERE id = ?
`, taskStatusPending, nextRunAt, now, taskID); err != nil {
log.Error(s.ctx, "Failed to reschedule task for retry", "plugin", s.pluginName, "taskID", taskID, err)
}
// Wake worker after backoff expires
time.AfterFunc(time.Duration(backoff)*time.Millisecond, func() {
qs.notifyWorkers()
})
}
// revertTaskToPending puts a running task back to pending status and decrements the attempt
// counter (used during shutdown to ensure the interrupted attempt doesn't count).
func (s *taskQueueServiceImpl) revertTaskToPending(taskID string) {
now := time.Now().UnixMilli()
_, err := s.db.Exec(`UPDATE tasks SET status = ?, attempt = MAX(attempt - 1, 0), updated_at = ? WHERE id = ? AND status = ?`, taskStatusPending, now, taskID, taskStatusRunning)
if err != nil {
log.Error("Failed to revert task to pending", "plugin", s.pluginName, "taskID", taskID, err)
}
}
// defaultInvokeCallback calls the plugin's nd_task_execute function.
func (s *taskQueueServiceImpl) defaultInvokeCallback(ctx context.Context, queueName, taskID string, payload []byte, attempt int32) (string, error) {
s.manager.mu.RLock()
p, ok := s.manager.plugins[s.pluginName]
s.manager.mu.RUnlock()
if !ok {
return "", fmt.Errorf("plugin %s not loaded", s.pluginName)
}
input := capabilities.TaskExecuteRequest{
QueueName: queueName,
TaskID: taskID,
Payload: payload,
Attempt: attempt,
}
message, err := callPluginFunction[capabilities.TaskExecuteRequest, string](ctx, p, FuncTaskWorkerCallback, input)
if err != nil {
return "", err
}
return message, nil
}
// cleanupLoop periodically removes terminal tasks past their retention period.
func (s *taskQueueServiceImpl) cleanupLoop() {
ticker := time.NewTicker(cleanupInterval)
defer ticker.Stop()
for {
select {
case <-s.ctx.Done():
return
case <-ticker.C:
s.runCleanup()
}
}
}
// runCleanup deletes terminal tasks past their retention period.
func (s *taskQueueServiceImpl) runCleanup() {
s.mu.Lock()
queues := make(map[string]*queueState, len(s.queues))
for k, v := range s.queues {
queues[k] = v
}
s.mu.Unlock()
now := time.Now().UnixMilli()
for name, qs := range queues {
result, err := s.db.ExecContext(s.ctx, `
DELETE FROM tasks WHERE queue_name = ? AND status IN (?, ?, ?) AND updated_at + ? < ?
`, name, taskStatusCompleted, taskStatusFailed, taskStatusCancelled, qs.config.RetentionMs, now)
if err != nil {
log.Error(s.ctx, "Failed to cleanup tasks", "plugin", s.pluginName, "queue", name, err)
continue
}
if deleted, _ := result.RowsAffected(); deleted > 0 {
log.Debug(s.ctx, "Cleaned up terminal tasks", "plugin", s.pluginName, "queue", name, "deleted", deleted)
}
}
}
// Close shuts down the task queue service, stopping all workers and closing the database.
func (s *taskQueueServiceImpl) Close() error {
// Cancel context to signal all goroutines
s.cancel()
// Wait for goroutines with timeout
done := make(chan struct{})
go func() {
s.wg.Wait()
close(done)
}()
select {
case <-done:
case <-time.After(shutdownTimeout):
log.Warn("TaskQueue shutdown timed out", "plugin", s.pluginName)
}
// Mark running tasks as pending for recovery on next startup
if s.db != nil {
now := time.Now().UnixMilli()
if _, err := s.db.Exec(`UPDATE tasks SET status = ?, updated_at = ? WHERE status = ?`, taskStatusPending, now, taskStatusRunning); err != nil {
log.Error("Failed to reset running tasks on shutdown", "plugin", s.pluginName, err)
}
log.Debug("Closing plugin taskqueue", "plugin", s.pluginName)
return s.db.Close()
}
return nil
}
// Compile-time verification
var _ host.TaskService = (*taskQueueServiceImpl)(nil)
var _ io.Closer = (*taskQueueServiceImpl)(nil)

View File

File diff suppressed because it is too large Load Diff

View File

@@ -1,55 +0,0 @@
package plugins
import (
"context"
"github.com/navidrome/navidrome/log"
"github.com/navidrome/navidrome/model"
"github.com/navidrome/navidrome/plugins/capabilities"
)
const CapabilityLyrics Capability = "Lyrics"
const (
FuncLyricsGetLyrics = "nd_lyrics_get_lyrics"
)
func init() {
registerCapability(
CapabilityLyrics,
FuncLyricsGetLyrics,
)
}
// LyricsPlugin adapts a WASM plugin with the Lyrics capability.
type LyricsPlugin struct {
name string
plugin *plugin
}
// GetLyrics calls the plugin to fetch lyrics, then parses the raw text responses
// using model.ToLyrics.
func (l *LyricsPlugin) GetLyrics(ctx context.Context, mf *model.MediaFile) (model.LyricList, error) {
req := capabilities.GetLyricsRequest{
Track: mediaFileToTrackInfo(mf),
}
resp, err := callPluginFunction[capabilities.GetLyricsRequest, capabilities.GetLyricsResponse](
ctx, l.plugin, FuncLyricsGetLyrics, req,
)
if err != nil {
return nil, err
}
var result model.LyricList
for _, lt := range resp.Lyrics {
parsed, err := model.ToLyrics(lt.Lang, lt.Text)
if err != nil {
log.Warn(ctx, "Error parsing plugin lyrics", "plugin", l.name, err)
continue
}
if parsed != nil && !parsed.IsEmpty() {
result = append(result, *parsed)
}
}
return result, nil
}

View File

@@ -1,84 +0,0 @@
//go:build !windows
package plugins
import (
"github.com/navidrome/navidrome/model"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
var _ = Describe("LyricsPlugin", Ordered, func() {
var (
lyricsManager *Manager
provider *LyricsPlugin
)
BeforeAll(func() {
lyricsManager, _ = createTestManagerWithPlugins(nil,
"test-lyrics"+PackageExtension,
"test-metadata-agent"+PackageExtension,
)
p, ok := lyricsManager.LoadLyricsProvider("test-lyrics")
Expect(ok).To(BeTrue())
provider = p.(*LyricsPlugin)
})
Describe("LoadLyricsProvider", func() {
It("returns a lyrics provider for a plugin with Lyrics capability", func() {
Expect(provider).ToNot(BeNil())
})
It("returns false for a plugin without Lyrics capability", func() {
_, ok := lyricsManager.LoadLyricsProvider("test-metadata-agent")
Expect(ok).To(BeFalse())
})
It("returns false for non-existent plugin", func() {
_, ok := lyricsManager.LoadLyricsProvider("non-existent")
Expect(ok).To(BeFalse())
})
})
Describe("GetLyrics", func() {
It("successfully returns lyrics from the plugin", func() {
track := &model.MediaFile{
ID: "track-1",
Title: "Test Song",
Artist: "Test Artist",
}
result, err := provider.GetLyrics(GinkgoT().Context(), track)
Expect(err).ToNot(HaveOccurred())
Expect(result).To(HaveLen(1))
Expect(result[0].Line).ToNot(BeEmpty())
Expect(result[0].Line[0].Value).To(ContainSubstring("Test Song"))
})
It("returns error when plugin returns error", func() {
manager, _ := createTestManagerWithPlugins(map[string]map[string]string{
"test-lyrics": {"error": "service unavailable"},
}, "test-lyrics"+PackageExtension)
p, ok := manager.LoadLyricsProvider("test-lyrics")
Expect(ok).To(BeTrue())
track := &model.MediaFile{ID: "track-1", Title: "Test Song"}
_, err := p.GetLyrics(GinkgoT().Context(), track)
Expect(err).To(HaveOccurred())
})
})
Describe("PluginNames", func() {
It("returns plugin names with Lyrics capability", func() {
names := lyricsManager.PluginNames("Lyrics")
Expect(names).To(ContainElement("test-lyrics"))
})
It("does not return metadata agent plugins for Lyrics capability", func() {
names := lyricsManager.PluginNames("Lyrics")
Expect(names).ToNot(ContainElement("test-metadata-agent"))
})
})
})

View File

@@ -16,7 +16,6 @@ import (
extism "github.com/extism/go-sdk"
"github.com/navidrome/navidrome/conf"
"github.com/navidrome/navidrome/core/agents"
"github.com/navidrome/navidrome/core/lyrics"
"github.com/navidrome/navidrome/core/scrobbler"
"github.com/navidrome/navidrome/log"
"github.com/navidrome/navidrome/model"
@@ -277,22 +276,6 @@ func (m *Manager) LoadScrobbler(name string) (scrobbler.Scrobbler, bool) {
}, true
}
// LoadLyricsProvider loads and returns a lyrics provider plugin by name.
func (m *Manager) LoadLyricsProvider(name string) (lyrics.Lyrics, bool) {
m.mu.RLock()
plugin, ok := m.plugins[name]
m.mu.RUnlock()
if !ok || !hasCapability(plugin.capabilities, CapabilityLyrics) {
return nil, false
}
return &LyricsPlugin{
name: plugin.name,
plugin: plugin,
}, true
}
// PluginInfo contains basic information about a plugin for metrics/insights.
type PluginInfo struct {
Name string

View File

@@ -128,6 +128,23 @@ var hostServices = []hostServiceEntry{
return host.RegisterHTTPHostFunctions(service), nil
},
},
{
name: "Task",
hasPermission: func(p *Permissions) bool { return p != nil && p.Taskqueue != nil },
create: func(ctx *serviceContext) ([]extism.HostFunction, io.Closer) {
perm := ctx.permissions.Taskqueue
maxConcurrency := int32(1)
if perm.MaxConcurrency > 0 {
maxConcurrency = int32(perm.MaxConcurrency)
}
service, err := newTaskQueueService(ctx.pluginName, ctx.manager, maxConcurrency)
if err != nil {
log.Error("Failed to create Task service", "plugin", ctx.pluginName, err)
return nil, nil
}
return host.RegisterTaskHostFunctions(service), service
},
},
}
// extractManifest reads manifest from an .ndp package and computes its SHA-256 hash.

View File

@@ -110,6 +110,9 @@
},
"users": {
"$ref": "#/$defs/UsersPermission"
},
"taskqueue": {
"$ref": "#/$defs/TaskQueuePermission"
}
}
},
@@ -224,6 +227,23 @@
}
}
},
"TaskQueuePermission": {
"type": "object",
"description": "Task queue permissions for background task processing",
"additionalProperties": false,
"properties": {
"reason": {
"type": "string",
"description": "Explanation for why task queue access is needed"
},
"maxConcurrency": {
"type": "integer",
"description": "Maximum total concurrent workers across all queues. Default: 1",
"minimum": 1,
"default": 1
}
}
},
"UsersPermission": {
"type": "object",
"description": "Users service permissions for accessing user information",

View File

@@ -72,6 +72,13 @@ func ValidateWithCapabilities(m *Manifest, capabilities []Capability) error {
}
}
// Task (taskqueue) permission requires TaskWorker capability
if m.Permissions != nil && m.Permissions.Taskqueue != nil {
if !hasCapability(capabilities, CapabilityTaskWorker) {
return fmt.Errorf("'taskqueue' permission requires plugin to export '%s' function", FuncTaskWorkerCallback)
}
}
return nil
}

View File

@@ -181,6 +181,9 @@ type Permissions struct {
// Subsonicapi corresponds to the JSON schema field "subsonicapi".
Subsonicapi *SubsonicAPIPermission `json:"subsonicapi,omitempty" yaml:"subsonicapi,omitempty" mapstructure:"subsonicapi,omitempty"`
// Taskqueue corresponds to the JSON schema field "taskqueue".
Taskqueue *TaskQueuePermission `json:"taskqueue,omitempty" yaml:"taskqueue,omitempty" mapstructure:"taskqueue,omitempty"`
// Users corresponds to the JSON schema field "users".
Users *UsersPermission `json:"users,omitempty" yaml:"users,omitempty" mapstructure:"users,omitempty"`
@@ -200,6 +203,36 @@ type SubsonicAPIPermission struct {
Reason *string `json:"reason,omitempty" yaml:"reason,omitempty" mapstructure:"reason,omitempty"`
}
// Task queue permissions for background task processing
type TaskQueuePermission struct {
// Maximum total concurrent workers across all queues. Default: 1
MaxConcurrency int `json:"maxConcurrency,omitempty" yaml:"maxConcurrency,omitempty" mapstructure:"maxConcurrency,omitempty"`
// Explanation for why task queue access is needed
Reason *string `json:"reason,omitempty" yaml:"reason,omitempty" mapstructure:"reason,omitempty"`
}
// UnmarshalJSON implements json.Unmarshaler.
func (j *TaskQueuePermission) UnmarshalJSON(value []byte) error {
var raw map[string]interface{}
if err := json.Unmarshal(value, &raw); err != nil {
return err
}
type Plain TaskQueuePermission
var plain Plain
if err := json.Unmarshal(value, &plain); err != nil {
return err
}
if v, ok := raw["maxConcurrency"]; !ok || v == nil {
plain.MaxConcurrency = 1.0
}
if 1 > plain.MaxConcurrency {
return fmt.Errorf("field %s: must be >= %v", "maxConcurrency", 1)
}
*j = TaskQueuePermission(plain)
return nil
}
// Enable experimental WebAssembly threads support
type ThreadsFeature struct {
// Explanation for why threads support is needed

View File

@@ -43,6 +43,7 @@ The following host services are available:
- Library: provides access to music library metadata for plugins.
- Scheduler: provides task scheduling capabilities for plugins.
- SubsonicAPI: provides access to Navidrome's Subsonic API from plugins.
- Task: provides persistent task queues for plugins.
- Users: provides access to user information for plugins.
- WebSocket: provides WebSocket communication capabilities for plugins.

View File

@@ -0,0 +1,277 @@
// Code generated by ndpgen. DO NOT EDIT.
//
// This file contains client wrappers for the Task host service.
// It is intended for use in Navidrome plugins built with TinyGo.
//
//go:build wasip1
package host
import (
"encoding/json"
"errors"
"github.com/navidrome/navidrome/plugins/pdk/go/pdk"
)
// QueueConfig represents the QueueConfig data structure.
// QueueConfig holds configuration for a task queue.
type QueueConfig struct {
Concurrency int32 `json:"concurrency"`
MaxRetries int32 `json:"maxRetries"`
BackoffMs int64 `json:"backoffMs"`
DelayMs int64 `json:"delayMs"`
RetentionMs int64 `json:"retentionMs"`
}
// TaskInfo represents the TaskInfo data structure.
// TaskInfo holds the current state of a task.
type TaskInfo struct {
Status string `json:"status"`
Message string `json:"message"`
Attempt int32 `json:"attempt"`
}
// task_createqueue is the host function provided by Navidrome.
//
//go:wasmimport extism:host/user task_createqueue
func task_createqueue(uint64) uint64
// task_enqueue is the host function provided by Navidrome.
//
//go:wasmimport extism:host/user task_enqueue
func task_enqueue(uint64) uint64
// task_get is the host function provided by Navidrome.
//
//go:wasmimport extism:host/user task_get
func task_get(uint64) uint64
// task_cancel is the host function provided by Navidrome.
//
//go:wasmimport extism:host/user task_cancel
func task_cancel(uint64) uint64
// task_clearqueue is the host function provided by Navidrome.
//
//go:wasmimport extism:host/user task_clearqueue
func task_clearqueue(uint64) uint64
type taskCreateQueueRequest struct {
Name string `json:"name"`
Config QueueConfig `json:"config"`
}
type taskEnqueueRequest struct {
QueueName string `json:"queueName"`
Payload []byte `json:"payload"`
}
type taskEnqueueResponse struct {
Result string `json:"result,omitempty"`
Error string `json:"error,omitempty"`
}
type taskGetRequest struct {
TaskID string `json:"taskId"`
}
type taskGetResponse struct {
Result *TaskInfo `json:"result,omitempty"`
Error string `json:"error,omitempty"`
}
type taskCancelRequest struct {
TaskID string `json:"taskId"`
}
type taskClearQueueRequest struct {
QueueName string `json:"queueName"`
}
type taskClearQueueResponse struct {
Result int64 `json:"result,omitempty"`
Error string `json:"error,omitempty"`
}
// TaskCreateQueue calls the task_createqueue host function.
// CreateQueue creates a named task queue with the given configuration.
// Zero-value fields in config use sensible defaults.
// If a queue with the same name already exists, returns an error.
// On startup, this also recovers any stale "running" tasks from a previous crash.
func TaskCreateQueue(name string, config QueueConfig) error {
// Marshal request to JSON
req := taskCreateQueueRequest{
Name: name,
Config: config,
}
reqBytes, err := json.Marshal(req)
if err != nil {
return err
}
reqMem := pdk.AllocateBytes(reqBytes)
defer reqMem.Free()
// Call the host function
responsePtr := task_createqueue(reqMem.Offset())
// Read the response from memory
responseMem := pdk.FindMemory(responsePtr)
responseBytes := responseMem.ReadBytes()
// Parse error-only response
var response struct {
Error string `json:"error,omitempty"`
}
if err := json.Unmarshal(responseBytes, &response); err != nil {
return err
}
if response.Error != "" {
return errors.New(response.Error)
}
return nil
}
// TaskEnqueue calls the task_enqueue host function.
// Enqueue adds a task to the named queue. Returns the task ID.
// payload is opaque bytes passed back to the plugin on execution.
func TaskEnqueue(queueName string, payload []byte) (string, error) {
// Marshal request to JSON
req := taskEnqueueRequest{
QueueName: queueName,
Payload: payload,
}
reqBytes, err := json.Marshal(req)
if err != nil {
return "", err
}
reqMem := pdk.AllocateBytes(reqBytes)
defer reqMem.Free()
// Call the host function
responsePtr := task_enqueue(reqMem.Offset())
// Read the response from memory
responseMem := pdk.FindMemory(responsePtr)
responseBytes := responseMem.ReadBytes()
// Parse the response
var response taskEnqueueResponse
if err := json.Unmarshal(responseBytes, &response); err != nil {
return "", err
}
// Convert Error field to Go error
if response.Error != "" {
return "", errors.New(response.Error)
}
return response.Result, nil
}
// TaskGet calls the task_get host function.
// Get returns the current state of a task including its status,
// message, and attempt count.
func TaskGet(taskID string) (*TaskInfo, error) {
// Marshal request to JSON
req := taskGetRequest{
TaskID: taskID,
}
reqBytes, err := json.Marshal(req)
if err != nil {
return nil, err
}
reqMem := pdk.AllocateBytes(reqBytes)
defer reqMem.Free()
// Call the host function
responsePtr := task_get(reqMem.Offset())
// Read the response from memory
responseMem := pdk.FindMemory(responsePtr)
responseBytes := responseMem.ReadBytes()
// Parse the response
var response taskGetResponse
if err := json.Unmarshal(responseBytes, &response); err != nil {
return nil, err
}
// Convert Error field to Go error
if response.Error != "" {
return nil, errors.New(response.Error)
}
return response.Result, nil
}
// TaskCancel calls the task_cancel host function.
// Cancel cancels a pending task. Returns error if already
// running, completed, or failed.
func TaskCancel(taskID string) error {
// Marshal request to JSON
req := taskCancelRequest{
TaskID: taskID,
}
reqBytes, err := json.Marshal(req)
if err != nil {
return err
}
reqMem := pdk.AllocateBytes(reqBytes)
defer reqMem.Free()
// Call the host function
responsePtr := task_cancel(reqMem.Offset())
// Read the response from memory
responseMem := pdk.FindMemory(responsePtr)
responseBytes := responseMem.ReadBytes()
// Parse error-only response
var response struct {
Error string `json:"error,omitempty"`
}
if err := json.Unmarshal(responseBytes, &response); err != nil {
return err
}
if response.Error != "" {
return errors.New(response.Error)
}
return nil
}
// TaskClearQueue calls the task_clearqueue host function.
// ClearQueue removes all pending tasks from the named queue.
// Running tasks are not affected. Returns the number of tasks removed.
func TaskClearQueue(queueName string) (int64, error) {
// Marshal request to JSON
req := taskClearQueueRequest{
QueueName: queueName,
}
reqBytes, err := json.Marshal(req)
if err != nil {
return 0, err
}
reqMem := pdk.AllocateBytes(reqBytes)
defer reqMem.Free()
// Call the host function
responsePtr := task_clearqueue(reqMem.Offset())
// Read the response from memory
responseMem := pdk.FindMemory(responsePtr)
responseBytes := responseMem.ReadBytes()
// Parse the response
var response taskClearQueueResponse
if err := json.Unmarshal(responseBytes, &response); err != nil {
return 0, err
}
// Convert Error field to Go error
if response.Error != "" {
return 0, errors.New(response.Error)
}
return response.Result, nil
}

View File

@@ -0,0 +1,92 @@
// Code generated by ndpgen. DO NOT EDIT.
//
// This file contains mock implementations for non-WASM builds.
// These mocks allow IDE support, compilation, and unit testing on non-WASM platforms.
// Plugin authors can use the exported mock instances to set expectations in tests.
//
//go:build !wasip1
package host
import "github.com/stretchr/testify/mock"
// QueueConfig represents the QueueConfig data structure.
// QueueConfig holds configuration for a task queue.
type QueueConfig struct {
Concurrency int32 `json:"concurrency"`
MaxRetries int32 `json:"maxRetries"`
BackoffMs int64 `json:"backoffMs"`
DelayMs int64 `json:"delayMs"`
RetentionMs int64 `json:"retentionMs"`
}
// TaskInfo represents the TaskInfo data structure.
// TaskInfo holds the current state of a task.
type TaskInfo struct {
Status string `json:"status"`
Message string `json:"message"`
Attempt int32 `json:"attempt"`
}
// mockTaskService is the mock implementation for testing.
type mockTaskService struct {
mock.Mock
}
// TaskMock is the auto-instantiated mock instance for testing.
// Use this to set expectations: host.TaskMock.On("MethodName", args...).Return(values...)
var TaskMock = &mockTaskService{}
// CreateQueue is the mock method for TaskCreateQueue.
func (m *mockTaskService) CreateQueue(name string, config QueueConfig) error {
args := m.Called(name, config)
return args.Error(0)
}
// TaskCreateQueue delegates to the mock instance.
// CreateQueue creates a named task queue with the given configuration.
// Zero-value fields in config use sensible defaults.
// If a queue with the same name already exists, returns an error.
// On startup, this also recovers any stale "running" tasks from a previous crash.
func TaskCreateQueue(name string, config QueueConfig) error {
return TaskMock.CreateQueue(name, config)
}
// Enqueue is the mock method for TaskEnqueue.
func (m *mockTaskService) Enqueue(queueName string, payload []byte) (string, error) {
args := m.Called(queueName, payload)
return args.String(0), args.Error(1)
}
// TaskEnqueue delegates to the mock instance.
// Enqueue adds a task to the named queue. Returns the task ID.
// payload is opaque bytes passed back to the plugin on execution.
func TaskEnqueue(queueName string, payload []byte) (string, error) {
return TaskMock.Enqueue(queueName, payload)
}
// Get is the mock method for TaskGet.
func (m *mockTaskService) Get(taskID string) (*TaskInfo, error) {
args := m.Called(taskID)
return args.Get(0).(*TaskInfo), args.Error(1)
}
// TaskGet delegates to the mock instance.
// Get returns the current state of a task including its status,
// message, and attempt count.
func TaskGet(taskID string) (*TaskInfo, error) {
return TaskMock.Get(taskID)
}
// Cancel is the mock method for TaskCancel.
func (m *mockTaskService) Cancel(taskID string) error {
args := m.Called(taskID)
return args.Error(0)
}
// TaskCancel delegates to the mock instance.
// Cancel cancels a pending task. Returns error if already
// running, completed, or failed.
func TaskCancel(taskID string) error {
return TaskMock.Cancel(taskID)
}

View File

@@ -1,118 +0,0 @@
// Code generated by ndpgen. DO NOT EDIT.
//
// This file contains export wrappers for the Lyrics capability.
// It is intended for use in Navidrome plugins built with TinyGo.
//
//go:build wasip1
package lyrics
import (
"github.com/navidrome/navidrome/plugins/pdk/go/pdk"
)
// ArtistRef is a reference to an artist with name and optional MBID.
type ArtistRef struct {
// ID is the internal Navidrome artist ID (if known).
ID string `json:"id,omitempty"`
// Name is the artist name.
Name string `json:"name"`
// MBID is the MusicBrainz ID for the artist.
MBID string `json:"mbid,omitempty"`
}
// GetLyricsRequest contains the track information for lyrics lookup.
type GetLyricsRequest struct {
Track TrackInfo `json:"track"`
}
// GetLyricsResponse contains the lyrics returned by the plugin.
type GetLyricsResponse struct {
Lyrics []LyricsText `json:"lyrics"`
}
// LyricsText represents a single set of lyrics in raw text format.
// Text can be plain text or LRC format — Navidrome will parse it.
type LyricsText struct {
Lang string `json:"lang,omitempty"`
Text string `json:"text"`
}
// TrackInfo contains track metadata for scrobbling.
type TrackInfo struct {
// ID is the internal Navidrome track ID.
ID string `json:"id"`
// Title is the track title.
Title string `json:"title"`
// Album is the album name.
Album string `json:"album"`
// Artist is the formatted artist name for display (e.g., "Artist1 • Artist2").
Artist string `json:"artist"`
// AlbumArtist is the formatted album artist name for display.
AlbumArtist string `json:"albumArtist"`
// Artists is the list of track artists.
Artists []ArtistRef `json:"artists"`
// AlbumArtists is the list of album artists.
AlbumArtists []ArtistRef `json:"albumArtists"`
// Duration is the track duration in seconds.
Duration float32 `json:"duration"`
// TrackNumber is the track number on the album.
TrackNumber int32 `json:"trackNumber"`
// DiscNumber is the disc number.
DiscNumber int32 `json:"discNumber"`
// MBZRecordingID is the MusicBrainz recording ID.
MBZRecordingID string `json:"mbzRecordingId,omitempty"`
// MBZAlbumID is the MusicBrainz album/release ID.
MBZAlbumID string `json:"mbzAlbumId,omitempty"`
// MBZReleaseGroupID is the MusicBrainz release group ID.
MBZReleaseGroupID string `json:"mbzReleaseGroupId,omitempty"`
// MBZReleaseTrackID is the MusicBrainz release track ID.
MBZReleaseTrackID string `json:"mbzReleaseTrackId,omitempty"`
}
// Lyrics requires all methods to be implemented.
// Lyrics provides lyrics for a given track from external sources.
type Lyrics interface {
// GetLyrics
GetLyrics(GetLyricsRequest) (GetLyricsResponse, error)
} // Internal implementation holders
var (
lyricsImpl func(GetLyricsRequest) (GetLyricsResponse, error)
)
// Register registers a lyrics implementation.
// All methods are required.
func Register(impl Lyrics) {
lyricsImpl = impl.GetLyrics
}
// NotImplementedCode is the standard return code for unimplemented functions.
// The host recognizes this and skips the plugin gracefully.
const NotImplementedCode int32 = -2
//go:wasmexport nd_lyrics_get_lyrics
func _NdLyricsGetLyrics() int32 {
if lyricsImpl == nil {
// Return standard code - host will skip this plugin gracefully
return NotImplementedCode
}
var input GetLyricsRequest
if err := pdk.InputJSON(&input); err != nil {
pdk.SetError(err)
return -1
}
output, err := lyricsImpl(input)
if err != nil {
pdk.SetError(err)
return -1
}
if err := pdk.OutputJSON(output); err != nil {
pdk.SetError(err)
return -1
}
return 0
}

View File

@@ -1,82 +0,0 @@
// Code generated by ndpgen. DO NOT EDIT.
//
// This file provides stub implementations for non-WASM platforms.
// It allows Go plugins to compile and run tests outside of WASM,
// but the actual functionality is only available in WASM builds.
//
//go:build !wasip1
package lyrics
// ArtistRef is a reference to an artist with name and optional MBID.
type ArtistRef struct {
// ID is the internal Navidrome artist ID (if known).
ID string `json:"id,omitempty"`
// Name is the artist name.
Name string `json:"name"`
// MBID is the MusicBrainz ID for the artist.
MBID string `json:"mbid,omitempty"`
}
// GetLyricsRequest contains the track information for lyrics lookup.
type GetLyricsRequest struct {
Track TrackInfo `json:"track"`
}
// GetLyricsResponse contains the lyrics returned by the plugin.
type GetLyricsResponse struct {
Lyrics []LyricsText `json:"lyrics"`
}
// LyricsText represents a single set of lyrics in raw text format.
// Text can be plain text or LRC format — Navidrome will parse it.
type LyricsText struct {
Lang string `json:"lang,omitempty"`
Text string `json:"text"`
}
// TrackInfo contains track metadata for scrobbling.
type TrackInfo struct {
// ID is the internal Navidrome track ID.
ID string `json:"id"`
// Title is the track title.
Title string `json:"title"`
// Album is the album name.
Album string `json:"album"`
// Artist is the formatted artist name for display (e.g., "Artist1 • Artist2").
Artist string `json:"artist"`
// AlbumArtist is the formatted album artist name for display.
AlbumArtist string `json:"albumArtist"`
// Artists is the list of track artists.
Artists []ArtistRef `json:"artists"`
// AlbumArtists is the list of album artists.
AlbumArtists []ArtistRef `json:"albumArtists"`
// Duration is the track duration in seconds.
Duration float32 `json:"duration"`
// TrackNumber is the track number on the album.
TrackNumber int32 `json:"trackNumber"`
// DiscNumber is the disc number.
DiscNumber int32 `json:"discNumber"`
// MBZRecordingID is the MusicBrainz recording ID.
MBZRecordingID string `json:"mbzRecordingId,omitempty"`
// MBZAlbumID is the MusicBrainz album/release ID.
MBZAlbumID string `json:"mbzAlbumId,omitempty"`
// MBZReleaseGroupID is the MusicBrainz release group ID.
MBZReleaseGroupID string `json:"mbzReleaseGroupId,omitempty"`
// MBZReleaseTrackID is the MusicBrainz release track ID.
MBZReleaseTrackID string `json:"mbzReleaseTrackId,omitempty"`
}
// Lyrics requires all methods to be implemented.
// Lyrics provides lyrics for a given track from external sources.
type Lyrics interface {
// GetLyrics
GetLyrics(GetLyricsRequest) (GetLyricsResponse, error)
}
// NotImplementedCode is the standard return code for unimplemented functions.
const NotImplementedCode int32 = -2
// Register is a no-op on non-WASM platforms.
// This stub allows code to compile outside of WASM.
func Register(_ Lyrics) {}

View File

@@ -0,0 +1,79 @@
// Code generated by ndpgen. DO NOT EDIT.
//
// This file contains export wrappers for the TaskWorker capability.
// It is intended for use in Navidrome plugins built with TinyGo.
//
//go:build wasip1
package taskworker
import (
"github.com/navidrome/navidrome/plugins/pdk/go/pdk"
)
// TaskExecuteRequest is the request provided when a task is ready to execute.
type TaskExecuteRequest struct {
// QueueName is the name of the queue this task belongs to.
QueueName string `json:"queueName"`
// TaskID is the unique identifier for this task.
TaskID string `json:"taskId"`
// Payload is the opaque data provided when the task was enqueued.
Payload []byte `json:"payload"`
// Attempt is the current attempt number (1-based: first attempt = 1).
Attempt int32 `json:"attempt"`
}
// TaskWorker is the marker interface for taskworker plugins.
// Implement one or more of the provider interfaces below.
// TaskWorker provides task execution handling.
// This capability allows plugins to receive callbacks when their queued tasks
// are ready to execute. Plugins that use the taskqueue host service must
// implement this capability.
type TaskWorker interface{}
// TaskExecuteProvider provides the OnTaskExecute function.
type TaskExecuteProvider interface {
OnTaskExecute(TaskExecuteRequest) (string, error)
} // Internal implementation holders
var (
taskExecuteImpl func(TaskExecuteRequest) (string, error)
)
// Register registers a taskworker implementation.
// The implementation is checked for optional provider interfaces.
func Register(impl TaskWorker) {
if p, ok := impl.(TaskExecuteProvider); ok {
taskExecuteImpl = p.OnTaskExecute
}
}
// NotImplementedCode is the standard return code for unimplemented functions.
// The host recognizes this and skips the plugin gracefully.
const NotImplementedCode int32 = -2
//go:wasmexport nd_task_execute
func _NdTaskExecute() int32 {
if taskExecuteImpl == nil {
// Return standard code - host will skip this plugin gracefully
return NotImplementedCode
}
var input TaskExecuteRequest
if err := pdk.InputJSON(&input); err != nil {
pdk.SetError(err)
return -1
}
output, err := taskExecuteImpl(input)
if err != nil {
pdk.SetError(err)
return -1
}
if err := pdk.OutputJSON(output); err != nil {
pdk.SetError(err)
return -1
}
return 0
}

View File

@@ -0,0 +1,41 @@
// Code generated by ndpgen. DO NOT EDIT.
//
// This file provides stub implementations for non-WASM platforms.
// It allows Go plugins to compile and run tests outside of WASM,
// but the actual functionality is only available in WASM builds.
//
//go:build !wasip1
package taskworker
// TaskExecuteRequest is the request provided when a task is ready to execute.
type TaskExecuteRequest struct {
// QueueName is the name of the queue this task belongs to.
QueueName string `json:"queueName"`
// TaskID is the unique identifier for this task.
TaskID string `json:"taskId"`
// Payload is the opaque data provided when the task was enqueued.
Payload []byte `json:"payload"`
// Attempt is the current attempt number (1-based: first attempt = 1).
Attempt int32 `json:"attempt"`
}
// TaskWorker is the marker interface for taskworker plugins.
// Implement one or more of the provider interfaces below.
// TaskWorker provides task execution handling.
// This capability allows plugins to receive callbacks when their queued tasks
// are ready to execute. Plugins that use the taskqueue host service must
// implement this capability.
type TaskWorker interface{}
// TaskExecuteProvider provides the OnTaskExecute function.
type TaskExecuteProvider interface {
OnTaskExecute(TaskExecuteRequest) (string, error)
}
// NotImplementedCode is the standard return code for unimplemented functions.
const NotImplementedCode int32 = -2
// Register is a no-op on non-WASM platforms.
// This stub allows code to compile outside of WASM.
func Register(_ TaskWorker) {}

View File

@@ -0,0 +1,187 @@
# Code generated by ndpgen. DO NOT EDIT.
#
# This file contains client wrappers for the Task host service.
# It is intended for use in Navidrome plugins built with extism-py.
#
# IMPORTANT: Due to a limitation in extism-py, you cannot import this file directly.
# The @extism.import_fn decorators are only detected when defined in the plugin's
# main __init__.py file. Copy the needed functions from this file into your plugin.
from dataclasses import dataclass
from typing import Any
import extism
import json
import base64
class HostFunctionError(Exception):
"""Raised when a host function returns an error."""
pass
@extism.import_fn("extism:host/user", "task_createqueue")
def _task_createqueue(offset: int) -> int:
"""Raw host function - do not call directly."""
...
@extism.import_fn("extism:host/user", "task_enqueue")
def _task_enqueue(offset: int) -> int:
"""Raw host function - do not call directly."""
...
@extism.import_fn("extism:host/user", "task_get")
def _task_get(offset: int) -> int:
"""Raw host function - do not call directly."""
...
@extism.import_fn("extism:host/user", "task_cancel")
def _task_cancel(offset: int) -> int:
"""Raw host function - do not call directly."""
...
@extism.import_fn("extism:host/user", "task_clearqueue")
def _task_clearqueue(offset: int) -> int:
"""Raw host function - do not call directly."""
...
def task_create_queue(name: str, config: Any) -> None:
"""CreateQueue creates a named task queue with the given configuration.
Zero-value fields in config use sensible defaults.
If a queue with the same name already exists, returns an error.
On startup, this also recovers any stale "running" tasks from a previous crash.
Args:
name: str parameter.
config: Any parameter.
Raises:
HostFunctionError: If the host function returns an error.
"""
request = {
"name": name,
"config": config,
}
request_bytes = json.dumps(request).encode("utf-8")
request_mem = extism.memory.alloc(request_bytes)
response_offset = _task_createqueue(request_mem.offset)
response_mem = extism.memory.find(response_offset)
response = json.loads(extism.memory.string(response_mem))
if response.get("error"):
raise HostFunctionError(response["error"])
def task_enqueue(queue_name: str, payload: bytes) -> str:
"""Enqueue adds a task to the named queue. Returns the task ID.
payload is opaque bytes passed back to the plugin on execution.
Args:
queue_name: str parameter.
payload: bytes parameter.
Returns:
str: The result value.
Raises:
HostFunctionError: If the host function returns an error.
"""
request = {
"queueName": queue_name,
"payload": base64.b64encode(payload).decode("ascii"),
}
request_bytes = json.dumps(request).encode("utf-8")
request_mem = extism.memory.alloc(request_bytes)
response_offset = _task_enqueue(request_mem.offset)
response_mem = extism.memory.find(response_offset)
response = json.loads(extism.memory.string(response_mem))
if response.get("error"):
raise HostFunctionError(response["error"])
return response.get("result", "")
def task_get(task_id: str) -> Any:
"""Get returns the current state of a task including its status,
message, and attempt count.
Args:
task_id: str parameter.
Returns:
Any: The result value.
Raises:
HostFunctionError: If the host function returns an error.
"""
request = {
"taskId": task_id,
}
request_bytes = json.dumps(request).encode("utf-8")
request_mem = extism.memory.alloc(request_bytes)
response_offset = _task_get(request_mem.offset)
response_mem = extism.memory.find(response_offset)
response = json.loads(extism.memory.string(response_mem))
if response.get("error"):
raise HostFunctionError(response["error"])
return response.get("result", None)
def task_cancel(task_id: str) -> None:
"""Cancel cancels a pending task. Returns error if already
running, completed, or failed.
Args:
task_id: str parameter.
Raises:
HostFunctionError: If the host function returns an error.
"""
request = {
"taskId": task_id,
}
request_bytes = json.dumps(request).encode("utf-8")
request_mem = extism.memory.alloc(request_bytes)
response_offset = _task_cancel(request_mem.offset)
response_mem = extism.memory.find(response_offset)
response = json.loads(extism.memory.string(response_mem))
if response.get("error"):
raise HostFunctionError(response["error"])
def task_clear_queue(queue_name: str) -> int:
"""ClearQueue removes all pending tasks from the named queue.
Running tasks are not affected. Returns the number of tasks removed.
Args:
queue_name: str parameter.
Returns:
int: The number of tasks removed.
Raises:
HostFunctionError: If the host function returns an error.
"""
request = {
"queueName": queue_name,
}
request_bytes = json.dumps(request).encode("utf-8")
request_mem = extism.memory.alloc(request_bytes)
response_offset = _task_clearqueue(request_mem.offset)
response_mem = extism.memory.find(response_offset)
response = json.loads(extism.memory.string(response_mem))
if response.get("error"):
raise HostFunctionError(response["error"])
return response.get("result", 0)

View File

@@ -6,8 +6,8 @@
//! for implementing Navidrome plugin capabilities in Rust.
pub mod lifecycle;
pub mod lyrics;
pub mod metadata;
pub mod scheduler;
pub mod scrobbler;
pub mod taskworker;
pub mod websocket;

View File

@@ -1,148 +0,0 @@
// Code generated by ndpgen. DO NOT EDIT.
//
// This file contains export wrappers for the Lyrics capability.
// It is intended for use in Navidrome plugins built with extism-pdk.
use serde::{Deserialize, Serialize};
// Helper functions for skip_serializing_if with numeric types
#[allow(dead_code)]
fn is_zero_i32(value: &i32) -> bool { *value == 0 }
#[allow(dead_code)]
fn is_zero_u32(value: &u32) -> bool { *value == 0 }
#[allow(dead_code)]
fn is_zero_i64(value: &i64) -> bool { *value == 0 }
#[allow(dead_code)]
fn is_zero_u64(value: &u64) -> bool { *value == 0 }
#[allow(dead_code)]
fn is_zero_f32(value: &f32) -> bool { *value == 0.0 }
#[allow(dead_code)]
fn is_zero_f64(value: &f64) -> bool { *value == 0.0 }
/// ArtistRef is a reference to an artist with name and optional MBID.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ArtistRef {
/// ID is the internal Navidrome artist ID (if known).
#[serde(default, skip_serializing_if = "String::is_empty")]
pub id: String,
/// Name is the artist name.
#[serde(default)]
pub name: String,
/// MBID is the MusicBrainz ID for the artist.
#[serde(default, skip_serializing_if = "String::is_empty")]
pub mbid: String,
}
/// GetLyricsRequest contains the track information for lyrics lookup.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct GetLyricsRequest {
#[serde(default)]
pub track: TrackInfo,
}
/// GetLyricsResponse contains the lyrics returned by the plugin.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct GetLyricsResponse {
#[serde(default)]
pub lyrics: Vec<LyricsText>,
}
/// LyricsText represents a single set of lyrics in raw text format.
/// Text can be plain text or LRC format — Navidrome will parse it.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct LyricsText {
#[serde(default, skip_serializing_if = "String::is_empty")]
pub lang: String,
#[serde(default)]
pub text: String,
}
/// TrackInfo contains track metadata for scrobbling.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TrackInfo {
/// ID is the internal Navidrome track ID.
#[serde(default)]
pub id: String,
/// Title is the track title.
#[serde(default)]
pub title: String,
/// Album is the album name.
#[serde(default)]
pub album: String,
/// Artist is the formatted artist name for display (e.g., "Artist1 • Artist2").
#[serde(default)]
pub artist: String,
/// AlbumArtist is the formatted album artist name for display.
#[serde(default)]
pub album_artist: String,
/// Artists is the list of track artists.
#[serde(default)]
pub artists: Vec<ArtistRef>,
/// AlbumArtists is the list of album artists.
#[serde(default)]
pub album_artists: Vec<ArtistRef>,
/// Duration is the track duration in seconds.
#[serde(default)]
pub duration: f32,
/// TrackNumber is the track number on the album.
#[serde(default)]
pub track_number: i32,
/// DiscNumber is the disc number.
#[serde(default)]
pub disc_number: i32,
/// MBZRecordingID is the MusicBrainz recording ID.
#[serde(default, skip_serializing_if = "String::is_empty")]
pub mbz_recording_id: String,
/// MBZAlbumID is the MusicBrainz album/release ID.
#[serde(default, skip_serializing_if = "String::is_empty")]
pub mbz_album_id: String,
/// MBZReleaseGroupID is the MusicBrainz release group ID.
#[serde(default, skip_serializing_if = "String::is_empty")]
pub mbz_release_group_id: String,
/// MBZReleaseTrackID is the MusicBrainz release track ID.
#[serde(default, skip_serializing_if = "String::is_empty")]
pub mbz_release_track_id: String,
}
/// Error represents an error from a capability method.
#[derive(Debug)]
pub struct Error {
pub message: String,
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.message)
}
}
impl std::error::Error for Error {}
impl Error {
pub fn new(message: impl Into<String>) -> Self {
Self { message: message.into() }
}
}
/// Lyrics requires all methods to be implemented.
/// Lyrics provides lyrics for a given track from external sources.
pub trait Lyrics {
/// GetLyrics
fn get_lyrics(&self, req: GetLyricsRequest) -> Result<GetLyricsResponse, Error>;
}
/// Register all exports for the Lyrics capability.
/// This macro generates the WASM export functions for all trait methods.
#[macro_export]
macro_rules! register_lyrics {
($plugin_type:ty) => {
#[extism_pdk::plugin_fn]
pub fn nd_lyrics_get_lyrics(
req: extism_pdk::Json<$crate::lyrics::GetLyricsRequest>
) -> extism_pdk::FnResult<extism_pdk::Json<$crate::lyrics::GetLyricsResponse>> {
let plugin = <$plugin_type>::default();
let result = $crate::lyrics::Lyrics::get_lyrics(&plugin, req.into_inner())?;
Ok(extism_pdk::Json(result))
}
};
}

View File

@@ -0,0 +1,102 @@
// Code generated by ndpgen. DO NOT EDIT.
//
// This file contains export wrappers for the TaskWorker capability.
// It is intended for use in Navidrome plugins built with extism-pdk.
use serde::{Deserialize, Serialize};
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD as BASE64;
mod base64_bytes {
use serde::{self, Deserialize, Deserializer, Serializer};
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD as BASE64;
pub fn serialize<S>(bytes: &Vec<u8>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&BASE64.encode(bytes))
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
BASE64.decode(&s).map_err(serde::de::Error::custom)
}
}
// Helper functions for skip_serializing_if with numeric types
#[allow(dead_code)]
fn is_zero_i32(value: &i32) -> bool { *value == 0 }
#[allow(dead_code)]
fn is_zero_u32(value: &u32) -> bool { *value == 0 }
#[allow(dead_code)]
fn is_zero_i64(value: &i64) -> bool { *value == 0 }
#[allow(dead_code)]
fn is_zero_u64(value: &u64) -> bool { *value == 0 }
#[allow(dead_code)]
fn is_zero_f32(value: &f32) -> bool { *value == 0.0 }
#[allow(dead_code)]
fn is_zero_f64(value: &f64) -> bool { *value == 0.0 }
/// TaskExecuteRequest is the request provided when a task is ready to execute.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TaskExecuteRequest {
/// QueueName is the name of the queue this task belongs to.
#[serde(default)]
pub queue_name: String,
/// TaskID is the unique identifier for this task.
#[serde(default)]
pub task_id: String,
/// Payload is the opaque data provided when the task was enqueued.
#[serde(default)]
#[serde(with = "base64_bytes")]
pub payload: Vec<u8>,
/// Attempt is the current attempt number (1-based: first attempt = 1).
#[serde(default)]
pub attempt: i32,
}
/// Error represents an error from a capability method.
#[derive(Debug)]
pub struct Error {
pub message: String,
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.message)
}
}
impl std::error::Error for Error {}
impl Error {
pub fn new(message: impl Into<String>) -> Self {
Self { message: message.into() }
}
}
/// TaskExecuteProvider provides the OnTaskExecute function.
pub trait TaskExecuteProvider {
fn on_task_execute(&self, req: TaskExecuteRequest) -> Result<String, Error>;
}
/// Register the on_task_execute export.
/// This macro generates the WASM export function for this method.
#[macro_export]
macro_rules! register_taskworker_task_execute {
($plugin_type:ty) => {
#[extism_pdk::plugin_fn]
pub fn nd_task_execute(
req: extism_pdk::Json<$crate::taskworker::TaskExecuteRequest>
) -> extism_pdk::FnResult<extism_pdk::Json<String>> {
let plugin = <$plugin_type>::default();
let result = $crate::taskworker::TaskExecuteProvider::on_task_execute(&plugin, req.into_inner())?;
Ok(extism_pdk::Json(result))
}
};
}

View File

@@ -40,6 +40,7 @@
//! - [`library`] - provides access to music library metadata for plugins.
//! - [`scheduler`] - provides task scheduling capabilities for plugins.
//! - [`subsonicapi`] - provides access to Navidrome's Subsonic API from plugins.
//! - [`task`] - provides persistent task queues for plugins.
//! - [`users`] - provides access to user information for plugins.
//! - [`websocket`] - provides WebSocket communication capabilities for plugins.
@@ -99,6 +100,13 @@ pub mod subsonicapi {
pub use super::nd_host_subsonicapi::*;
}
#[doc(hidden)]
mod nd_host_task;
/// provides persistent task queues for plugins.
pub mod task {
pub use super::nd_host_task::*;
}
#[doc(hidden)]
mod nd_host_users;
/// provides access to user information for plugins.

View File

@@ -0,0 +1,258 @@
// Code generated by ndpgen. DO NOT EDIT.
//
// This file contains client wrappers for the Task host service.
// It is intended for use in Navidrome plugins built with extism-pdk.
use extism_pdk::*;
use serde::{Deserialize, Serialize};
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD as BASE64;
mod base64_bytes {
use serde::{self, Deserialize, Deserializer, Serializer};
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD as BASE64;
pub fn serialize<S>(bytes: &Vec<u8>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&BASE64.encode(bytes))
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
BASE64.decode(&s).map_err(serde::de::Error::custom)
}
}
/// QueueConfig holds configuration for a task queue.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct QueueConfig {
pub concurrency: i32,
pub max_retries: i32,
pub backoff_ms: i64,
pub delay_ms: i64,
pub retention_ms: i64,
}
/// TaskInfo holds the current state of a task.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TaskInfo {
pub status: String,
pub message: String,
pub attempt: i32,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct TaskCreateQueueRequest {
name: String,
config: QueueConfig,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct TaskCreateQueueResponse {
#[serde(default)]
error: Option<String>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct TaskEnqueueRequest {
queue_name: String,
#[serde(with = "base64_bytes")]
payload: Vec<u8>,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct TaskEnqueueResponse {
#[serde(default)]
result: String,
#[serde(default)]
error: Option<String>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct TaskGetRequest {
task_id: String,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct TaskGetResponse {
#[serde(default)]
result: Option<TaskInfo>,
#[serde(default)]
error: Option<String>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct TaskCancelRequest {
task_id: String,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct TaskCancelResponse {
#[serde(default)]
error: Option<String>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct TaskClearQueueRequest {
queue_name: String,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct TaskClearQueueResponse {
#[serde(default)]
result: i64,
#[serde(default)]
error: Option<String>,
}
#[host_fn]
extern "ExtismHost" {
fn task_createqueue(input: Json<TaskCreateQueueRequest>) -> Json<TaskCreateQueueResponse>;
fn task_enqueue(input: Json<TaskEnqueueRequest>) -> Json<TaskEnqueueResponse>;
fn task_get(input: Json<TaskGetRequest>) -> Json<TaskGetResponse>;
fn task_cancel(input: Json<TaskCancelRequest>) -> Json<TaskCancelResponse>;
fn task_clearqueue(input: Json<TaskClearQueueRequest>) -> Json<TaskClearQueueResponse>;
}
/// CreateQueue creates a named task queue with the given configuration.
/// Zero-value fields in config use sensible defaults.
/// If a queue with the same name already exists, returns an error.
/// On startup, this also recovers any stale "running" tasks from a previous crash.
///
/// # Arguments
/// * `name` - String parameter.
/// * `config` - QueueConfig parameter.
///
/// # Errors
/// Returns an error if the host function call fails.
pub fn create_queue(name: &str, config: QueueConfig) -> Result<(), Error> {
let response = unsafe {
task_createqueue(Json(TaskCreateQueueRequest {
name: name.to_owned(),
config: config,
}))?
};
if let Some(err) = response.0.error {
return Err(Error::msg(err));
}
Ok(())
}
/// Enqueue adds a task to the named queue. Returns the task ID.
/// payload is opaque bytes passed back to the plugin on execution.
///
/// # Arguments
/// * `queue_name` - String parameter.
/// * `payload` - Vec<u8> parameter.
///
/// # Returns
/// The result value.
///
/// # Errors
/// Returns an error if the host function call fails.
pub fn enqueue(queue_name: &str, payload: Vec<u8>) -> Result<String, Error> {
let response = unsafe {
task_enqueue(Json(TaskEnqueueRequest {
queue_name: queue_name.to_owned(),
payload: payload,
}))?
};
if let Some(err) = response.0.error {
return Err(Error::msg(err));
}
Ok(response.0.result)
}
/// Get returns the current state of a task including its status,
/// message, and attempt count.
///
/// # Arguments
/// * `task_id` - String parameter.
///
/// # Returns
/// The result value.
///
/// # Errors
/// Returns an error if the host function call fails.
pub fn get(task_id: &str) -> Result<Option<TaskInfo>, Error> {
let response = unsafe {
task_get(Json(TaskGetRequest {
task_id: task_id.to_owned(),
}))?
};
if let Some(err) = response.0.error {
return Err(Error::msg(err));
}
Ok(response.0.result)
}
/// Cancel cancels a pending task. Returns error if already
/// running, completed, or failed.
///
/// # Arguments
/// * `task_id` - String parameter.
///
/// # Errors
/// Returns an error if the host function call fails.
pub fn cancel(task_id: &str) -> Result<(), Error> {
let response = unsafe {
task_cancel(Json(TaskCancelRequest {
task_id: task_id.to_owned(),
}))?
};
if let Some(err) = response.0.error {
return Err(Error::msg(err));
}
Ok(())
}
/// ClearQueue removes all pending tasks from the named queue.
/// Running tasks are not affected. Returns the number of tasks removed.
///
/// # Arguments
/// * `queue_name` - String parameter.
///
/// # Returns
/// The number of tasks removed.
///
/// # Errors
/// Returns an error if the host function call fails.
pub fn clear_queue(queue_name: &str) -> Result<i64, Error> {
let response = unsafe {
task_clearqueue(Json(TaskClearQueueRequest {
queue_name: queue_name.to_owned(),
}))?
};
if let Some(err) = response.0.error {
return Err(Error::msg(err));
}
Ok(response.0.result)
}

View File

@@ -1,35 +0,0 @@
// Test lyrics plugin for Navidrome plugin system integration tests.
package main
import (
"fmt"
"github.com/navidrome/navidrome/plugins/pdk/go/lyrics"
"github.com/navidrome/navidrome/plugins/pdk/go/pdk"
)
func init() {
lyrics.Register(&testLyrics{})
}
type testLyrics struct{}
func (t *testLyrics) GetLyrics(input lyrics.GetLyricsRequest) (lyrics.GetLyricsResponse, error) {
// Check for configured error
errMsg, hasErr := pdk.GetConfig("error")
if hasErr && errMsg != "" {
return lyrics.GetLyricsResponse{}, fmt.Errorf("%s", errMsg)
}
// Return test lyrics based on track info
return lyrics.GetLyricsResponse{
Lyrics: []lyrics.LyricsText{
{
Lang: "eng",
Text: "Test lyrics for " + input.Track.Title + "\nBy " + input.Track.Artist,
},
},
}, nil
}
func main() {}

View File

@@ -1,6 +0,0 @@
{
"name": "Test Lyrics",
"author": "Navidrome Test",
"version": "1.0.0",
"description": "A test lyrics plugin for integration testing"
}

View File

@@ -1,4 +1,4 @@
module test-lyrics
module test-taskqueue
go 1.25

114
plugins/testdata/test-taskqueue/main.go vendored Normal file
View File

@@ -0,0 +1,114 @@
// Test TaskQueue plugin for Navidrome plugin system integration tests.
// Build with: tinygo build -o ../test-taskqueue.wasm -target wasip1 -buildmode=c-shared .
package main
import (
"fmt"
"github.com/navidrome/navidrome/plugins/pdk/go/host"
"github.com/navidrome/navidrome/plugins/pdk/go/pdk"
"github.com/navidrome/navidrome/plugins/pdk/go/taskworker"
)
func init() {
taskworker.Register(&handler{})
}
type handler struct{}
func (h *handler) OnTaskExecute(req taskworker.TaskExecuteRequest) (string, error) {
payload := string(req.Payload)
if payload == "fail" {
return "", fmt.Errorf("task failed as instructed")
}
if payload == "fail-then-succeed" && req.Attempt < 2 {
return "", fmt.Errorf("transient failure")
}
return "completed successfully", nil
}
// Test helper types
type TestInput struct {
Operation string `json:"operation"`
QueueName string `json:"queueName,omitempty"`
Config *host.QueueConfig `json:"config,omitempty"`
Payload []byte `json:"payload,omitempty"`
TaskID string `json:"taskId,omitempty"`
}
type TestOutput struct {
TaskID string `json:"taskId,omitempty"`
Status string `json:"status,omitempty"`
Message string `json:"message,omitempty"`
Attempt int32 `json:"attempt,omitempty"`
Cleared int64 `json:"cleared,omitempty"`
Error *string `json:"error,omitempty"`
}
//go:wasmexport nd_test_taskqueue
func ndTestTaskQueue() int32 {
var input TestInput
if err := pdk.InputJSON(&input); err != nil {
errStr := err.Error()
pdk.OutputJSON(TestOutput{Error: &errStr})
return 0
}
switch input.Operation {
case "create_queue":
config := host.QueueConfig{}
if input.Config != nil {
config = *input.Config
}
err := host.TaskCreateQueue(input.QueueName, config)
if err != nil {
errStr := err.Error()
pdk.OutputJSON(TestOutput{Error: &errStr})
return 0
}
pdk.OutputJSON(TestOutput{})
case "enqueue":
taskID, err := host.TaskEnqueue(input.QueueName, input.Payload)
if err != nil {
errStr := err.Error()
pdk.OutputJSON(TestOutput{Error: &errStr})
return 0
}
pdk.OutputJSON(TestOutput{TaskID: taskID})
case "get_task_status":
info, err := host.TaskGet(input.TaskID)
if err != nil {
errStr := err.Error()
pdk.OutputJSON(TestOutput{Error: &errStr})
return 0
}
pdk.OutputJSON(TestOutput{Status: info.Status, Message: info.Message, Attempt: info.Attempt})
case "cancel_task":
err := host.TaskCancel(input.TaskID)
if err != nil {
errStr := err.Error()
pdk.OutputJSON(TestOutput{Error: &errStr})
return 0
}
pdk.OutputJSON(TestOutput{})
case "clear_queue":
cleared, err := host.TaskClearQueue(input.QueueName)
if err != nil {
errStr := err.Error()
pdk.OutputJSON(TestOutput{Error: &errStr})
return 0
}
pdk.OutputJSON(TestOutput{Cleared: cleared})
default:
errStr := "unknown operation: " + input.Operation
pdk.OutputJSON(TestOutput{Error: &errStr})
}
return 0
}
func main() {}

View File

@@ -0,0 +1,12 @@
{
"name": "Test TaskQueue Plugin",
"author": "Navidrome Test",
"version": "1.0.0",
"description": "A test plugin for TaskQueue integration testing",
"permissions": {
"taskqueue": {
"reason": "For testing task queue operations",
"maxConcurrency": 10
}
}
}

View File

@@ -19,7 +19,6 @@ import (
"github.com/navidrome/navidrome/core/artwork"
"github.com/navidrome/navidrome/core/auth"
"github.com/navidrome/navidrome/core/external"
"github.com/navidrome/navidrome/core/lyrics"
"github.com/navidrome/navidrome/core/metrics"
"github.com/navidrome/navidrome/core/playback"
"github.com/navidrome/navidrome/core/playlists"
@@ -397,7 +396,6 @@ func setupTestDB() {
core.NewShare(ds),
playback.PlaybackServer(nil),
metrics.NewNoopInstance(),
lyrics.NewLyrics(nil),
)
}

View File

@@ -27,7 +27,7 @@ var _ = Describe("Album Lists", func() {
ds = &tests.MockDataStore{}
auth.Init(ds)
mockRepo = ds.Album(ctx).(*tests.MockAlbumRepo)
router = New(ds, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
router = New(ds, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
w = httptest.NewRecorder()
})

View File

@@ -14,7 +14,6 @@ import (
"github.com/navidrome/navidrome/core"
"github.com/navidrome/navidrome/core/artwork"
"github.com/navidrome/navidrome/core/external"
"github.com/navidrome/navidrome/core/lyrics"
"github.com/navidrome/navidrome/core/metrics"
"github.com/navidrome/navidrome/core/playback"
playlistsvc "github.com/navidrome/navidrome/core/playlists"
@@ -49,13 +48,12 @@ type Router struct {
share core.Share
playback playback.PlaybackServer
metrics metrics.Metrics
lyrics lyrics.Lyrics
}
func New(ds model.DataStore, artwork artwork.Artwork, streamer core.MediaStreamer, archiver core.Archiver,
players core.Players, provider external.Provider, scanner model.Scanner, broker events.Broker,
playlists playlistsvc.Playlists, scrobbler scrobbler.PlayTracker, share core.Share, playback playback.PlaybackServer,
metrics metrics.Metrics, lyrics lyrics.Lyrics,
metrics metrics.Metrics,
) *Router {
r := &Router{
ds: ds,
@@ -71,7 +69,6 @@ func New(ds model.DataStore, artwork artwork.Artwork, streamer core.MediaStreame
share: share,
playback: playback,
metrics: metrics,
lyrics: lyrics,
}
r.Handler = r.routes()
return r

View File

@@ -27,7 +27,7 @@ var _ = Describe("MediaAnnotationController", func() {
ds = &tests.MockDataStore{}
playTracker = &fakePlayTracker{}
eventBroker = &fakeEventBroker{}
router = New(ds, nil, nil, nil, nil, nil, nil, eventBroker, nil, playTracker, nil, nil, nil, nil)
router = New(ds, nil, nil, nil, nil, nil, nil, eventBroker, nil, playTracker, nil, nil, nil)
})
Describe("Scrobble", func() {

View File

@@ -10,6 +10,7 @@ import (
"github.com/navidrome/navidrome/conf"
"github.com/navidrome/navidrome/consts"
"github.com/navidrome/navidrome/core/lyrics"
"github.com/navidrome/navidrome/log"
"github.com/navidrome/navidrome/model"
"github.com/navidrome/navidrome/resources"
@@ -108,7 +109,7 @@ func (api *Router) GetLyrics(r *http.Request) (*responses.Subsonic, error) {
return response, nil
}
structuredLyrics, err := api.lyrics.GetLyrics(r.Context(), &mediaFiles[0])
structuredLyrics, err := lyrics.GetLyrics(r.Context(), &mediaFiles[0])
if err != nil {
return nil, err
}
@@ -141,7 +142,7 @@ func (api *Router) GetLyricsBySongId(r *http.Request) (*responses.Subsonic, erro
return nil, err
}
structuredLyrics, err := api.lyrics.GetLyrics(r.Context(), mediaFile)
structuredLyrics, err := lyrics.GetLyrics(r.Context(), mediaFile)
if err != nil {
return nil, err
}

View File

@@ -14,7 +14,6 @@ import (
"github.com/navidrome/navidrome/conf"
"github.com/navidrome/navidrome/conf/configtest"
"github.com/navidrome/navidrome/core/artwork"
"github.com/navidrome/navidrome/core/lyrics"
"github.com/navidrome/navidrome/model"
"github.com/navidrome/navidrome/server/subsonic/responses"
"github.com/navidrome/navidrome/tests"
@@ -34,7 +33,7 @@ var _ = Describe("MediaRetrievalController", func() {
MockedMediaFile: mockRepo,
}
artwork = &fakeArtwork{data: "image data"}
router = New(ds, artwork, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, lyrics.NewLyrics(nil))
router = New(ds, artwork, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
w = httptest.NewRecorder()
DeferCleanup(configtest.SetupConfig())
conf.Server.LyricsPriority = "embedded,.lrc"

View File

@@ -19,7 +19,7 @@ var _ = Describe("GetOpenSubsonicExtensions", func() {
)
BeforeEach(func() {
router = subsonic.New(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
router = subsonic.New(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
w = httptest.NewRecorder()
r = httptest.NewRequest("GET", "/getOpenSubsonicExtensions?f=json", nil)
})

View File

@@ -24,7 +24,7 @@ var _ = Describe("buildPlaylist", func() {
BeforeEach(func() {
ds = &tests.MockDataStore{}
router = New(ds, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
router = New(ds, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
ctx = context.Background()
})
@@ -224,7 +224,7 @@ var _ = Describe("UpdatePlaylist", func() {
BeforeEach(func() {
ds = &tests.MockDataStore{}
playlists = &fakePlaylists{}
router = New(ds, nil, nil, nil, nil, nil, nil, nil, playlists, nil, nil, nil, nil, nil)
router = New(ds, nil, nil, nil, nil, nil, nil, nil, playlists, nil, nil, nil, nil)
})
It("clears the comment when parameter is empty", func() {

View File

@@ -21,7 +21,7 @@ var _ = Describe("Search", func() {
ds = &tests.MockDataStore{}
auth.Init(ds)
router = New(ds, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
router = New(ds, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
// Get references to the mock repositories so we can inspect their Options
mockAlbumRepo = ds.Album(nil).(*tests.MockAlbumRepo)