Compare commits

..

1 Commits

Author SHA1 Message Date
Deluan
759214cfbc chore(deps): update Go version to 1.26 in Dockerfile and go.mod
Signed-off-by: Deluan <deluan@navidrome.org>
2026-02-24 21:49:19 -05:00
43 changed files with 32 additions and 3140 deletions

View File

@@ -4,7 +4,7 @@
"dockerfile": "Dockerfile",
"args": {
// Update the VARIANT arg to pick a version of Go: 1, 1.15, 1.14
"VARIANT": "1.25",
"VARIANT": "1.26",
// Options
"INSTALL_NODE": "true",
"NODE_VERSION": "v24",

View File

@@ -63,7 +63,7 @@ COPY --from=ui /build /build
########################################################################################################################
### Build Navidrome binary
FROM --platform=$BUILDPLATFORM public.ecr.aws/docker/library/golang:1.25-trixie AS base
FROM --platform=$BUILDPLATFORM public.ecr.aws/docker/library/golang:1.26-trixie AS base
RUN apt-get update && apt-get install -y clang lld
COPY --from=xx / /
WORKDIR /workspace

View File

@@ -155,7 +155,6 @@ type scannerOptions struct {
type subsonicOptions struct {
AppendSubtitle bool
AppendAlbumVersion bool
ArtistParticipations bool
DefaultReportRealPath bool
EnableAverageRating bool
@@ -690,7 +689,6 @@ func setViperDefaults() {
viper.SetDefault("scanner.followsymlinks", true)
viper.SetDefault("scanner.purgemissing", consts.PurgeMissingNever)
viper.SetDefault("subsonic.appendsubtitle", true)
viper.SetDefault("subsonic.appendalbumversion", true)
viper.SetDefault("subsonic.artistparticipations", false)
viper.SetDefault("subsonic.defaultreportrealpath", false)
viper.SetDefault("subsonic.enableaveragerating", true)

2
go.mod
View File

@@ -1,6 +1,6 @@
module github.com/navidrome/navidrome
go 1.25.0
go 1.26
replace (
// Fork to fix https://github.com/navidrome/navidrome/issues/3254

View File

@@ -1,14 +1,11 @@
package model
import (
"fmt"
"iter"
"math"
"sync"
"time"
"github.com/navidrome/navidrome/conf"
"github.com/gohugoio/hashstructure"
)
@@ -73,13 +70,6 @@ func (a Album) CoverArtID() ArtworkID {
return artworkIDFromAlbum(a)
}
func (a Album) FullName() string {
if conf.Server.Subsonic.AppendAlbumVersion && len(a.Tags[TagAlbumVersion]) > 0 {
return fmt.Sprintf("%s (%s)", a.Name, a.Tags[TagAlbumVersion][0])
}
return a.Name
}
// Equals compares two Album structs, ignoring calculated fields
func (a Album) Equals(other Album) bool {
// Normalize float32 values to avoid false negatives

View File

@@ -3,30 +3,11 @@ package model_test
import (
"encoding/json"
"github.com/navidrome/navidrome/conf"
"github.com/navidrome/navidrome/conf/configtest"
. "github.com/navidrome/navidrome/model"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
var _ = Describe("Album", func() {
BeforeEach(func() {
DeferCleanup(configtest.SetupConfig())
})
DescribeTable("FullName",
func(enabled bool, tags Tags, expected string) {
conf.Server.Subsonic.AppendAlbumVersion = enabled
a := Album{Name: "Album", Tags: tags}
Expect(a.FullName()).To(Equal(expected))
},
Entry("appends version when enabled and tag is present", true, Tags{TagAlbumVersion: []string{"Remastered"}}, "Album (Remastered)"),
Entry("returns just name when disabled", false, Tags{TagAlbumVersion: []string{"Remastered"}}, "Album"),
Entry("returns just name when tag is absent", true, Tags{}, "Album"),
Entry("returns just name when tag is an empty slice", true, Tags{TagAlbumVersion: []string{}}, "Album"),
)
})
var _ = Describe("Albums", func() {
var albums Albums

View File

@@ -95,19 +95,12 @@ type MediaFile struct {
}
func (mf MediaFile) FullTitle() string {
if conf.Server.Subsonic.AppendSubtitle && len(mf.Tags[TagSubtitle]) > 0 {
if conf.Server.Subsonic.AppendSubtitle && mf.Tags[TagSubtitle] != nil {
return fmt.Sprintf("%s (%s)", mf.Title, mf.Tags[TagSubtitle][0])
}
return mf.Title
}
func (mf MediaFile) FullAlbumName() string {
if conf.Server.Subsonic.AppendAlbumVersion && len(mf.Tags[TagAlbumVersion]) > 0 {
return fmt.Sprintf("%s (%s)", mf.Album, mf.Tags[TagAlbumVersion][0])
}
return mf.Album
}
func (mf MediaFile) ContentType() string {
return mime.TypeByExtension("." + mf.Suffix)
}

View File

@@ -475,29 +475,7 @@ var _ = Describe("MediaFile", func() {
DeferCleanup(configtest.SetupConfig())
conf.Server.EnableMediaFileCoverArt = true
})
DescribeTable("FullTitle",
func(enabled bool, tags Tags, expected string) {
conf.Server.Subsonic.AppendSubtitle = enabled
mf := MediaFile{Title: "Song", Tags: tags}
Expect(mf.FullTitle()).To(Equal(expected))
},
Entry("appends subtitle when enabled and tag is present", true, Tags{TagSubtitle: []string{"Live"}}, "Song (Live)"),
Entry("returns just title when disabled", false, Tags{TagSubtitle: []string{"Live"}}, "Song"),
Entry("returns just title when tag is absent", true, Tags{}, "Song"),
Entry("returns just title when tag is an empty slice", true, Tags{TagSubtitle: []string{}}, "Song"),
)
DescribeTable("FullAlbumName",
func(enabled bool, tags Tags, expected string) {
conf.Server.Subsonic.AppendAlbumVersion = enabled
mf := MediaFile{Album: "Album", Tags: tags}
Expect(mf.FullAlbumName()).To(Equal(expected))
},
Entry("appends version when enabled and tag is present", true, Tags{TagAlbumVersion: []string{"Deluxe Edition"}}, "Album (Deluxe Edition)"),
Entry("returns just album name when disabled", false, Tags{TagAlbumVersion: []string{"Deluxe Edition"}}, "Album"),
Entry("returns just album name when tag is absent", true, Tags{}, "Album"),
Entry("returns just album name when tag is an empty slice", true, Tags{TagAlbumVersion: []string{}}, "Album"),
)
Describe("CoverArtId()", func() {
Describe(".CoverArtId()", func() {
It("returns its own id if it HasCoverArt", func() {
mf := MediaFile{ID: "111", AlbumID: "1", HasCoverArt: true}
id := mf.CoverArtID()

View File

@@ -1,33 +0,0 @@
package capabilities
// TaskWorker provides task execution handling.
// This capability allows plugins to receive callbacks when their queued tasks
// are ready to execute. Plugins that use the taskqueue host service must
// implement this capability.
//
//nd:capability name=taskworker
type TaskWorker interface {
// OnTaskExecute is called when a queued task is ready to run.
// Return an error to trigger retry (if retries are configured).
//nd:export name=nd_task_execute
OnTaskExecute(TaskExecuteRequest) (TaskExecuteResponse, error)
}
// TaskExecuteRequest is the request provided when a task is ready to execute.
type TaskExecuteRequest struct {
// QueueName is the name of the queue this task belongs to.
QueueName string `json:"queueName"`
// TaskID is the unique identifier for this task.
TaskID string `json:"taskId"`
// Payload is the opaque data provided when the task was enqueued.
Payload []byte `json:"payload"`
// Attempt is the current attempt number (1-based: first attempt = 1).
Attempt int32 `json:"attempt"`
}
// TaskExecuteResponse is the response from task execution.
type TaskExecuteResponse struct {
// Error, if non-empty, indicates the task failed. The task will be retried
// if retries are configured and attempts remain.
Error string `json:"error,omitempty"`
}

View File

@@ -1,45 +0,0 @@
version: v1-draft
exports:
nd_task_execute:
description: |-
OnTaskExecute is called when a queued task is ready to run.
Return an error to trigger retry (if retries are configured).
input:
$ref: '#/components/schemas/TaskExecuteRequest'
contentType: application/json
output:
$ref: '#/components/schemas/TaskExecuteResponse'
contentType: application/json
components:
schemas:
TaskExecuteRequest:
description: TaskExecuteRequest is the request provided when a task is ready to execute.
properties:
queueName:
type: string
description: QueueName is the name of the queue this task belongs to.
taskId:
type: string
description: TaskID is the unique identifier for this task.
payload:
type: array
description: Payload is the opaque data provided when the task was enqueued.
items:
type: object
attempt:
type: integer
format: int32
description: 'Attempt is the current attempt number (1-based: first attempt = 1).'
required:
- queueName
- taskId
- payload
- attempt
TaskExecuteResponse:
description: TaskExecuteResponse is the response from task execution.
properties:
error:
type: string
description: |-
Error, if non-empty, indicates the task failed. The task will be retried
if retries are configured and attempts remain.

View File

@@ -1,6 +1,6 @@
module github.com/navidrome/navidrome/plugins/cmd/ndpgen
go 1.25
go 1.26
require (
github.com/extism/go-pdk v1.1.3

View File

@@ -1,57 +0,0 @@
package host
import "context"
// QueueConfig holds configuration for a task queue.
type QueueConfig struct {
// Concurrency is the max number of parallel workers. Default: 1.
// Capped by the plugin's manifest maxConcurrency.
Concurrency int32 `json:"concurrency"`
// MaxRetries is the number of times to retry a failed task. Default: 0.
MaxRetries int32 `json:"maxRetries"`
// BackoffMs is the initial backoff between retries in milliseconds.
// Doubles each retry (exponential: backoffMs * 2^(attempt-1)). Default: 1000.
BackoffMs int64 `json:"backoffMs"`
// DelayMs is the minimum delay between starting consecutive tasks
// in milliseconds. Useful for rate limiting. Default: 0.
DelayMs int64 `json:"delayMs"`
// RetentionMs is how long completed/failed/cancelled tasks are kept
// in milliseconds. Default: 3600000 (1h). Min: 60000 (1m). Max: 604800000 (1w).
RetentionMs int64 `json:"retentionMs"`
}
// TaskQueueService provides persistent task queues for plugins.
//
// This service allows plugins to create named queues with configurable concurrency,
// retry policies, and rate limiting. Tasks are persisted to SQLite and survive
// server restarts. When a task is ready to execute, the host calls the plugin's
// nd_task_execute callback function.
//
//nd:hostservice name=TaskQueue permission=taskqueue
type TaskQueueService interface {
// CreateQueue creates a named task queue with the given configuration.
// Zero-value fields in config use sensible defaults.
// If a queue with the same name already exists, returns an error.
// On startup, this also recovers any stale "running" tasks from a previous crash.
//nd:hostfunc
CreateQueue(ctx context.Context, name string, config QueueConfig) error
// Enqueue adds a task to the named queue. Returns the task ID.
// payload is opaque bytes passed back to the plugin on execution.
//nd:hostfunc
Enqueue(ctx context.Context, queueName string, payload []byte) (string, error)
// GetTaskStatus returns the status of a task: "pending", "running",
// "completed", "failed", or "cancelled".
//nd:hostfunc
GetTaskStatus(ctx context.Context, taskID string) (string, error)
// CancelTask cancels a pending task. Returns error if already
// running, completed, or failed.
//nd:hostfunc
CancelTask(ctx context.Context, taskID string) error
}

View File

@@ -1,220 +0,0 @@
// Code generated by ndpgen. DO NOT EDIT.
package host
import (
"context"
"encoding/json"
extism "github.com/extism/go-sdk"
)
// TaskQueueCreateQueueRequest is the request type for TaskQueue.CreateQueue.
type TaskQueueCreateQueueRequest struct {
Name string `json:"name"`
Config QueueConfig `json:"config"`
}
// TaskQueueCreateQueueResponse is the response type for TaskQueue.CreateQueue.
type TaskQueueCreateQueueResponse struct {
Error string `json:"error,omitempty"`
}
// TaskQueueEnqueueRequest is the request type for TaskQueue.Enqueue.
type TaskQueueEnqueueRequest struct {
QueueName string `json:"queueName"`
Payload []byte `json:"payload"`
}
// TaskQueueEnqueueResponse is the response type for TaskQueue.Enqueue.
type TaskQueueEnqueueResponse struct {
Result string `json:"result,omitempty"`
Error string `json:"error,omitempty"`
}
// TaskQueueGetTaskStatusRequest is the request type for TaskQueue.GetTaskStatus.
type TaskQueueGetTaskStatusRequest struct {
TaskID string `json:"taskId"`
}
// TaskQueueGetTaskStatusResponse is the response type for TaskQueue.GetTaskStatus.
type TaskQueueGetTaskStatusResponse struct {
Result string `json:"result,omitempty"`
Error string `json:"error,omitempty"`
}
// TaskQueueCancelTaskRequest is the request type for TaskQueue.CancelTask.
type TaskQueueCancelTaskRequest struct {
TaskID string `json:"taskId"`
}
// TaskQueueCancelTaskResponse is the response type for TaskQueue.CancelTask.
type TaskQueueCancelTaskResponse struct {
Error string `json:"error,omitempty"`
}
// RegisterTaskQueueHostFunctions registers TaskQueue service host functions.
// The returned host functions should be added to the plugin's configuration.
func RegisterTaskQueueHostFunctions(service TaskQueueService) []extism.HostFunction {
return []extism.HostFunction{
newTaskQueueCreateQueueHostFunction(service),
newTaskQueueEnqueueHostFunction(service),
newTaskQueueGetTaskStatusHostFunction(service),
newTaskQueueCancelTaskHostFunction(service),
}
}
func newTaskQueueCreateQueueHostFunction(service TaskQueueService) extism.HostFunction {
return extism.NewHostFunctionWithStack(
"taskqueue_createqueue",
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
// Read JSON request from plugin memory
reqBytes, err := p.ReadBytes(stack[0])
if err != nil {
taskqueueWriteError(p, stack, err)
return
}
var req TaskQueueCreateQueueRequest
if err := json.Unmarshal(reqBytes, &req); err != nil {
taskqueueWriteError(p, stack, err)
return
}
// Call the service method
if svcErr := service.CreateQueue(ctx, req.Name, req.Config); svcErr != nil {
taskqueueWriteError(p, stack, svcErr)
return
}
// Write JSON response to plugin memory
resp := TaskQueueCreateQueueResponse{}
taskqueueWriteResponse(p, stack, resp)
},
[]extism.ValueType{extism.ValueTypePTR},
[]extism.ValueType{extism.ValueTypePTR},
)
}
func newTaskQueueEnqueueHostFunction(service TaskQueueService) extism.HostFunction {
return extism.NewHostFunctionWithStack(
"taskqueue_enqueue",
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
// Read JSON request from plugin memory
reqBytes, err := p.ReadBytes(stack[0])
if err != nil {
taskqueueWriteError(p, stack, err)
return
}
var req TaskQueueEnqueueRequest
if err := json.Unmarshal(reqBytes, &req); err != nil {
taskqueueWriteError(p, stack, err)
return
}
// Call the service method
result, svcErr := service.Enqueue(ctx, req.QueueName, req.Payload)
if svcErr != nil {
taskqueueWriteError(p, stack, svcErr)
return
}
// Write JSON response to plugin memory
resp := TaskQueueEnqueueResponse{
Result: result,
}
taskqueueWriteResponse(p, stack, resp)
},
[]extism.ValueType{extism.ValueTypePTR},
[]extism.ValueType{extism.ValueTypePTR},
)
}
func newTaskQueueGetTaskStatusHostFunction(service TaskQueueService) extism.HostFunction {
return extism.NewHostFunctionWithStack(
"taskqueue_gettaskstatus",
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
// Read JSON request from plugin memory
reqBytes, err := p.ReadBytes(stack[0])
if err != nil {
taskqueueWriteError(p, stack, err)
return
}
var req TaskQueueGetTaskStatusRequest
if err := json.Unmarshal(reqBytes, &req); err != nil {
taskqueueWriteError(p, stack, err)
return
}
// Call the service method
result, svcErr := service.GetTaskStatus(ctx, req.TaskID)
if svcErr != nil {
taskqueueWriteError(p, stack, svcErr)
return
}
// Write JSON response to plugin memory
resp := TaskQueueGetTaskStatusResponse{
Result: result,
}
taskqueueWriteResponse(p, stack, resp)
},
[]extism.ValueType{extism.ValueTypePTR},
[]extism.ValueType{extism.ValueTypePTR},
)
}
func newTaskQueueCancelTaskHostFunction(service TaskQueueService) extism.HostFunction {
return extism.NewHostFunctionWithStack(
"taskqueue_canceltask",
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
// Read JSON request from plugin memory
reqBytes, err := p.ReadBytes(stack[0])
if err != nil {
taskqueueWriteError(p, stack, err)
return
}
var req TaskQueueCancelTaskRequest
if err := json.Unmarshal(reqBytes, &req); err != nil {
taskqueueWriteError(p, stack, err)
return
}
// Call the service method
if svcErr := service.CancelTask(ctx, req.TaskID); svcErr != nil {
taskqueueWriteError(p, stack, svcErr)
return
}
// Write JSON response to plugin memory
resp := TaskQueueCancelTaskResponse{}
taskqueueWriteResponse(p, stack, resp)
},
[]extism.ValueType{extism.ValueTypePTR},
[]extism.ValueType{extism.ValueTypePTR},
)
}
// taskqueueWriteResponse writes a JSON response to plugin memory.
func taskqueueWriteResponse(p *extism.CurrentPlugin, stack []uint64, resp any) {
respBytes, err := json.Marshal(resp)
if err != nil {
taskqueueWriteError(p, stack, err)
return
}
respPtr, err := p.WriteBytes(respBytes)
if err != nil {
stack[0] = 0
return
}
stack[0] = respPtr
}
// taskqueueWriteError writes an error response to plugin memory.
func taskqueueWriteError(p *extism.CurrentPlugin, stack []uint64, err error) {
errResp := struct {
Error string `json:"error"`
}{Error: err.Error()}
respBytes, _ := json.Marshal(errResp)
respPtr, _ := p.WriteBytes(respBytes)
stack[0] = respPtr
}

View File

@@ -188,6 +188,12 @@ func (s *schedulerServiceImpl) invokeCallback(ctx context.Context, scheduleID st
return
}
// Check if plugin has the scheduler capability
if !hasCapability(instance.capabilities, CapabilityScheduler) {
log.Warn(ctx, "Plugin does not have scheduler capability", "plugin", s.pluginName, "scheduleID", scheduleID)
return
}
// Prepare callback input
input := capabilities.SchedulerCallbackRequest{
ScheduleID: scheduleID,

View File

@@ -1,562 +0,0 @@
package plugins
import (
"context"
"database/sql"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"sync"
"time"
_ "github.com/mattn/go-sqlite3"
"github.com/navidrome/navidrome/conf"
"github.com/navidrome/navidrome/log"
"github.com/navidrome/navidrome/model/id"
"github.com/navidrome/navidrome/plugins/capabilities"
"github.com/navidrome/navidrome/plugins/host"
"golang.org/x/time/rate"
)
const (
defaultConcurrency int32 = 1
defaultBackoffMs int64 = 1000
defaultRetentionMs int64 = 3_600_000 // 1 hour
minRetentionMs int64 = 60_000 // 1 minute
maxRetentionMs int64 = 604_800_000 // 1 week
maxQueueNameLength = 128
maxPayloadSize = 1 * 1024 * 1024 // 1MB
maxBackoffMs int64 = 3_600_000 // 1 hour
cleanupInterval = 5 * time.Minute
pollInterval = 5 * time.Second
shutdownTimeout = 10 * time.Second
taskStatusPending = "pending"
taskStatusRunning = "running"
taskStatusCompleted = "completed"
taskStatusFailed = "failed"
taskStatusCancelled = "cancelled"
)
// CapabilityTaskWorker indicates the plugin can receive task execution callbacks.
const CapabilityTaskWorker Capability = "TaskWorker"
const FuncTaskWorkerCallback = "nd_task_execute"
func init() {
registerCapability(CapabilityTaskWorker, FuncTaskWorkerCallback)
}
type queueState struct {
config host.QueueConfig
signal chan struct{}
limiter *rate.Limiter
}
// notifyWorkers sends a non-blocking signal to wake up queue workers.
func (qs *queueState) notifyWorkers() {
select {
case qs.signal <- struct{}{}:
default:
}
}
// taskQueueServiceImpl implements host.TaskQueueService with SQLite persistence
// and background worker goroutines for task execution.
type taskQueueServiceImpl struct {
pluginName string
manager *Manager
maxConcurrency int32
db *sql.DB
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
mu sync.Mutex
queues map[string]*queueState
// For testing: override how callbacks are invoked
invokeCallbackFn func(ctx context.Context, queueName, taskID string, payload []byte, attempt int32) error
}
// newTaskQueueService creates a new taskQueueServiceImpl with its own SQLite database.
func newTaskQueueService(pluginName string, manager *Manager, maxConcurrency int32) (*taskQueueServiceImpl, error) {
dataDir := filepath.Join(conf.Server.DataFolder, "plugins", pluginName)
if err := os.MkdirAll(dataDir, 0700); err != nil {
return nil, fmt.Errorf("creating plugin data directory: %w", err)
}
dbPath := filepath.Join(dataDir, "taskqueue.db")
db, err := sql.Open("sqlite3", dbPath+"?_busy_timeout=5000&_journal_mode=WAL&_foreign_keys=off")
if err != nil {
return nil, fmt.Errorf("opening taskqueue database: %w", err)
}
db.SetMaxOpenConns(3)
db.SetMaxIdleConns(1)
if err := createTaskQueueSchema(db); err != nil {
db.Close()
return nil, fmt.Errorf("creating taskqueue schema: %w", err)
}
ctx, cancel := context.WithCancel(manager.ctx)
s := &taskQueueServiceImpl{
pluginName: pluginName,
manager: manager,
maxConcurrency: maxConcurrency,
db: db,
ctx: ctx,
cancel: cancel,
queues: make(map[string]*queueState),
}
s.invokeCallbackFn = s.defaultInvokeCallback
s.wg.Go(s.cleanupLoop)
log.Debug("Initialized plugin taskqueue", "plugin", pluginName, "path", dbPath, "maxConcurrency", maxConcurrency)
return s, nil
}
func createTaskQueueSchema(db *sql.DB) error {
_, err := db.Exec(`
CREATE TABLE IF NOT EXISTS queues (
name TEXT PRIMARY KEY,
concurrency INTEGER NOT NULL DEFAULT 1,
max_retries INTEGER NOT NULL DEFAULT 0,
backoff_ms INTEGER NOT NULL DEFAULT 1000,
delay_ms INTEGER NOT NULL DEFAULT 0,
retention_ms INTEGER NOT NULL DEFAULT 3600000
);
CREATE TABLE IF NOT EXISTS tasks (
id TEXT PRIMARY KEY,
queue_name TEXT NOT NULL REFERENCES queues(name),
payload BLOB NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
attempt INTEGER NOT NULL DEFAULT 0,
max_retries INTEGER NOT NULL,
next_run_at INTEGER NOT NULL,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_tasks_dequeue ON tasks(queue_name, status, next_run_at);
`)
return err
}
// applyConfigDefaults fills zero-value config fields with sensible defaults
// and clamps values to valid ranges, logging warnings for clamped values.
func (s *taskQueueServiceImpl) applyConfigDefaults(ctx context.Context, name string, config *host.QueueConfig) {
if config.Concurrency <= 0 {
config.Concurrency = defaultConcurrency
}
if config.BackoffMs <= 0 {
config.BackoffMs = defaultBackoffMs
}
if config.RetentionMs <= 0 {
config.RetentionMs = defaultRetentionMs
}
if config.RetentionMs < minRetentionMs {
log.Warn(ctx, "TaskQueue retention clamped to minimum", "plugin", s.pluginName, "queue", name,
"requested", config.RetentionMs, "min", minRetentionMs)
config.RetentionMs = minRetentionMs
}
if config.RetentionMs > maxRetentionMs {
log.Warn(ctx, "TaskQueue retention clamped to maximum", "plugin", s.pluginName, "queue", name,
"requested", config.RetentionMs, "max", maxRetentionMs)
config.RetentionMs = maxRetentionMs
}
}
// clampConcurrency reduces config.Concurrency if it exceeds the remaining budget.
// Returns an error when the concurrency budget is fully exhausted.
// Must be called with s.mu held.
func (s *taskQueueServiceImpl) clampConcurrency(ctx context.Context, name string, config *host.QueueConfig) error {
var allocated int32
for _, qs := range s.queues {
allocated += qs.config.Concurrency
}
available := s.maxConcurrency - allocated
if available <= 0 {
log.Warn(ctx, "TaskQueue concurrency budget exhausted", "plugin", s.pluginName, "queue", name,
"allocated", allocated, "maxConcurrency", s.maxConcurrency)
return fmt.Errorf("concurrency budget exhausted (%d/%d allocated)", allocated, s.maxConcurrency)
}
if config.Concurrency > available {
log.Warn(ctx, "TaskQueue concurrency clamped", "plugin", s.pluginName, "queue", name,
"requested", config.Concurrency, "available", available, "maxConcurrency", s.maxConcurrency)
config.Concurrency = available
}
return nil
}
func (s *taskQueueServiceImpl) CreateQueue(ctx context.Context, name string, config host.QueueConfig) error {
if len(name) == 0 {
return fmt.Errorf("queue name cannot be empty")
}
if len(name) > maxQueueNameLength {
return fmt.Errorf("queue name exceeds maximum length of %d bytes", maxQueueNameLength)
}
s.applyConfigDefaults(ctx, name, &config)
s.mu.Lock()
defer s.mu.Unlock()
if err := s.clampConcurrency(ctx, name, &config); err != nil {
return err
}
if _, exists := s.queues[name]; exists {
return fmt.Errorf("queue %q already exists", name)
}
// Upsert into queues table (idempotent across restarts)
_, err := s.db.ExecContext(ctx, `
INSERT INTO queues (name, concurrency, max_retries, backoff_ms, delay_ms, retention_ms)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(name) DO UPDATE SET
concurrency = excluded.concurrency,
max_retries = excluded.max_retries,
backoff_ms = excluded.backoff_ms,
delay_ms = excluded.delay_ms,
retention_ms = excluded.retention_ms
`, name, config.Concurrency, config.MaxRetries, config.BackoffMs, config.DelayMs, config.RetentionMs)
if err != nil {
return fmt.Errorf("creating queue: %w", err)
}
// Reset stale running tasks from previous crash
now := time.Now().UnixMilli()
_, err = s.db.ExecContext(ctx, `
UPDATE tasks SET status = ?, updated_at = ? WHERE queue_name = ? AND status = ?
`, taskStatusPending, now, name, taskStatusRunning)
if err != nil {
return fmt.Errorf("resetting stale tasks: %w", err)
}
qs := &queueState{
config: config,
signal: make(chan struct{}, 1),
}
if config.DelayMs > 0 {
// Rate limit dispatches to enforce delay between tasks.
// Burst of 1 allows one immediate dispatch, then enforces the delay interval.
qs.limiter = rate.NewLimiter(rate.Every(time.Duration(config.DelayMs)*time.Millisecond), 1)
}
s.queues[name] = qs
for i := int32(0); i < config.Concurrency; i++ {
s.wg.Go(func() { s.worker(name, qs) })
}
log.Debug(ctx, "Created task queue", "plugin", s.pluginName, "queue", name,
"concurrency", config.Concurrency, "maxRetries", config.MaxRetries,
"backoffMs", config.BackoffMs, "delayMs", config.DelayMs, "retentionMs", config.RetentionMs)
return nil
}
func (s *taskQueueServiceImpl) Enqueue(ctx context.Context, queueName string, payload []byte) (string, error) {
s.mu.Lock()
qs, exists := s.queues[queueName]
s.mu.Unlock()
if !exists {
return "", fmt.Errorf("queue %q does not exist", queueName)
}
if len(payload) > maxPayloadSize {
return "", fmt.Errorf("payload size %d exceeds maximum of %d bytes", len(payload), maxPayloadSize)
}
taskID := id.NewRandom()
now := time.Now().UnixMilli()
_, err := s.db.ExecContext(ctx, `
INSERT INTO tasks (id, queue_name, payload, status, attempt, max_retries, next_run_at, created_at, updated_at)
VALUES (?, ?, ?, ?, 0, ?, ?, ?, ?)
`, taskID, queueName, payload, taskStatusPending, qs.config.MaxRetries, now, now, now)
if err != nil {
return "", fmt.Errorf("enqueuing task: %w", err)
}
qs.notifyWorkers()
log.Trace(ctx, "Enqueued task", "plugin", s.pluginName, "queue", queueName, "taskID", taskID)
return taskID, nil
}
// GetTaskStatus returns the status of a task.
func (s *taskQueueServiceImpl) GetTaskStatus(ctx context.Context, taskID string) (string, error) {
var status string
err := s.db.QueryRowContext(ctx, `SELECT status FROM tasks WHERE id = ?`, taskID).Scan(&status)
if errors.Is(err, sql.ErrNoRows) {
return "", fmt.Errorf("task %q not found", taskID)
}
if err != nil {
return "", fmt.Errorf("getting task status: %w", err)
}
return status, nil
}
// CancelTask cancels a pending task.
func (s *taskQueueServiceImpl) CancelTask(ctx context.Context, taskID string) error {
now := time.Now().UnixMilli()
result, err := s.db.ExecContext(ctx, `
UPDATE tasks SET status = ?, updated_at = ? WHERE id = ? AND status = ?
`, taskStatusCancelled, now, taskID, taskStatusPending)
if err != nil {
return fmt.Errorf("cancelling task: %w", err)
}
rowsAffected, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("checking cancel result: %w", err)
}
if rowsAffected == 0 {
// Check if task exists at all
var status string
err := s.db.QueryRowContext(ctx, `SELECT status FROM tasks WHERE id = ?`, taskID).Scan(&status)
if errors.Is(err, sql.ErrNoRows) {
return fmt.Errorf("task %q not found", taskID)
}
if err != nil {
return fmt.Errorf("checking task existence: %w", err)
}
return fmt.Errorf("task %q cannot be cancelled (status: %s)", taskID, status)
}
log.Trace(ctx, "Cancelled task", "plugin", s.pluginName, "taskID", taskID)
return nil
}
// worker is the main loop for a single worker goroutine.
func (s *taskQueueServiceImpl) worker(queueName string, qs *queueState) {
// Process any existing pending tasks immediately on startup
s.drainQueue(queueName, qs)
ticker := time.NewTicker(pollInterval)
defer ticker.Stop()
for {
select {
case <-s.ctx.Done():
return
case <-qs.signal:
s.drainQueue(queueName, qs)
case <-ticker.C:
s.drainQueue(queueName, qs)
}
}
}
func (s *taskQueueServiceImpl) drainQueue(queueName string, qs *queueState) {
for s.ctx.Err() == nil && s.processTask(queueName, qs) {
}
}
// processTask dequeues and processes a single task. Returns true if a task was processed.
func (s *taskQueueServiceImpl) processTask(queueName string, qs *queueState) bool {
now := time.Now().UnixMilli()
// Atomically dequeue a task
var taskID string
var payload []byte
var attempt, maxRetries int32
err := s.db.QueryRowContext(s.ctx, `
UPDATE tasks SET status = ?, attempt = attempt + 1, updated_at = ?
WHERE id = (
SELECT id FROM tasks
WHERE queue_name = ? AND status = ? AND next_run_at <= ?
ORDER BY next_run_at, created_at LIMIT 1
)
RETURNING id, payload, attempt, max_retries
`, taskStatusRunning, now, queueName, taskStatusPending, now).Scan(&taskID, &payload, &attempt, &maxRetries)
if errors.Is(err, sql.ErrNoRows) {
return false
}
if err != nil {
log.Error(s.ctx, "Failed to dequeue task", "plugin", s.pluginName, "queue", queueName, err)
return false
}
// Enforce delay between task dispatches using a rate limiter.
// This is done after dequeue so that empty polls don't consume rate tokens.
if qs.limiter != nil {
if err := qs.limiter.Wait(s.ctx); err != nil {
// Context cancelled during wait — revert task to pending for recovery
s.revertTaskToPending(taskID)
return false
}
}
// Invoke callback
log.Debug(s.ctx, "Executing task", "plugin", s.pluginName, "queue", queueName, "taskID", taskID, "attempt", attempt)
callbackErr := s.invokeCallbackFn(s.ctx, queueName, taskID, payload, attempt)
// If context was cancelled (shutdown), revert task to pending for recovery
if s.ctx.Err() != nil {
s.revertTaskToPending(taskID)
return false
}
if callbackErr == nil {
s.completeTask(queueName, taskID)
} else {
s.handleTaskFailure(queueName, taskID, attempt, maxRetries, qs, callbackErr)
}
return true
}
func (s *taskQueueServiceImpl) completeTask(queueName, taskID string) {
now := time.Now().UnixMilli()
if _, err := s.db.ExecContext(s.ctx, `UPDATE tasks SET status = ?, updated_at = ? WHERE id = ?`, taskStatusCompleted, now, taskID); err != nil {
log.Error(s.ctx, "Failed to mark task as completed", "plugin", s.pluginName, "taskID", taskID, err)
}
log.Debug(s.ctx, "Task completed", "plugin", s.pluginName, "queue", queueName, "taskID", taskID)
}
func (s *taskQueueServiceImpl) handleTaskFailure(queueName, taskID string, attempt, maxRetries int32, qs *queueState, callbackErr error) {
log.Warn(s.ctx, "Task execution failed", "plugin", s.pluginName, "queue", queueName,
"taskID", taskID, "attempt", attempt, "maxRetries", maxRetries, "err", callbackErr)
now := time.Now().UnixMilli()
if attempt > maxRetries {
if _, err := s.db.ExecContext(s.ctx, `UPDATE tasks SET status = ?, updated_at = ? WHERE id = ?`, taskStatusFailed, now, taskID); err != nil {
log.Error(s.ctx, "Failed to mark task as failed", "plugin", s.pluginName, "taskID", taskID, err)
}
log.Warn(s.ctx, "Task failed after all retries", "plugin", s.pluginName, "queue", queueName, "taskID", taskID)
return
}
// Exponential backoff: backoffMs * 2^(attempt-1)
backoff := qs.config.BackoffMs << (attempt - 1)
if backoff <= 0 || backoff > maxBackoffMs {
backoff = maxBackoffMs
}
nextRunAt := now + backoff
if _, err := s.db.ExecContext(s.ctx, `
UPDATE tasks SET status = ?, next_run_at = ?, updated_at = ? WHERE id = ?
`, taskStatusPending, nextRunAt, now, taskID); err != nil {
log.Error(s.ctx, "Failed to reschedule task for retry", "plugin", s.pluginName, "taskID", taskID, err)
}
// Wake worker after backoff expires
time.AfterFunc(time.Duration(backoff)*time.Millisecond, func() {
qs.notifyWorkers()
})
}
// revertTaskToPending puts a running task back to pending status and decrements the attempt
// counter (used during shutdown to ensure the interrupted attempt doesn't count).
func (s *taskQueueServiceImpl) revertTaskToPending(taskID string) {
now := time.Now().UnixMilli()
_, err := s.db.Exec(`UPDATE tasks SET status = ?, attempt = MAX(attempt - 1, 0), updated_at = ? WHERE id = ? AND status = ?`, taskStatusPending, now, taskID, taskStatusRunning)
if err != nil {
log.Error("Failed to revert task to pending", "plugin", s.pluginName, "taskID", taskID, err)
}
}
// defaultInvokeCallback calls the plugin's nd_task_execute function.
func (s *taskQueueServiceImpl) defaultInvokeCallback(ctx context.Context, queueName, taskID string, payload []byte, attempt int32) error {
s.manager.mu.RLock()
p, ok := s.manager.plugins[s.pluginName]
s.manager.mu.RUnlock()
if !ok {
return fmt.Errorf("plugin %s not loaded", s.pluginName)
}
input := capabilities.TaskExecuteRequest{
QueueName: queueName,
TaskID: taskID,
Payload: payload,
Attempt: attempt,
}
result, err := callPluginFunction[capabilities.TaskExecuteRequest, capabilities.TaskExecuteResponse](ctx, p, FuncTaskWorkerCallback, input)
if err != nil {
return err
}
if result.Error != "" {
return fmt.Errorf("%s", result.Error)
}
return nil
}
// cleanupLoop periodically removes terminal tasks past their retention period.
func (s *taskQueueServiceImpl) cleanupLoop() {
ticker := time.NewTicker(cleanupInterval)
defer ticker.Stop()
for {
select {
case <-s.ctx.Done():
return
case <-ticker.C:
s.runCleanup()
}
}
}
// runCleanup deletes terminal tasks past their retention period.
func (s *taskQueueServiceImpl) runCleanup() {
s.mu.Lock()
queues := make(map[string]*queueState, len(s.queues))
for k, v := range s.queues {
queues[k] = v
}
s.mu.Unlock()
now := time.Now().UnixMilli()
for name, qs := range queues {
result, err := s.db.ExecContext(s.ctx, `
DELETE FROM tasks WHERE queue_name = ? AND status IN (?, ?, ?) AND updated_at + ? < ?
`, name, taskStatusCompleted, taskStatusFailed, taskStatusCancelled, qs.config.RetentionMs, now)
if err != nil {
log.Error(s.ctx, "Failed to cleanup tasks", "plugin", s.pluginName, "queue", name, err)
continue
}
if deleted, _ := result.RowsAffected(); deleted > 0 {
log.Debug(s.ctx, "Cleaned up terminal tasks", "plugin", s.pluginName, "queue", name, "deleted", deleted)
}
}
}
// Close shuts down the task queue service, stopping all workers and closing the database.
func (s *taskQueueServiceImpl) Close() error {
// Cancel context to signal all goroutines
s.cancel()
// Wait for goroutines with timeout
done := make(chan struct{})
go func() {
s.wg.Wait()
close(done)
}()
select {
case <-done:
case <-time.After(shutdownTimeout):
log.Warn("TaskQueue shutdown timed out", "plugin", s.pluginName)
}
// Mark running tasks as pending for recovery on next startup
if s.db != nil {
now := time.Now().UnixMilli()
if _, err := s.db.Exec(`UPDATE tasks SET status = ?, updated_at = ? WHERE status = ?`, taskStatusPending, now, taskStatusRunning); err != nil {
log.Error("Failed to reset running tasks on shutdown", "plugin", s.pluginName, err)
}
log.Debug("Closing plugin taskqueue", "plugin", s.pluginName)
return s.db.Close()
}
return nil
}
// Compile-time verification
var _ host.TaskQueueService = (*taskQueueServiceImpl)(nil)
var _ io.Closer = (*taskQueueServiceImpl)(nil)

View File

@@ -1,968 +0,0 @@
//go:build !windows
package plugins
import (
"context"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"net/http"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/navidrome/navidrome/conf"
"github.com/navidrome/navidrome/conf/configtest"
"github.com/navidrome/navidrome/model"
"github.com/navidrome/navidrome/plugins/host"
"github.com/navidrome/navidrome/tests"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
var _ = Describe("TaskQueueService", func() {
var tmpDir string
var service *taskQueueServiceImpl
var ctx context.Context
var manager *Manager
BeforeEach(func() {
ctx = GinkgoT().Context()
var err error
tmpDir, err = os.MkdirTemp("", "taskqueue-test-*")
Expect(err).ToNot(HaveOccurred())
DeferCleanup(configtest.SetupConfig())
conf.Server.DataFolder = tmpDir
// Create a mock manager with context
managerCtx, cancel := context.WithCancel(ctx)
manager = &Manager{
plugins: make(map[string]*plugin),
ctx: managerCtx,
}
DeferCleanup(cancel)
service, err = newTaskQueueService("test_plugin", manager, 5)
Expect(err).ToNot(HaveOccurred())
})
AfterEach(func() {
if service != nil {
service.Close()
}
os.RemoveAll(tmpDir)
})
Describe("CreateQueue", func() {
It("creates a queue successfully", func() {
err := service.CreateQueue(ctx, "my-queue", host.QueueConfig{
Concurrency: 2,
MaxRetries: 3,
BackoffMs: 2000,
RetentionMs: 7200000,
})
Expect(err).ToNot(HaveOccurred())
service.mu.Lock()
qs, exists := service.queues["my-queue"]
service.mu.Unlock()
Expect(exists).To(BeTrue())
Expect(qs.config.Concurrency).To(Equal(int32(2)))
Expect(qs.config.MaxRetries).To(Equal(int32(3)))
Expect(qs.config.BackoffMs).To(Equal(int64(2000)))
Expect(qs.config.RetentionMs).To(Equal(int64(7200000)))
})
It("returns error for duplicate queue name", func() {
err := service.CreateQueue(ctx, "dup-queue", host.QueueConfig{})
Expect(err).ToNot(HaveOccurred())
err = service.CreateQueue(ctx, "dup-queue", host.QueueConfig{})
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("already exists"))
})
})
Describe("CreateQueue name validation", func() {
It("rejects empty queue name", func() {
err := service.CreateQueue(ctx, "", host.QueueConfig{})
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("queue name cannot be empty"))
})
It("rejects over-length queue name", func() {
longName := strings.Repeat("a", maxQueueNameLength+1)
err := service.CreateQueue(ctx, longName, host.QueueConfig{})
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("exceeds maximum length"))
})
It("accepts queue name at maximum length", func() {
service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) error {
return nil
}
exactName := strings.Repeat("a", maxQueueNameLength)
err := service.CreateQueue(ctx, exactName, host.QueueConfig{})
Expect(err).ToNot(HaveOccurred())
})
})
Describe("CreateQueue defaults", func() {
It("applies defaults for zero-value config", func() {
err := service.CreateQueue(ctx, "defaults-queue", host.QueueConfig{})
Expect(err).ToNot(HaveOccurred())
service.mu.Lock()
qs := service.queues["defaults-queue"]
service.mu.Unlock()
Expect(qs.config.Concurrency).To(Equal(defaultConcurrency))
Expect(qs.config.BackoffMs).To(Equal(defaultBackoffMs))
Expect(qs.config.RetentionMs).To(Equal(defaultRetentionMs))
})
})
Describe("CreateQueue defaults with negative values", func() {
It("applies default RetentionMs for negative value", func() {
service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) error {
return nil
}
err := service.CreateQueue(ctx, "neg-retention", host.QueueConfig{
RetentionMs: -500,
})
Expect(err).ToNot(HaveOccurred())
service.mu.Lock()
qs := service.queues["neg-retention"]
service.mu.Unlock()
Expect(qs.config.RetentionMs).To(Equal(defaultRetentionMs))
})
})
Describe("CreateQueue clamping", func() {
It("clamps concurrency exceeding maxConcurrency", func() {
// maxConcurrency is 5; request 10
err := service.CreateQueue(ctx, "clamped-queue", host.QueueConfig{
Concurrency: 10,
})
Expect(err).ToNot(HaveOccurred())
service.mu.Lock()
qs := service.queues["clamped-queue"]
service.mu.Unlock()
Expect(qs.config.Concurrency).To(Equal(int32(5)))
})
It("returns error when concurrency budget is exhausted", func() {
// maxConcurrency is 5; create a queue that uses all 5
err := service.CreateQueue(ctx, "full-budget", host.QueueConfig{
Concurrency: 5,
})
Expect(err).ToNot(HaveOccurred())
// Next queue should fail — no budget remaining
err = service.CreateQueue(ctx, "over-budget", host.QueueConfig{
Concurrency: 1,
})
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("concurrency budget exhausted"))
})
It("clamps retention below minimum", func() {
err := service.CreateQueue(ctx, "low-retention", host.QueueConfig{
RetentionMs: 100, // below minRetentionMs
})
Expect(err).ToNot(HaveOccurred())
service.mu.Lock()
qs := service.queues["low-retention"]
service.mu.Unlock()
Expect(qs.config.RetentionMs).To(Equal(minRetentionMs))
})
It("clamps retention above maximum", func() {
err := service.CreateQueue(ctx, "high-retention", host.QueueConfig{
RetentionMs: 999_999_999_999, // above maxRetentionMs
})
Expect(err).ToNot(HaveOccurred())
service.mu.Lock()
qs := service.queues["high-retention"]
service.mu.Unlock()
Expect(qs.config.RetentionMs).To(Equal(maxRetentionMs))
})
})
Describe("Enqueue", func() {
BeforeEach(func() {
// Use a no-op callback to prevent actual execution attempts
service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) error {
return nil
}
err := service.CreateQueue(ctx, "enqueue-test", host.QueueConfig{})
Expect(err).ToNot(HaveOccurred())
})
It("enqueues a task and returns task ID", func() {
taskID, err := service.Enqueue(ctx, "enqueue-test", []byte("payload"))
Expect(err).ToNot(HaveOccurred())
Expect(taskID).ToNot(BeEmpty())
})
It("returns error for non-existent queue", func() {
_, err := service.Enqueue(ctx, "no-such-queue", []byte("payload"))
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("does not exist"))
})
It("rejects payload exceeding maximum size", func() {
bigPayload := make([]byte, maxPayloadSize+1)
_, err := service.Enqueue(ctx, "enqueue-test", bigPayload)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("exceeds maximum"))
})
It("accepts payload at maximum size", func() {
exactPayload := make([]byte, maxPayloadSize)
taskID, err := service.Enqueue(ctx, "enqueue-test", exactPayload)
Expect(err).ToNot(HaveOccurred())
Expect(taskID).ToNot(BeEmpty())
})
})
Describe("GetTaskStatus", func() {
BeforeEach(func() {
// Use a callback that blocks until context is cancelled so tasks stay pending
service.invokeCallbackFn = func(ctx context.Context, _, _ string, _ []byte, _ int32) error {
<-ctx.Done()
return ctx.Err()
}
})
It("returns pending for a new task", func() {
err := service.CreateQueue(ctx, "status-test", host.QueueConfig{})
Expect(err).ToNot(HaveOccurred())
taskID, err := service.Enqueue(ctx, "status-test", []byte("data"))
Expect(err).ToNot(HaveOccurred())
// The task may get picked up quickly; check initial status
// Since the callback blocks, it should be either pending or running
status, err := service.GetTaskStatus(ctx, taskID)
Expect(err).ToNot(HaveOccurred())
Expect(status).To(BeElementOf("pending", "running"))
})
It("returns error for unknown task ID", func() {
_, err := service.GetTaskStatus(ctx, "nonexistent-id")
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("not found"))
})
})
Describe("CancelTask", func() {
BeforeEach(func() {
// Block callback so tasks stay in pending/running
service.invokeCallbackFn = func(ctx context.Context, _, _ string, _ []byte, _ int32) error {
<-ctx.Done()
return ctx.Err()
}
})
It("cancels a pending task", func() {
// Block the callback so the first task occupies the worker
started := make(chan struct{})
service.invokeCallbackFn = func(ctx context.Context, _, _ string, _ []byte, _ int32) error {
close(started)
<-ctx.Done()
return ctx.Err()
}
err := service.CreateQueue(ctx, "cancel-test", host.QueueConfig{
Concurrency: 1,
})
Expect(err).ToNot(HaveOccurred())
// Enqueue a blocker task to occupy the single worker
_, err = service.Enqueue(ctx, "cancel-test", []byte("blocker"))
Expect(err).ToNot(HaveOccurred())
// Wait for the blocker task to start running
Eventually(started).WithTimeout(5 * time.Second).Should(BeClosed())
// Enqueue a second task — it stays pending since the worker is busy
taskID, err := service.Enqueue(ctx, "cancel-test", []byte("cancel-me"))
Expect(err).ToNot(HaveOccurred())
err = service.CancelTask(ctx, taskID)
Expect(err).ToNot(HaveOccurred())
status, err := service.GetTaskStatus(ctx, taskID)
Expect(err).ToNot(HaveOccurred())
Expect(status).To(Equal("cancelled"))
})
It("returns error for unknown task ID", func() {
err := service.CancelTask(ctx, "nonexistent-id")
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("not found"))
})
It("returns error for non-pending task", func() {
// Create a queue where tasks complete immediately
service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) error {
return nil
}
err := service.CreateQueue(ctx, "completed-test", host.QueueConfig{})
Expect(err).ToNot(HaveOccurred())
taskID, err := service.Enqueue(ctx, "completed-test", []byte("data"))
Expect(err).ToNot(HaveOccurred())
// Wait for task to complete
Eventually(func() string {
status, _ := service.GetTaskStatus(ctx, taskID)
return status
}).WithTimeout(5 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal("completed"))
// Try to cancel completed task
err = service.CancelTask(ctx, taskID)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("cannot be cancelled"))
})
})
Describe("Worker execution", func() {
It("invokes callback and completes task", func() {
var callCount atomic.Int32
var receivedQueueName, receivedTaskID string
var receivedPayload []byte
var receivedAttempt int32
service.invokeCallbackFn = func(_ context.Context, queueName, taskID string, payload []byte, attempt int32) error {
callCount.Add(1)
receivedQueueName = queueName
receivedTaskID = taskID
receivedPayload = payload
receivedAttempt = attempt
return nil
}
err := service.CreateQueue(ctx, "worker-test", host.QueueConfig{})
Expect(err).ToNot(HaveOccurred())
taskID, err := service.Enqueue(ctx, "worker-test", []byte("test-payload"))
Expect(err).ToNot(HaveOccurred())
Eventually(func() string {
status, _ := service.GetTaskStatus(ctx, taskID)
return status
}).WithTimeout(5 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal("completed"))
Expect(callCount.Load()).To(Equal(int32(1)))
Expect(receivedQueueName).To(Equal("worker-test"))
Expect(receivedTaskID).To(Equal(taskID))
Expect(receivedPayload).To(Equal([]byte("test-payload")))
Expect(receivedAttempt).To(Equal(int32(1)))
})
})
Describe("Retry on failure", func() {
It("retries and eventually fails after exhausting retries", func() {
var callCount atomic.Int32
service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) error {
callCount.Add(1)
return fmt.Errorf("task failed")
}
err := service.CreateQueue(ctx, "retry-test", host.QueueConfig{
MaxRetries: 2,
BackoffMs: 10, // Very short for testing
})
Expect(err).ToNot(HaveOccurred())
taskID, err := service.Enqueue(ctx, "retry-test", []byte("retry-payload"))
Expect(err).ToNot(HaveOccurred())
Eventually(func() string {
status, _ := service.GetTaskStatus(ctx, taskID)
return status
}).WithTimeout(10 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal("failed"))
// 1 initial attempt + 2 retries = 3 total calls
Expect(callCount.Load()).To(Equal(int32(3)))
})
})
Describe("Retry then succeed", func() {
It("retries and succeeds on second attempt", func() {
var callCount atomic.Int32
service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, attempt int32) error {
callCount.Add(1)
if attempt == 1 {
return fmt.Errorf("temporary error")
}
return nil
}
err := service.CreateQueue(ctx, "retry-succeed", host.QueueConfig{
MaxRetries: 1,
BackoffMs: 10, // Very short for testing
})
Expect(err).ToNot(HaveOccurred())
taskID, err := service.Enqueue(ctx, "retry-succeed", []byte("data"))
Expect(err).ToNot(HaveOccurred())
Eventually(func() string {
status, _ := service.GetTaskStatus(ctx, taskID)
return status
}).WithTimeout(10 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal("completed"))
Expect(callCount.Load()).To(Equal(int32(2)))
})
})
Describe("Backoff overflow cap", func() {
It("caps backoff at maxRetentionMs to prevent overflow", func() {
var callCount atomic.Int32
service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) error {
callCount.Add(1)
return fmt.Errorf("always fail")
}
err := service.CreateQueue(ctx, "backoff-overflow", host.QueueConfig{
MaxRetries: 3,
BackoffMs: 1_000_000_000, // Very large backoff to trigger overflow on exponentiation
})
Expect(err).ToNot(HaveOccurred())
taskID, err := service.Enqueue(ctx, "backoff-overflow", []byte("overflow-test"))
Expect(err).ToNot(HaveOccurred())
// Wait for first attempt to fail
Eventually(func() int32 {
return callCount.Load()
}).WithTimeout(5 * time.Second).WithPolling(50 * time.Millisecond).Should(BeNumerically(">=", int32(1)))
// Check next_run_at is positive and reasonable (capped at maxRetentionMs from now)
var nextRunAt int64
err = service.db.QueryRow(`SELECT next_run_at FROM tasks WHERE id = ?`, taskID).Scan(&nextRunAt)
Expect(err).ToNot(HaveOccurred())
now := time.Now().UnixMilli()
Expect(nextRunAt).To(BeNumerically(">", int64(0)), "next_run_at should be positive")
Expect(nextRunAt).To(BeNumerically("<=", now+maxBackoffMs+1000), "next_run_at should be at most maxBackoffMs from now")
})
})
Describe("Delay enforcement with concurrent workers", func() {
It("enforces delay between dispatches even with multiple workers", func() {
var mu sync.Mutex
var dispatchTimes []time.Time
service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) error {
mu.Lock()
dispatchTimes = append(dispatchTimes, time.Now())
mu.Unlock()
return nil
}
err := service.CreateQueue(ctx, "delay-concurrent", host.QueueConfig{
Concurrency: 3,
DelayMs: 200,
})
Expect(err).ToNot(HaveOccurred())
// Enqueue 5 tasks
for i := 0; i < 5; i++ {
_, err := service.Enqueue(ctx, "delay-concurrent", []byte(fmt.Sprintf("task-%d", i)))
Expect(err).ToNot(HaveOccurred())
}
// Wait for all tasks to complete
Eventually(func() int {
mu.Lock()
defer mu.Unlock()
return len(dispatchTimes)
}).WithTimeout(10 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal(5))
// Sort dispatch times and verify gaps
mu.Lock()
sort.Slice(dispatchTimes, func(i, j int) bool {
return dispatchTimes[i].Before(dispatchTimes[j])
})
times := make([]time.Time, len(dispatchTimes))
copy(times, dispatchTimes)
mu.Unlock()
// Consecutive dispatches should have at least ~160ms gap (80% of 200ms)
for i := 1; i < len(times); i++ {
gap := times[i].Sub(times[i-1])
Expect(gap).To(BeNumerically(">=", 160*time.Millisecond),
fmt.Sprintf("gap between dispatch %d and %d was %v, expected >= 160ms", i-1, i, gap))
}
})
})
Describe("Shutdown recovery", func() {
It("resets stale running tasks on CreateQueue", func() {
// Create a first service and queue, enqueue a task
service.invokeCallbackFn = func(ctx context.Context, _, _ string, _ []byte, _ int32) error {
<-ctx.Done()
return ctx.Err()
}
err := service.CreateQueue(ctx, "recovery-queue", host.QueueConfig{})
Expect(err).ToNot(HaveOccurred())
taskID, err := service.Enqueue(ctx, "recovery-queue", []byte("stale-task"))
Expect(err).ToNot(HaveOccurred())
// Wait for the task to start running
Eventually(func() string {
status, _ := service.GetTaskStatus(ctx, taskID)
return status
}).WithTimeout(5 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal("running"))
// Close the service (simulates crash - tasks left in running state)
service.Close()
// Create a new service pointing to the same DB
managerCtx2, cancel2 := context.WithCancel(ctx)
DeferCleanup(cancel2)
manager2 := &Manager{
plugins: make(map[string]*plugin),
ctx: managerCtx2,
}
service, err = newTaskQueueService("test_plugin", manager2, 5)
Expect(err).ToNot(HaveOccurred())
// Override callback to succeed
service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) error {
return nil
}
// Re-create the queue - the upsert handles the existing row from the old service
err = service.CreateQueue(ctx, "recovery-queue", host.QueueConfig{})
Expect(err).ToNot(HaveOccurred())
// The stale running task should now be reset to pending and eventually completed
Eventually(func() string {
status, _ := service.GetTaskStatus(ctx, taskID)
return status
}).WithTimeout(10 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal("completed"))
})
})
Describe("Close", func() {
It("prevents subsequent operations after close", func() {
err := service.CreateQueue(ctx, "close-test", host.QueueConfig{})
Expect(err).ToNot(HaveOccurred())
service.Close()
// After close, operations should fail
_, err = service.Enqueue(ctx, "close-test", []byte("data"))
Expect(err).To(HaveOccurred())
})
})
Describe("Plugin isolation", func() {
It("uses separate databases for different plugins", func() {
managerCtx2, cancel2 := context.WithCancel(ctx)
DeferCleanup(cancel2)
manager2 := &Manager{
plugins: make(map[string]*plugin),
ctx: managerCtx2,
}
service2, err := newTaskQueueService("other_plugin", manager2, 5)
Expect(err).ToNot(HaveOccurred())
defer service2.Close()
// Check that separate database files exist
_, err = os.Stat(filepath.Join(tmpDir, "plugins", "test_plugin", "taskqueue.db"))
Expect(err).ToNot(HaveOccurred())
_, err = os.Stat(filepath.Join(tmpDir, "plugins", "other_plugin", "taskqueue.db"))
Expect(err).ToNot(HaveOccurred())
// Both services should be able to create queues with the same name independently
service.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) error { return nil }
service2.invokeCallbackFn = func(_ context.Context, _, _ string, _ []byte, _ int32) error { return nil }
err = service.CreateQueue(ctx, "shared-name", host.QueueConfig{})
Expect(err).ToNot(HaveOccurred())
err = service2.CreateQueue(ctx, "shared-name", host.QueueConfig{})
Expect(err).ToNot(HaveOccurred())
// Enqueue to each and verify they work independently
taskID1, err := service.Enqueue(ctx, "shared-name", []byte("plugin1"))
Expect(err).ToNot(HaveOccurred())
taskID2, err := service2.Enqueue(ctx, "shared-name", []byte("plugin2"))
Expect(err).ToNot(HaveOccurred())
Expect(taskID1).ToNot(Equal(taskID2))
// Both should complete
Eventually(func() string {
status, _ := service.GetTaskStatus(ctx, taskID1)
return status
}).WithTimeout(5 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal("completed"))
Eventually(func() string {
status, _ := service2.GetTaskStatus(ctx, taskID2)
return status
}).WithTimeout(5 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal("completed"))
})
})
})
var _ = Describe("TaskQueueService Integration", Ordered, func() {
var manager *Manager
var tmpDir string
BeforeAll(func() {
var err error
tmpDir, err = os.MkdirTemp("", "taskqueue-integration-test-*")
Expect(err).ToNot(HaveOccurred())
// Copy the test-taskqueue plugin
srcPath := filepath.Join(testdataDir, "test-taskqueue"+PackageExtension)
destPath := filepath.Join(tmpDir, "test-taskqueue"+PackageExtension)
data, err := os.ReadFile(srcPath)
Expect(err).ToNot(HaveOccurred())
err = os.WriteFile(destPath, data, 0600)
Expect(err).ToNot(HaveOccurred())
// Compute SHA256 for the plugin
hash := sha256.Sum256(data)
hashHex := hex.EncodeToString(hash[:])
// Setup config
DeferCleanup(configtest.SetupConfig())
conf.Server.Plugins.Enabled = true
conf.Server.Plugins.Folder = tmpDir
conf.Server.Plugins.AutoReload = false
conf.Server.CacheFolder = filepath.Join(tmpDir, "cache")
conf.Server.DataFolder = tmpDir
// Setup mock DataStore with pre-enabled plugin
mockPluginRepo := tests.CreateMockPluginRepo()
mockPluginRepo.Permitted = true
mockPluginRepo.SetData(model.Plugins{{
ID: "test-taskqueue",
Path: destPath,
SHA256: hashHex,
Enabled: true,
}})
dataStore := &tests.MockDataStore{MockedPlugin: mockPluginRepo}
// Create and start manager
manager = &Manager{
plugins: make(map[string]*plugin),
ds: dataStore,
metrics: noopMetricsRecorder{},
subsonicRouter: http.NotFoundHandler(),
}
err = manager.Start(GinkgoT().Context())
Expect(err).ToNot(HaveOccurred())
DeferCleanup(func() {
_ = manager.Stop()
_ = os.RemoveAll(tmpDir)
})
})
// Helper types for calling the test plugin
type testQueueConfig struct {
Concurrency int32 `json:"concurrency,omitempty"`
MaxRetries int32 `json:"maxRetries,omitempty"`
BackoffMs int64 `json:"backoffMs,omitempty"`
DelayMs int64 `json:"delayMs,omitempty"`
RetentionMs int64 `json:"retentionMs,omitempty"`
}
type testTaskQueueInput struct {
Operation string `json:"operation"`
QueueName string `json:"queueName,omitempty"`
Config *testQueueConfig `json:"config,omitempty"`
Payload []byte `json:"payload,omitempty"`
TaskID string `json:"taskId,omitempty"`
}
type testTaskQueueOutput struct {
TaskID string `json:"taskId,omitempty"`
Status string `json:"status,omitempty"`
Error *string `json:"error,omitempty"`
}
callTestTaskQueue := func(ctx context.Context, input testTaskQueueInput) (*testTaskQueueOutput, error) {
manager.mu.RLock()
p := manager.plugins["test-taskqueue"]
manager.mu.RUnlock()
instance, err := p.instance(ctx)
if err != nil {
return nil, err
}
defer instance.Close(ctx)
inputBytes, _ := json.Marshal(input)
_, outputBytes, err := instance.Call("nd_test_taskqueue", inputBytes)
if err != nil {
return nil, err
}
var output testTaskQueueOutput
if err := json.Unmarshal(outputBytes, &output); err != nil {
return nil, err
}
if output.Error != nil {
return nil, errors.New(*output.Error)
}
return &output, nil
}
Describe("Plugin Loading", func() {
It("should load plugin with taskqueue permission and TaskWorker capability", func() {
manager.mu.RLock()
p, ok := manager.plugins["test-taskqueue"]
manager.mu.RUnlock()
Expect(ok).To(BeTrue())
Expect(p.manifest.Permissions).ToNot(BeNil())
Expect(p.manifest.Permissions.Taskqueue).ToNot(BeNil())
Expect(p.manifest.Permissions.Taskqueue.MaxConcurrency).To(Equal(10))
Expect(p.capabilities).To(ContainElement(CapabilityTaskWorker))
})
})
Describe("Create Queue", func() {
It("should create a queue without error", func() {
ctx := GinkgoT().Context()
_, err := callTestTaskQueue(ctx, testTaskQueueInput{
Operation: "create_queue",
QueueName: "test-create",
})
Expect(err).ToNot(HaveOccurred())
})
It("should return error for duplicate queue name", func() {
ctx := GinkgoT().Context()
_, err := callTestTaskQueue(ctx, testTaskQueueInput{
Operation: "create_queue",
QueueName: "test-dup",
})
Expect(err).ToNot(HaveOccurred())
_, err = callTestTaskQueue(ctx, testTaskQueueInput{
Operation: "create_queue",
QueueName: "test-dup",
})
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("already exists"))
})
})
Describe("Enqueue and Task Completion", func() {
It("should enqueue a task and complete successfully", func() {
ctx := GinkgoT().Context()
// Create queue
_, err := callTestTaskQueue(ctx, testTaskQueueInput{
Operation: "create_queue",
QueueName: "test-complete",
})
Expect(err).ToNot(HaveOccurred())
// Enqueue task with payload "hello"
output, err := callTestTaskQueue(ctx, testTaskQueueInput{
Operation: "enqueue",
QueueName: "test-complete",
Payload: []byte("hello"),
})
Expect(err).ToNot(HaveOccurred())
Expect(output.TaskID).ToNot(BeEmpty())
taskID := output.TaskID
// Poll until completed
Eventually(func() string {
out, err := callTestTaskQueue(ctx, testTaskQueueInput{
Operation: "get_task_status",
TaskID: taskID,
})
if err != nil {
return "error"
}
return out.Status
}).WithTimeout(5 * time.Second).WithPolling(100 * time.Millisecond).Should(Equal("completed"))
})
})
Describe("Enqueue with Failure, No Retries", func() {
It("should fail when payload is 'fail' and maxRetries is 0", func() {
ctx := GinkgoT().Context()
// Create queue with no retries
_, err := callTestTaskQueue(ctx, testTaskQueueInput{
Operation: "create_queue",
QueueName: "test-fail-no-retry",
Config: &testQueueConfig{
MaxRetries: 0,
},
})
Expect(err).ToNot(HaveOccurred())
// Enqueue task that will fail
output, err := callTestTaskQueue(ctx, testTaskQueueInput{
Operation: "enqueue",
QueueName: "test-fail-no-retry",
Payload: []byte("fail"),
})
Expect(err).ToNot(HaveOccurred())
taskID := output.TaskID
// Poll until failed
Eventually(func() string {
out, err := callTestTaskQueue(ctx, testTaskQueueInput{
Operation: "get_task_status",
TaskID: taskID,
})
if err != nil {
return "error"
}
return out.Status
}).WithTimeout(5 * time.Second).WithPolling(100 * time.Millisecond).Should(Equal("failed"))
})
})
Describe("Enqueue with Retry Then Success", func() {
It("should retry and eventually succeed with 'fail-then-succeed' payload", func() {
ctx := GinkgoT().Context()
// Create queue with retries and short backoff
_, err := callTestTaskQueue(ctx, testTaskQueueInput{
Operation: "create_queue",
QueueName: "test-retry-succeed",
Config: &testQueueConfig{
MaxRetries: 2,
BackoffMs: 100,
},
})
Expect(err).ToNot(HaveOccurred())
// Enqueue task that fails on attempt < 2, then succeeds
output, err := callTestTaskQueue(ctx, testTaskQueueInput{
Operation: "enqueue",
QueueName: "test-retry-succeed",
Payload: []byte("fail-then-succeed"),
})
Expect(err).ToNot(HaveOccurred())
taskID := output.TaskID
// Poll until completed
Eventually(func() string {
out, err := callTestTaskQueue(ctx, testTaskQueueInput{
Operation: "get_task_status",
TaskID: taskID,
})
if err != nil {
return "error"
}
return out.Status
}).WithTimeout(5 * time.Second).WithPolling(100 * time.Millisecond).Should(Equal("completed"))
})
})
Describe("Cancel Pending Task", func() {
It("should cancel a pending task", func() {
ctx := GinkgoT().Context()
// Create queue with concurrency=1 and a large delay between dispatches.
// The first task completes immediately (burst token), the second is dequeued
// but blocks on the rate limiter. Tasks 3+ remain in 'pending' and can be cancelled.
_, err := callTestTaskQueue(ctx, testTaskQueueInput{
Operation: "create_queue",
QueueName: "test-cancel",
Config: &testQueueConfig{
Concurrency: 1,
DelayMs: 60000,
},
})
Expect(err).ToNot(HaveOccurred())
// Enqueue several tasks - the first will complete immediately,
// the second will be dequeued but block on the rate limiter (status=running),
// the rest will stay pending.
var taskIDs []string
for i := 0; i < 5; i++ {
output, err := callTestTaskQueue(ctx, testTaskQueueInput{
Operation: "enqueue",
QueueName: "test-cancel",
Payload: []byte("hello"),
})
Expect(err).ToNot(HaveOccurred())
taskIDs = append(taskIDs, output.TaskID)
}
// Wait for the first task to complete (it has no delay)
Eventually(func() string {
out, err := callTestTaskQueue(ctx, testTaskQueueInput{
Operation: "get_task_status",
TaskID: taskIDs[0],
})
if err != nil {
return "error"
}
return out.Status
}).WithTimeout(5 * time.Second).WithPolling(50 * time.Millisecond).Should(Equal("completed"))
// Give the worker a moment to dequeue the second task (which will
// block on the delay) so tasks 3+ stay in 'pending'
time.Sleep(100 * time.Millisecond)
// Cancel the last task - it should still be pending
lastTaskID := taskIDs[len(taskIDs)-1]
_, err = callTestTaskQueue(ctx, testTaskQueueInput{
Operation: "cancel_task",
TaskID: lastTaskID,
})
Expect(err).ToNot(HaveOccurred())
// Verify status is cancelled
statusOut, err := callTestTaskQueue(ctx, testTaskQueueInput{
Operation: "get_task_status",
TaskID: lastTaskID,
})
Expect(err).ToNot(HaveOccurred())
Expect(statusOut.Status).To(Equal("cancelled"))
})
})
Describe("Enqueue to Non-Existent Queue", func() {
It("should return error when enqueueing to a queue that does not exist", func() {
ctx := GinkgoT().Context()
_, err := callTestTaskQueue(ctx, testTaskQueueInput{
Operation: "enqueue",
QueueName: "nonexistent-queue",
Payload: []byte("payload"),
})
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("does not exist"))
})
})
})

View File

@@ -128,23 +128,6 @@ var hostServices = []hostServiceEntry{
return host.RegisterHTTPHostFunctions(service), nil
},
},
{
name: "TaskQueue",
hasPermission: func(p *Permissions) bool { return p != nil && p.Taskqueue != nil },
create: func(ctx *serviceContext) ([]extism.HostFunction, io.Closer) {
perm := ctx.permissions.Taskqueue
maxConcurrency := int32(1)
if perm.MaxConcurrency > 0 {
maxConcurrency = int32(perm.MaxConcurrency)
}
service, err := newTaskQueueService(ctx.pluginName, ctx.manager, maxConcurrency)
if err != nil {
log.Error("Failed to create TaskQueue service", "plugin", ctx.pluginName, err)
return nil, nil
}
return host.RegisterTaskQueueHostFunctions(service), service
},
},
}
// extractManifest reads manifest from an .ndp package and computes its SHA-256 hash.

View File

@@ -110,9 +110,6 @@
},
"users": {
"$ref": "#/$defs/UsersPermission"
},
"taskqueue": {
"$ref": "#/$defs/TaskQueuePermission"
}
}
},
@@ -227,23 +224,6 @@
}
}
},
"TaskQueuePermission": {
"type": "object",
"description": "Task queue permissions for background task processing",
"additionalProperties": false,
"properties": {
"reason": {
"type": "string",
"description": "Explanation for why task queue access is needed"
},
"maxConcurrency": {
"type": "integer",
"description": "Maximum total concurrent workers across all queues. Default: 1",
"minimum": 1,
"default": 1
}
}
},
"UsersPermission": {
"type": "object",
"description": "Users service permissions for accessing user information",

View File

@@ -64,21 +64,6 @@ func ValidateWithCapabilities(m *Manifest, capabilities []Capability) error {
return fmt.Errorf("scrobbler capability requires 'users' permission to be declared in manifest")
}
}
// Scheduler permission requires SchedulerCallback capability
if m.Permissions != nil && m.Permissions.Scheduler != nil {
if !hasCapability(capabilities, CapabilityScheduler) {
return fmt.Errorf("'scheduler' permission requires plugin to export '%s' function", FuncSchedulerCallback)
}
}
// TaskQueue permission requires TaskWorker capability
if m.Permissions != nil && m.Permissions.Taskqueue != nil {
if !hasCapability(capabilities, CapabilityTaskWorker) {
return fmt.Errorf("'taskqueue' permission requires plugin to export '%s' function", FuncTaskWorkerCallback)
}
}
return nil
}

View File

@@ -181,9 +181,6 @@ type Permissions struct {
// Subsonicapi corresponds to the JSON schema field "subsonicapi".
Subsonicapi *SubsonicAPIPermission `json:"subsonicapi,omitempty" yaml:"subsonicapi,omitempty" mapstructure:"subsonicapi,omitempty"`
// Taskqueue corresponds to the JSON schema field "taskqueue".
Taskqueue *TaskQueuePermission `json:"taskqueue,omitempty" yaml:"taskqueue,omitempty" mapstructure:"taskqueue,omitempty"`
// Users corresponds to the JSON schema field "users".
Users *UsersPermission `json:"users,omitempty" yaml:"users,omitempty" mapstructure:"users,omitempty"`
@@ -203,36 +200,6 @@ type SubsonicAPIPermission struct {
Reason *string `json:"reason,omitempty" yaml:"reason,omitempty" mapstructure:"reason,omitempty"`
}
// Task queue permissions for background task processing
type TaskQueuePermission struct {
// Maximum total concurrent workers across all queues. Default: 1
MaxConcurrency int `json:"maxConcurrency,omitempty" yaml:"maxConcurrency,omitempty" mapstructure:"maxConcurrency,omitempty"`
// Explanation for why task queue access is needed
Reason *string `json:"reason,omitempty" yaml:"reason,omitempty" mapstructure:"reason,omitempty"`
}
// UnmarshalJSON implements json.Unmarshaler.
func (j *TaskQueuePermission) UnmarshalJSON(value []byte) error {
var raw map[string]interface{}
if err := json.Unmarshal(value, &raw); err != nil {
return err
}
type Plain TaskQueuePermission
var plain Plain
if err := json.Unmarshal(value, &plain); err != nil {
return err
}
if v, ok := raw["maxConcurrency"]; !ok || v == nil {
plain.MaxConcurrency = 1.0
}
if 1 > plain.MaxConcurrency {
return fmt.Errorf("field %s: must be >= %v", "maxConcurrency", 1)
}
*j = TaskQueuePermission(plain)
return nil
}
// Enable experimental WebAssembly threads support
type ThreadsFeature struct {
// Explanation for why threads support is needed

View File

@@ -6,10 +6,3 @@ require (
github.com/extism/go-pdk v1.1.3
github.com/stretchr/testify v1.11.1
)
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

View File

@@ -43,7 +43,6 @@ The following host services are available:
- Library: provides access to music library metadata for plugins.
- Scheduler: provides task scheduling capabilities for plugins.
- SubsonicAPI: provides access to Navidrome's Subsonic API from plugins.
- TaskQueue: provides persistent task queues for plugins.
- Users: provides access to user information for plugins.
- WebSocket: provides WebSocket communication capabilities for plugins.

View File

@@ -14,7 +14,6 @@ import (
"github.com/navidrome/navidrome/plugins/pdk/go/pdk"
)
// HTTPRequest represents the HTTPRequest data structure.
// HTTPRequest represents an outbound HTTP request from a plugin.
type HTTPRequest struct {
Method string `json:"method"`
@@ -24,7 +23,6 @@ type HTTPRequest struct {
TimeoutMs int32 `json:"timeoutMs"`
}
// HTTPResponse represents the HTTPResponse data structure.
// HTTPResponse represents the response from an outbound HTTP request.
type HTTPResponse struct {
StatusCode int32 `json:"statusCode"`
@@ -37,11 +35,11 @@ type HTTPResponse struct {
//go:wasmimport extism:host/user http_send
func http_send(uint64) uint64
type hTTPSendRequest struct {
type httpSendRequest struct {
Request HTTPRequest `json:"request"`
}
type hTTPSendResponse struct {
type httpSendResponse struct {
Result *HTTPResponse `json:"result,omitempty"`
Error string `json:"error,omitempty"`
}
@@ -57,7 +55,7 @@ type hTTPSendResponse struct {
// Successful HTTP calls (including 4xx/5xx status codes) return a non-nil response with nil error.
func HTTPSend(request HTTPRequest) (*HTTPResponse, error) {
// Marshal request to JSON
req := hTTPSendRequest{
req := httpSendRequest{
Request: request,
}
reqBytes, err := json.Marshal(req)
@@ -75,7 +73,7 @@ func HTTPSend(request HTTPRequest) (*HTTPResponse, error) {
responseBytes := responseMem.ReadBytes()
// Parse the response
var response hTTPSendResponse
var response httpSendResponse
if err := json.Unmarshal(responseBytes, &response); err != nil {
return nil, err
}

View File

@@ -10,7 +10,6 @@ package host
import "github.com/stretchr/testify/mock"
// HTTPRequest represents the HTTPRequest data structure.
// HTTPRequest represents an outbound HTTP request from a plugin.
type HTTPRequest struct {
Method string `json:"method"`
@@ -20,7 +19,6 @@ type HTTPRequest struct {
TimeoutMs int32 `json:"timeoutMs"`
}
// HTTPResponse represents the HTTPResponse data structure.
// HTTPResponse represents the response from an outbound HTTP request.
type HTTPResponse struct {
StatusCode int32 `json:"statusCode"`

View File

@@ -1,219 +0,0 @@
// Code generated by ndpgen. DO NOT EDIT.
//
// This file contains client wrappers for the TaskQueue host service.
// It is intended for use in Navidrome plugins built with TinyGo.
//
//go:build wasip1
package host
import (
"encoding/json"
"errors"
"github.com/navidrome/navidrome/plugins/pdk/go/pdk"
)
// QueueConfig represents the QueueConfig data structure.
// QueueConfig holds configuration for a task queue.
type QueueConfig struct {
Concurrency int32 `json:"concurrency"`
MaxRetries int32 `json:"maxRetries"`
BackoffMs int64 `json:"backoffMs"`
DelayMs int64 `json:"delayMs"`
RetentionMs int64 `json:"retentionMs"`
}
// taskqueue_createqueue is the host function provided by Navidrome.
//
//go:wasmimport extism:host/user taskqueue_createqueue
func taskqueue_createqueue(uint64) uint64
// taskqueue_enqueue is the host function provided by Navidrome.
//
//go:wasmimport extism:host/user taskqueue_enqueue
func taskqueue_enqueue(uint64) uint64
// taskqueue_gettaskstatus is the host function provided by Navidrome.
//
//go:wasmimport extism:host/user taskqueue_gettaskstatus
func taskqueue_gettaskstatus(uint64) uint64
// taskqueue_canceltask is the host function provided by Navidrome.
//
//go:wasmimport extism:host/user taskqueue_canceltask
func taskqueue_canceltask(uint64) uint64
type taskQueueCreateQueueRequest struct {
Name string `json:"name"`
Config QueueConfig `json:"config"`
}
type taskQueueEnqueueRequest struct {
QueueName string `json:"queueName"`
Payload []byte `json:"payload"`
}
type taskQueueEnqueueResponse struct {
Result string `json:"result,omitempty"`
Error string `json:"error,omitempty"`
}
type taskQueueGetTaskStatusRequest struct {
TaskID string `json:"taskId"`
}
type taskQueueGetTaskStatusResponse struct {
Result string `json:"result,omitempty"`
Error string `json:"error,omitempty"`
}
type taskQueueCancelTaskRequest struct {
TaskID string `json:"taskId"`
}
// TaskQueueCreateQueue calls the taskqueue_createqueue host function.
// CreateQueue creates a named task queue with the given configuration.
// Zero-value fields in config use sensible defaults.
// If a queue with the same name already exists, returns an error.
// On startup, this also recovers any stale "running" tasks from a previous crash.
func TaskQueueCreateQueue(name string, config QueueConfig) error {
// Marshal request to JSON
req := taskQueueCreateQueueRequest{
Name: name,
Config: config,
}
reqBytes, err := json.Marshal(req)
if err != nil {
return err
}
reqMem := pdk.AllocateBytes(reqBytes)
defer reqMem.Free()
// Call the host function
responsePtr := taskqueue_createqueue(reqMem.Offset())
// Read the response from memory
responseMem := pdk.FindMemory(responsePtr)
responseBytes := responseMem.ReadBytes()
// Parse error-only response
var response struct {
Error string `json:"error,omitempty"`
}
if err := json.Unmarshal(responseBytes, &response); err != nil {
return err
}
if response.Error != "" {
return errors.New(response.Error)
}
return nil
}
// TaskQueueEnqueue calls the taskqueue_enqueue host function.
// Enqueue adds a task to the named queue. Returns the task ID.
// payload is opaque bytes passed back to the plugin on execution.
func TaskQueueEnqueue(queueName string, payload []byte) (string, error) {
// Marshal request to JSON
req := taskQueueEnqueueRequest{
QueueName: queueName,
Payload: payload,
}
reqBytes, err := json.Marshal(req)
if err != nil {
return "", err
}
reqMem := pdk.AllocateBytes(reqBytes)
defer reqMem.Free()
// Call the host function
responsePtr := taskqueue_enqueue(reqMem.Offset())
// Read the response from memory
responseMem := pdk.FindMemory(responsePtr)
responseBytes := responseMem.ReadBytes()
// Parse the response
var response taskQueueEnqueueResponse
if err := json.Unmarshal(responseBytes, &response); err != nil {
return "", err
}
// Convert Error field to Go error
if response.Error != "" {
return "", errors.New(response.Error)
}
return response.Result, nil
}
// TaskQueueGetTaskStatus calls the taskqueue_gettaskstatus host function.
// GetTaskStatus returns the status of a task: "pending", "running",
// "completed", "failed", or "cancelled".
func TaskQueueGetTaskStatus(taskID string) (string, error) {
// Marshal request to JSON
req := taskQueueGetTaskStatusRequest{
TaskID: taskID,
}
reqBytes, err := json.Marshal(req)
if err != nil {
return "", err
}
reqMem := pdk.AllocateBytes(reqBytes)
defer reqMem.Free()
// Call the host function
responsePtr := taskqueue_gettaskstatus(reqMem.Offset())
// Read the response from memory
responseMem := pdk.FindMemory(responsePtr)
responseBytes := responseMem.ReadBytes()
// Parse the response
var response taskQueueGetTaskStatusResponse
if err := json.Unmarshal(responseBytes, &response); err != nil {
return "", err
}
// Convert Error field to Go error
if response.Error != "" {
return "", errors.New(response.Error)
}
return response.Result, nil
}
// TaskQueueCancelTask calls the taskqueue_canceltask host function.
// CancelTask cancels a pending task. Returns error if already
// running, completed, or failed.
func TaskQueueCancelTask(taskID string) error {
// Marshal request to JSON
req := taskQueueCancelTaskRequest{
TaskID: taskID,
}
reqBytes, err := json.Marshal(req)
if err != nil {
return err
}
reqMem := pdk.AllocateBytes(reqBytes)
defer reqMem.Free()
// Call the host function
responsePtr := taskqueue_canceltask(reqMem.Offset())
// Read the response from memory
responseMem := pdk.FindMemory(responsePtr)
responseBytes := responseMem.ReadBytes()
// Parse error-only response
var response struct {
Error string `json:"error,omitempty"`
}
if err := json.Unmarshal(responseBytes, &response); err != nil {
return err
}
if response.Error != "" {
return errors.New(response.Error)
}
return nil
}

View File

@@ -1,84 +0,0 @@
// Code generated by ndpgen. DO NOT EDIT.
//
// This file contains mock implementations for non-WASM builds.
// These mocks allow IDE support, compilation, and unit testing on non-WASM platforms.
// Plugin authors can use the exported mock instances to set expectations in tests.
//
//go:build !wasip1
package host
import "github.com/stretchr/testify/mock"
// QueueConfig represents the QueueConfig data structure.
// QueueConfig holds configuration for a task queue.
type QueueConfig struct {
Concurrency int32 `json:"concurrency"`
MaxRetries int32 `json:"maxRetries"`
BackoffMs int64 `json:"backoffMs"`
DelayMs int64 `json:"delayMs"`
RetentionMs int64 `json:"retentionMs"`
}
// mockTaskQueueService is the mock implementation for testing.
type mockTaskQueueService struct {
mock.Mock
}
// TaskQueueMock is the auto-instantiated mock instance for testing.
// Use this to set expectations: host.TaskQueueMock.On("MethodName", args...).Return(values...)
var TaskQueueMock = &mockTaskQueueService{}
// CreateQueue is the mock method for TaskQueueCreateQueue.
func (m *mockTaskQueueService) CreateQueue(name string, config QueueConfig) error {
args := m.Called(name, config)
return args.Error(0)
}
// TaskQueueCreateQueue delegates to the mock instance.
// CreateQueue creates a named task queue with the given configuration.
// Zero-value fields in config use sensible defaults.
// If a queue with the same name already exists, returns an error.
// On startup, this also recovers any stale "running" tasks from a previous crash.
func TaskQueueCreateQueue(name string, config QueueConfig) error {
return TaskQueueMock.CreateQueue(name, config)
}
// Enqueue is the mock method for TaskQueueEnqueue.
func (m *mockTaskQueueService) Enqueue(queueName string, payload []byte) (string, error) {
args := m.Called(queueName, payload)
return args.String(0), args.Error(1)
}
// TaskQueueEnqueue delegates to the mock instance.
// Enqueue adds a task to the named queue. Returns the task ID.
// payload is opaque bytes passed back to the plugin on execution.
func TaskQueueEnqueue(queueName string, payload []byte) (string, error) {
return TaskQueueMock.Enqueue(queueName, payload)
}
// GetTaskStatus is the mock method for TaskQueueGetTaskStatus.
func (m *mockTaskQueueService) GetTaskStatus(taskID string) (string, error) {
args := m.Called(taskID)
return args.String(0), args.Error(1)
}
// TaskQueueGetTaskStatus delegates to the mock instance.
// GetTaskStatus returns the status of a task: "pending", "running",
// "completed", "failed", or "cancelled".
func TaskQueueGetTaskStatus(taskID string) (string, error) {
return TaskQueueMock.GetTaskStatus(taskID)
}
// CancelTask is the mock method for TaskQueueCancelTask.
func (m *mockTaskQueueService) CancelTask(taskID string) error {
args := m.Called(taskID)
return args.Error(0)
}
// TaskQueueCancelTask delegates to the mock instance.
// CancelTask cancels a pending task. Returns error if already
// running, completed, or failed.
func TaskQueueCancelTask(taskID string) error {
return TaskQueueMock.CancelTask(taskID)
}

View File

@@ -1,86 +0,0 @@
// Code generated by ndpgen. DO NOT EDIT.
//
// This file contains export wrappers for the TaskWorker capability.
// It is intended for use in Navidrome plugins built with TinyGo.
//
//go:build wasip1
package taskworker
import (
"github.com/navidrome/navidrome/plugins/pdk/go/pdk"
)
// TaskExecuteRequest is the request provided when a task is ready to execute.
type TaskExecuteRequest struct {
// QueueName is the name of the queue this task belongs to.
QueueName string `json:"queueName"`
// TaskID is the unique identifier for this task.
TaskID string `json:"taskId"`
// Payload is the opaque data provided when the task was enqueued.
Payload []byte `json:"payload"`
// Attempt is the current attempt number (1-based: first attempt = 1).
Attempt int32 `json:"attempt"`
}
// TaskExecuteResponse is the response from task execution.
type TaskExecuteResponse struct {
// Error, if non-empty, indicates the task failed. The task will be retried
// if retries are configured and attempts remain.
Error string `json:"error,omitempty"`
}
// TaskWorker is the marker interface for taskworker plugins.
// Implement one or more of the provider interfaces below.
// TaskWorker provides task execution handling.
// This capability allows plugins to receive callbacks when their queued tasks
// are ready to execute. Plugins that use the taskqueue host service must
// implement this capability.
type TaskWorker interface{}
// TaskExecuteProvider provides the OnTaskExecute function.
type TaskExecuteProvider interface {
OnTaskExecute(TaskExecuteRequest) (TaskExecuteResponse, error)
} // Internal implementation holders
var (
taskExecuteImpl func(TaskExecuteRequest) (TaskExecuteResponse, error)
)
// Register registers a taskworker implementation.
// The implementation is checked for optional provider interfaces.
func Register(impl TaskWorker) {
if p, ok := impl.(TaskExecuteProvider); ok {
taskExecuteImpl = p.OnTaskExecute
}
}
// NotImplementedCode is the standard return code for unimplemented functions.
// The host recognizes this and skips the plugin gracefully.
const NotImplementedCode int32 = -2
//go:wasmexport nd_task_execute
func _NdTaskExecute() int32 {
if taskExecuteImpl == nil {
// Return standard code - host will skip this plugin gracefully
return NotImplementedCode
}
var input TaskExecuteRequest
if err := pdk.InputJSON(&input); err != nil {
pdk.SetError(err)
return -1
}
output, err := taskExecuteImpl(input)
if err != nil {
pdk.SetError(err)
return -1
}
if err := pdk.OutputJSON(output); err != nil {
pdk.SetError(err)
return -1
}
return 0
}

View File

@@ -1,48 +0,0 @@
// Code generated by ndpgen. DO NOT EDIT.
//
// This file provides stub implementations for non-WASM platforms.
// It allows Go plugins to compile and run tests outside of WASM,
// but the actual functionality is only available in WASM builds.
//
//go:build !wasip1
package taskworker
// TaskExecuteRequest is the request provided when a task is ready to execute.
type TaskExecuteRequest struct {
// QueueName is the name of the queue this task belongs to.
QueueName string `json:"queueName"`
// TaskID is the unique identifier for this task.
TaskID string `json:"taskId"`
// Payload is the opaque data provided when the task was enqueued.
Payload []byte `json:"payload"`
// Attempt is the current attempt number (1-based: first attempt = 1).
Attempt int32 `json:"attempt"`
}
// TaskExecuteResponse is the response from task execution.
type TaskExecuteResponse struct {
// Error, if non-empty, indicates the task failed. The task will be retried
// if retries are configured and attempts remain.
Error string `json:"error,omitempty"`
}
// TaskWorker is the marker interface for taskworker plugins.
// Implement one or more of the provider interfaces below.
// TaskWorker provides task execution handling.
// This capability allows plugins to receive callbacks when their queued tasks
// are ready to execute. Plugins that use the taskqueue host service must
// implement this capability.
type TaskWorker interface{}
// TaskExecuteProvider provides the OnTaskExecute function.
type TaskExecuteProvider interface {
OnTaskExecute(TaskExecuteRequest) (TaskExecuteResponse, error)
}
// NotImplementedCode is the standard return code for unimplemented functions.
const NotImplementedCode int32 = -2
// Register is a no-op on non-WASM platforms.
// This stub allows code to compile outside of WASM.
func Register(_ TaskWorker) {}

View File

@@ -1,59 +0,0 @@
# Code generated by ndpgen. DO NOT EDIT.
#
# This file contains client wrappers for the HTTP host service.
# It is intended for use in Navidrome plugins built with extism-py.
#
# IMPORTANT: Due to a limitation in extism-py, you cannot import this file directly.
# The @extism.import_fn decorators are only detected when defined in the plugin's
# main __init__.py file. Copy the needed functions from this file into your plugin.
from dataclasses import dataclass
from typing import Any
import extism
import json
class HostFunctionError(Exception):
"""Raised when a host function returns an error."""
pass
@extism.import_fn("extism:host/user", "http_send")
def _http_send(offset: int) -> int:
"""Raw host function - do not call directly."""
...
def http_send(request: Any) -> Any:
"""Send executes an HTTP request and returns the response.
Parameters:
- request: The HTTP request to execute, including method, URL, headers, body, and timeout
Returns the HTTP response with status code, headers, and body.
Network errors, timeouts, and permission failures are returned as Go errors.
Successful HTTP calls (including 4xx/5xx status codes) return a non-nil response with nil error.
Args:
request: Any parameter.
Returns:
Any: The result value.
Raises:
HostFunctionError: If the host function returns an error.
"""
request = {
"request": request,
}
request_bytes = json.dumps(request).encode("utf-8")
request_mem = extism.memory.alloc(request_bytes)
response_offset = _http_send(request_mem.offset)
response_mem = extism.memory.find(response_offset)
response = json.loads(extism.memory.string(response_mem))
if response.get("error"):
raise HostFunctionError(response["error"])
return response.get("result", None)

View File

@@ -1,153 +0,0 @@
# Code generated by ndpgen. DO NOT EDIT.
#
# This file contains client wrappers for the TaskQueue host service.
# It is intended for use in Navidrome plugins built with extism-py.
#
# IMPORTANT: Due to a limitation in extism-py, you cannot import this file directly.
# The @extism.import_fn decorators are only detected when defined in the plugin's
# main __init__.py file. Copy the needed functions from this file into your plugin.
from dataclasses import dataclass
from typing import Any
import extism
import json
class HostFunctionError(Exception):
"""Raised when a host function returns an error."""
pass
@extism.import_fn("extism:host/user", "taskqueue_createqueue")
def _taskqueue_createqueue(offset: int) -> int:
"""Raw host function - do not call directly."""
...
@extism.import_fn("extism:host/user", "taskqueue_enqueue")
def _taskqueue_enqueue(offset: int) -> int:
"""Raw host function - do not call directly."""
...
@extism.import_fn("extism:host/user", "taskqueue_gettaskstatus")
def _taskqueue_gettaskstatus(offset: int) -> int:
"""Raw host function - do not call directly."""
...
@extism.import_fn("extism:host/user", "taskqueue_canceltask")
def _taskqueue_canceltask(offset: int) -> int:
"""Raw host function - do not call directly."""
...
def taskqueue_create_queue(name: str, config: Any) -> None:
"""CreateQueue creates a named task queue with the given configuration.
Zero-value fields in config use sensible defaults.
If a queue with the same name already exists, returns an error.
On startup, this also recovers any stale "running" tasks from a previous crash.
Args:
name: str parameter.
config: Any parameter.
Raises:
HostFunctionError: If the host function returns an error.
"""
request = {
"name": name,
"config": config,
}
request_bytes = json.dumps(request).encode("utf-8")
request_mem = extism.memory.alloc(request_bytes)
response_offset = _taskqueue_createqueue(request_mem.offset)
response_mem = extism.memory.find(response_offset)
response = json.loads(extism.memory.string(response_mem))
if response.get("error"):
raise HostFunctionError(response["error"])
def taskqueue_enqueue(queue_name: str, payload: bytes) -> str:
"""Enqueue adds a task to the named queue. Returns the task ID.
payload is opaque bytes passed back to the plugin on execution.
Args:
queue_name: str parameter.
payload: bytes parameter.
Returns:
str: The result value.
Raises:
HostFunctionError: If the host function returns an error.
"""
request = {
"queueName": queue_name,
"payload": payload,
}
request_bytes = json.dumps(request).encode("utf-8")
request_mem = extism.memory.alloc(request_bytes)
response_offset = _taskqueue_enqueue(request_mem.offset)
response_mem = extism.memory.find(response_offset)
response = json.loads(extism.memory.string(response_mem))
if response.get("error"):
raise HostFunctionError(response["error"])
return response.get("result", "")
def taskqueue_get_task_status(task_id: str) -> str:
"""GetTaskStatus returns the status of a task: "pending", "running",
"completed", "failed", or "cancelled".
Args:
task_id: str parameter.
Returns:
str: The result value.
Raises:
HostFunctionError: If the host function returns an error.
"""
request = {
"taskId": task_id,
}
request_bytes = json.dumps(request).encode("utf-8")
request_mem = extism.memory.alloc(request_bytes)
response_offset = _taskqueue_gettaskstatus(request_mem.offset)
response_mem = extism.memory.find(response_offset)
response = json.loads(extism.memory.string(response_mem))
if response.get("error"):
raise HostFunctionError(response["error"])
return response.get("result", "")
def taskqueue_cancel_task(task_id: str) -> None:
"""CancelTask cancels a pending task. Returns error if already
running, completed, or failed.
Args:
task_id: str parameter.
Raises:
HostFunctionError: If the host function returns an error.
"""
request = {
"taskId": task_id,
}
request_bytes = json.dumps(request).encode("utf-8")
request_mem = extism.memory.alloc(request_bytes)
response_offset = _taskqueue_canceltask(request_mem.offset)
response_mem = extism.memory.find(response_offset)
response = json.loads(extism.memory.string(response_mem))
if response.get("error"):
raise HostFunctionError(response["error"])

View File

@@ -9,5 +9,4 @@ pub mod lifecycle;
pub mod metadata;
pub mod scheduler;
pub mod scrobbler;
pub mod taskworker;
pub mod websocket;

View File

@@ -1,87 +0,0 @@
// Code generated by ndpgen. DO NOT EDIT.
//
// This file contains export wrappers for the TaskWorker capability.
// It is intended for use in Navidrome plugins built with extism-pdk.
use serde::{Deserialize, Serialize};
// Helper functions for skip_serializing_if with numeric types
#[allow(dead_code)]
fn is_zero_i32(value: &i32) -> bool { *value == 0 }
#[allow(dead_code)]
fn is_zero_u32(value: &u32) -> bool { *value == 0 }
#[allow(dead_code)]
fn is_zero_i64(value: &i64) -> bool { *value == 0 }
#[allow(dead_code)]
fn is_zero_u64(value: &u64) -> bool { *value == 0 }
#[allow(dead_code)]
fn is_zero_f32(value: &f32) -> bool { *value == 0.0 }
#[allow(dead_code)]
fn is_zero_f64(value: &f64) -> bool { *value == 0.0 }
/// TaskExecuteRequest is the request provided when a task is ready to execute.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TaskExecuteRequest {
/// QueueName is the name of the queue this task belongs to.
#[serde(default)]
pub queue_name: String,
/// TaskID is the unique identifier for this task.
#[serde(default)]
pub task_id: String,
/// Payload is the opaque data provided when the task was enqueued.
#[serde(default)]
pub payload: Vec<u8>,
/// Attempt is the current attempt number (1-based: first attempt = 1).
#[serde(default)]
pub attempt: i32,
}
/// TaskExecuteResponse is the response from task execution.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TaskExecuteResponse {
/// Error, if non-empty, indicates the task failed. The task will be retried
/// if retries are configured and attempts remain.
#[serde(default, skip_serializing_if = "String::is_empty")]
pub error: String,
}
/// Error represents an error from a capability method.
#[derive(Debug)]
pub struct Error {
pub message: String,
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.message)
}
}
impl std::error::Error for Error {}
impl Error {
pub fn new(message: impl Into<String>) -> Self {
Self { message: message.into() }
}
}
/// TaskExecuteProvider provides the OnTaskExecute function.
pub trait TaskExecuteProvider {
fn on_task_execute(&self, req: TaskExecuteRequest) -> Result<TaskExecuteResponse, Error>;
}
/// Register the on_task_execute export.
/// This macro generates the WASM export function for this method.
#[macro_export]
macro_rules! register_taskworker_task_execute {
($plugin_type:ty) => {
#[extism_pdk::plugin_fn]
pub fn nd_task_execute(
req: extism_pdk::Json<$crate::taskworker::TaskExecuteRequest>
) -> extism_pdk::FnResult<extism_pdk::Json<$crate::taskworker::TaskExecuteResponse>> {
let plugin = <$plugin_type>::default();
let result = $crate::taskworker::TaskExecuteProvider::on_task_execute(&plugin, req.into_inner())?;
Ok(extism_pdk::Json(result))
}
};
}

View File

@@ -40,7 +40,6 @@
//! - [`library`] - provides access to music library metadata for plugins.
//! - [`scheduler`] - provides task scheduling capabilities for plugins.
//! - [`subsonicapi`] - provides access to Navidrome's Subsonic API from plugins.
//! - [`taskqueue`] - provides persistent task queues for plugins.
//! - [`users`] - provides access to user information for plugins.
//! - [`websocket`] - provides WebSocket communication capabilities for plugins.
@@ -100,13 +99,6 @@ pub mod subsonicapi {
pub use super::nd_host_subsonicapi::*;
}
#[doc(hidden)]
mod nd_host_taskqueue;
/// provides persistent task queues for plugins.
pub mod taskqueue {
pub use super::nd_host_taskqueue::*;
}
#[doc(hidden)]
mod nd_host_users;
/// provides access to user information for plugins.

View File

@@ -9,7 +9,7 @@ use serde::{Deserialize, Serialize};
/// HTTPRequest represents an outbound HTTP request from a plugin.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct HTTPRequest {
pub struct HttpRequest {
pub method: String,
pub url: String,
#[serde(default)]
@@ -23,7 +23,7 @@ pub struct HTTPRequest {
/// HTTPResponse represents the response from an outbound HTTP request.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct HTTPResponse {
pub struct HttpResponse {
pub status_code: i32,
#[serde(default)]
pub headers: std::collections::HashMap<String, String>,
@@ -34,14 +34,14 @@ pub struct HTTPResponse {
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct HTTPSendRequest {
request: HTTPRequest,
request: HttpRequest,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct HTTPSendResponse {
#[serde(default)]
result: Option<HTTPResponse>,
result: Option<HttpResponse>,
#[serde(default)]
error: Option<String>,
}
@@ -52,23 +52,23 @@ extern "ExtismHost" {
}
/// Send executes an HTTP request and returns the response.
///
///
/// Parameters:
/// - request: The HTTP request to execute, including method, URL, headers, body, and timeout
///
///
/// Returns the HTTP response with status code, headers, and body.
/// Network errors, timeouts, and permission failures are returned as Go errors.
/// Network errors, timeouts, and permission failures are returned as errors.
/// Successful HTTP calls (including 4xx/5xx status codes) return a non-nil response with nil error.
///
/// # Arguments
/// * `request` - HTTPRequest parameter.
/// * `request` - HttpRequest parameter.
///
/// # Returns
/// The result value.
///
/// # Errors
/// Returns an error if the host function call fails.
pub fn send(request: HTTPRequest) -> Result<Option<HTTPResponse>, Error> {
pub fn send(request: HttpRequest) -> Result<Option<HttpResponse>, Error> {
let response = unsafe {
http_send(Json(HTTPSendRequest {
request: request,

View File

@@ -1,184 +0,0 @@
// Code generated by ndpgen. DO NOT EDIT.
//
// This file contains client wrappers for the TaskQueue host service.
// It is intended for use in Navidrome plugins built with extism-pdk.
use extism_pdk::*;
use serde::{Deserialize, Serialize};
/// QueueConfig holds configuration for a task queue.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct QueueConfig {
pub concurrency: i32,
pub max_retries: i32,
pub backoff_ms: i64,
pub delay_ms: i64,
pub retention_ms: i64,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct TaskQueueCreateQueueRequest {
name: String,
config: QueueConfig,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct TaskQueueCreateQueueResponse {
#[serde(default)]
error: Option<String>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct TaskQueueEnqueueRequest {
queue_name: String,
payload: Vec<u8>,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct TaskQueueEnqueueResponse {
#[serde(default)]
result: String,
#[serde(default)]
error: Option<String>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct TaskQueueGetTaskStatusRequest {
task_id: String,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct TaskQueueGetTaskStatusResponse {
#[serde(default)]
result: String,
#[serde(default)]
error: Option<String>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct TaskQueueCancelTaskRequest {
task_id: String,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct TaskQueueCancelTaskResponse {
#[serde(default)]
error: Option<String>,
}
#[host_fn]
extern "ExtismHost" {
fn taskqueue_createqueue(input: Json<TaskQueueCreateQueueRequest>) -> Json<TaskQueueCreateQueueResponse>;
fn taskqueue_enqueue(input: Json<TaskQueueEnqueueRequest>) -> Json<TaskQueueEnqueueResponse>;
fn taskqueue_gettaskstatus(input: Json<TaskQueueGetTaskStatusRequest>) -> Json<TaskQueueGetTaskStatusResponse>;
fn taskqueue_canceltask(input: Json<TaskQueueCancelTaskRequest>) -> Json<TaskQueueCancelTaskResponse>;
}
/// CreateQueue creates a named task queue with the given configuration.
/// Zero-value fields in config use sensible defaults.
/// If a queue with the same name already exists, returns an error.
/// On startup, this also recovers any stale "running" tasks from a previous crash.
///
/// # Arguments
/// * `name` - String parameter.
/// * `config` - QueueConfig parameter.
///
/// # Errors
/// Returns an error if the host function call fails.
pub fn create_queue(name: &str, config: QueueConfig) -> Result<(), Error> {
let response = unsafe {
taskqueue_createqueue(Json(TaskQueueCreateQueueRequest {
name: name.to_owned(),
config: config,
}))?
};
if let Some(err) = response.0.error {
return Err(Error::msg(err));
}
Ok(())
}
/// Enqueue adds a task to the named queue. Returns the task ID.
/// payload is opaque bytes passed back to the plugin on execution.
///
/// # Arguments
/// * `queue_name` - String parameter.
/// * `payload` - Vec<u8> parameter.
///
/// # Returns
/// The result value.
///
/// # Errors
/// Returns an error if the host function call fails.
pub fn enqueue(queue_name: &str, payload: Vec<u8>) -> Result<String, Error> {
let response = unsafe {
taskqueue_enqueue(Json(TaskQueueEnqueueRequest {
queue_name: queue_name.to_owned(),
payload: payload,
}))?
};
if let Some(err) = response.0.error {
return Err(Error::msg(err));
}
Ok(response.0.result)
}
/// GetTaskStatus returns the status of a task: "pending", "running",
/// "completed", "failed", or "cancelled".
///
/// # Arguments
/// * `task_id` - String parameter.
///
/// # Returns
/// The result value.
///
/// # Errors
/// Returns an error if the host function call fails.
pub fn get_task_status(task_id: &str) -> Result<String, Error> {
let response = unsafe {
taskqueue_gettaskstatus(Json(TaskQueueGetTaskStatusRequest {
task_id: task_id.to_owned(),
}))?
};
if let Some(err) = response.0.error {
return Err(Error::msg(err));
}
Ok(response.0.result)
}
/// CancelTask cancels a pending task. Returns error if already
/// running, completed, or failed.
///
/// # Arguments
/// * `task_id` - String parameter.
///
/// # Errors
/// Returns an error if the host function call fails.
pub fn cancel_task(task_id: &str) -> Result<(), Error> {
let response = unsafe {
taskqueue_canceltask(Json(TaskQueueCancelTaskRequest {
task_id: task_id.to_owned(),
}))?
};
if let Some(err) = response.0.error {
return Err(Error::msg(err));
}
Ok(())
}

View File

@@ -1,16 +0,0 @@
module test-taskqueue
go 1.25
require github.com/navidrome/navidrome/plugins/pdk/go v0.0.0
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/extism/go-pdk v1.1.3 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/stretchr/testify v1.11.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
replace github.com/navidrome/navidrome/plugins/pdk/go => ../../pdk/go

View File

@@ -1,14 +0,0 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/extism/go-pdk v1.1.3 h1:hfViMPWrqjN6u67cIYRALZTZLk/enSPpNKa+rZ9X2SQ=
github.com/extism/go-pdk v1.1.3/go.mod h1:Gz+LIU/YCKnKXhgge8yo5Yu1F/lbv7KtKFkiCSzW/P4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -1,100 +0,0 @@
// Test TaskQueue plugin for Navidrome plugin system integration tests.
// Build with: tinygo build -o ../test-taskqueue.wasm -target wasip1 -buildmode=c-shared .
package main
import (
"github.com/navidrome/navidrome/plugins/pdk/go/host"
"github.com/navidrome/navidrome/plugins/pdk/go/pdk"
"github.com/navidrome/navidrome/plugins/pdk/go/taskworker"
)
func init() {
taskworker.Register(&handler{})
}
type handler struct{}
func (h *handler) OnTaskExecute(req taskworker.TaskExecuteRequest) (taskworker.TaskExecuteResponse, error) {
payload := string(req.Payload)
if payload == "fail" {
return taskworker.TaskExecuteResponse{Error: "task failed as instructed"}, nil
}
if payload == "fail-then-succeed" && req.Attempt < 2 {
return taskworker.TaskExecuteResponse{Error: "transient failure"}, nil
}
return taskworker.TaskExecuteResponse{}, nil
}
// Test helper types
type TestInput struct {
Operation string `json:"operation"`
QueueName string `json:"queueName,omitempty"`
Config *host.QueueConfig `json:"config,omitempty"`
Payload []byte `json:"payload,omitempty"`
TaskID string `json:"taskId,omitempty"`
}
type TestOutput struct {
TaskID string `json:"taskId,omitempty"`
Status string `json:"status,omitempty"`
Error *string `json:"error,omitempty"`
}
//go:wasmexport nd_test_taskqueue
func ndTestTaskQueue() int32 {
var input TestInput
if err := pdk.InputJSON(&input); err != nil {
errStr := err.Error()
pdk.OutputJSON(TestOutput{Error: &errStr})
return 0
}
switch input.Operation {
case "create_queue":
config := host.QueueConfig{}
if input.Config != nil {
config = *input.Config
}
err := host.TaskQueueCreateQueue(input.QueueName, config)
if err != nil {
errStr := err.Error()
pdk.OutputJSON(TestOutput{Error: &errStr})
return 0
}
pdk.OutputJSON(TestOutput{})
case "enqueue":
taskID, err := host.TaskQueueEnqueue(input.QueueName, input.Payload)
if err != nil {
errStr := err.Error()
pdk.OutputJSON(TestOutput{Error: &errStr})
return 0
}
pdk.OutputJSON(TestOutput{TaskID: taskID})
case "get_task_status":
status, err := host.TaskQueueGetTaskStatus(input.TaskID)
if err != nil {
errStr := err.Error()
pdk.OutputJSON(TestOutput{Error: &errStr})
return 0
}
pdk.OutputJSON(TestOutput{Status: status})
case "cancel_task":
err := host.TaskQueueCancelTask(input.TaskID)
if err != nil {
errStr := err.Error()
pdk.OutputJSON(TestOutput{Error: &errStr})
return 0
}
pdk.OutputJSON(TestOutput{})
default:
errStr := "unknown operation: " + input.Operation
pdk.OutputJSON(TestOutput{Error: &errStr})
}
return 0
}
func main() {}

View File

@@ -1,12 +0,0 @@
{
"name": "Test TaskQueue Plugin",
"author": "Navidrome Test",
"version": "1.0.0",
"description": "A test plugin for TaskQueue integration testing",
"permissions": {
"taskqueue": {
"reason": "For testing task queue operations",
"maxConcurrency": 10
}
}
}

View File

@@ -443,7 +443,7 @@ func (api *Router) buildArtist(r *http.Request, artist *model.Artist) (*response
func (api *Router) buildAlbumDirectory(ctx context.Context, album *model.Album) (*responses.Directory, error) {
dir := &responses.Directory{}
dir.Id = album.ID
dir.Name = album.FullName()
dir.Name = album.Name
dir.Parent = album.AlbumArtistID
dir.PlayCount = album.PlayCount
if album.PlayCount > 0 {

View File

@@ -197,7 +197,7 @@ func childFromMediaFile(ctx context.Context, mf model.MediaFile) responses.Child
}
child.Parent = mf.AlbumID
child.Album = mf.FullAlbumName()
child.Album = mf.Album
child.Year = int32(mf.Year)
child.Artist = mf.Artist
child.Genre = mf.Genre
@@ -302,7 +302,7 @@ func artistRefs(participants model.ParticipantList) []responses.ArtistID3Ref {
func fakePath(mf model.MediaFile) string {
builder := strings.Builder{}
builder.WriteString(fmt.Sprintf("%s/%s/", sanitizeSlashes(mf.AlbumArtist), sanitizeSlashes(mf.FullAlbumName())))
builder.WriteString(fmt.Sprintf("%s/%s/", sanitizeSlashes(mf.AlbumArtist), sanitizeSlashes(mf.Album)))
if mf.DiscNumber != 0 {
builder.WriteString(fmt.Sprintf("%02d-", mf.DiscNumber))
}
@@ -321,10 +321,9 @@ func childFromAlbum(ctx context.Context, al model.Album) responses.Child {
child := responses.Child{}
child.Id = al.ID
child.IsDir = true
fullName := al.FullName()
child.Title = fullName
child.Name = fullName
child.Album = fullName
child.Title = al.Name
child.Name = al.Name
child.Album = al.Name
child.Artist = al.AlbumArtist
child.Year = int32(cmp.Or(al.MaxOriginalYear, al.MaxYear))
child.Genre = al.Genre
@@ -406,7 +405,7 @@ func buildDiscSubtitles(a model.Album) []responses.DiscTitle {
func buildAlbumID3(ctx context.Context, album model.Album) responses.AlbumID3 {
dir := responses.AlbumID3{}
dir.Id = album.ID
dir.Name = album.FullName()
dir.Name = album.Name
dir.Artist = album.AlbumArtist
dir.ArtistId = album.AlbumArtistID
dir.CoverArt = album.CoverArtID().String()