Compare commits

..

2 Commits

Author SHA1 Message Date
Deluan
b545574c37 test: add tests to buildAllowedPaths
Signed-off-by: Deluan <deluan@navidrome.org>
2026-02-27 21:55:59 -05:00
Deluan
2c245e5446 feat(plugins): mount library directories as read-only by default
Add an AllowWriteAccess boolean to the plugin model, defaulting to
false. When off, library directories are mounted with the extism "ro:"
prefix (read-only). Admins can explicitly grant write access via a new
toggle in the Library Permission card.
2026-02-27 21:34:12 -05:00
36 changed files with 226 additions and 3213 deletions

View File

@@ -0,0 +1,5 @@
-- +goose Up
ALTER TABLE plugin ADD COLUMN allow_write_access BOOL NOT NULL DEFAULT false;
-- +goose Down
ALTER TABLE plugin DROP COLUMN allow_write_access;

View File

@@ -3,19 +3,20 @@ package model
import "time"
type Plugin struct {
ID string `structs:"id" json:"id"`
Path string `structs:"path" json:"path"`
Manifest string `structs:"manifest" json:"manifest"`
Config string `structs:"config" json:"config,omitempty"`
Users string `structs:"users" json:"users,omitempty"`
AllUsers bool `structs:"all_users" json:"allUsers,omitempty"`
Libraries string `structs:"libraries" json:"libraries,omitempty"`
AllLibraries bool `structs:"all_libraries" json:"allLibraries,omitempty"`
Enabled bool `structs:"enabled" json:"enabled"`
LastError string `structs:"last_error" json:"lastError,omitempty"`
SHA256 string `structs:"sha256" json:"sha256"`
CreatedAt time.Time `structs:"created_at" json:"createdAt"`
UpdatedAt time.Time `structs:"updated_at" json:"updatedAt"`
ID string `structs:"id" json:"id"`
Path string `structs:"path" json:"path"`
Manifest string `structs:"manifest" json:"manifest"`
Config string `structs:"config" json:"config,omitempty"`
Users string `structs:"users" json:"users,omitempty"`
AllUsers bool `structs:"all_users" json:"allUsers,omitempty"`
Libraries string `structs:"libraries" json:"libraries,omitempty"`
AllLibraries bool `structs:"all_libraries" json:"allLibraries,omitempty"`
AllowWriteAccess bool `structs:"allow_write_access" json:"allowWriteAccess,omitempty"`
Enabled bool `structs:"enabled" json:"enabled"`
LastError string `structs:"last_error" json:"lastError,omitempty"`
SHA256 string `structs:"sha256" json:"sha256"`
CreatedAt time.Time `structs:"created_at" json:"createdAt"`
UpdatedAt time.Time `structs:"updated_at" json:"updatedAt"`
}
type Plugins []Plugin

View File

@@ -79,8 +79,8 @@ func (r *pluginRepository) Put(plugin *model.Plugin) error {
// Upsert using INSERT ... ON CONFLICT for atomic operation
_, err := r.db.NewQuery(`
INSERT INTO plugin (id, path, manifest, config, users, all_users, libraries, all_libraries, enabled, last_error, sha256, created_at, updated_at)
VALUES ({:id}, {:path}, {:manifest}, {:config}, {:users}, {:all_users}, {:libraries}, {:all_libraries}, {:enabled}, {:last_error}, {:sha256}, {:created_at}, {:updated_at})
INSERT INTO plugin (id, path, manifest, config, users, all_users, libraries, all_libraries, allow_write_access, enabled, last_error, sha256, created_at, updated_at)
VALUES ({:id}, {:path}, {:manifest}, {:config}, {:users}, {:all_users}, {:libraries}, {:all_libraries}, {:allow_write_access}, {:enabled}, {:last_error}, {:sha256}, {:created_at}, {:updated_at})
ON CONFLICT(id) DO UPDATE SET
path = excluded.path,
manifest = excluded.manifest,
@@ -89,24 +89,26 @@ func (r *pluginRepository) Put(plugin *model.Plugin) error {
all_users = excluded.all_users,
libraries = excluded.libraries,
all_libraries = excluded.all_libraries,
allow_write_access = excluded.allow_write_access,
enabled = excluded.enabled,
last_error = excluded.last_error,
sha256 = excluded.sha256,
updated_at = excluded.updated_at
`).Bind(dbx.Params{
"id": plugin.ID,
"path": plugin.Path,
"manifest": plugin.Manifest,
"config": plugin.Config,
"users": plugin.Users,
"all_users": plugin.AllUsers,
"libraries": plugin.Libraries,
"all_libraries": plugin.AllLibraries,
"enabled": plugin.Enabled,
"last_error": plugin.LastError,
"sha256": plugin.SHA256,
"created_at": time.Now(),
"updated_at": plugin.UpdatedAt,
"id": plugin.ID,
"path": plugin.Path,
"manifest": plugin.Manifest,
"config": plugin.Config,
"users": plugin.Users,
"all_users": plugin.AllUsers,
"libraries": plugin.Libraries,
"all_libraries": plugin.AllLibraries,
"allow_write_access": plugin.AllowWriteAccess,
"enabled": plugin.Enabled,
"last_error": plugin.LastError,
"sha256": plugin.SHA256,
"created_at": time.Now(),
"updated_at": plugin.UpdatedAt,
}).Execute()
return err
}

View File

@@ -1,27 +0,0 @@
package capabilities
// TaskWorker provides task execution handling.
// This capability allows plugins to receive callbacks when their queued tasks
// are ready to execute. Plugins that use the taskqueue host service must
// implement this capability.
//
//nd:capability name=taskworker
type TaskWorker interface {
// OnTaskExecute is called when a queued task is ready to run.
// The returned string is a status/result message stored in the tasks table.
// Return an error to trigger retry (if retries are configured).
//nd:export name=nd_task_execute
OnTaskExecute(TaskExecuteRequest) (string, error)
}
// TaskExecuteRequest is the request provided when a task is ready to execute.
type TaskExecuteRequest struct {
// QueueName is the name of the queue this task belongs to.
QueueName string `json:"queueName"`
// TaskID is the unique identifier for this task.
TaskID string `json:"taskId"`
// Payload is the opaque data provided when the task was enqueued.
Payload []byte `json:"payload"`
// Attempt is the current attempt number (1-based: first attempt = 1).
Attempt int32 `json:"attempt"`
}

View File

@@ -1,38 +0,0 @@
version: v1-draft
exports:
nd_task_execute:
description: |-
OnTaskExecute is called when a queued task is ready to run.
The returned string is a status/result message stored in the tasks table.
Return an error to trigger retry (if retries are configured).
input:
$ref: '#/components/schemas/TaskExecuteRequest'
contentType: application/json
output:
type: string
contentType: application/json
components:
schemas:
TaskExecuteRequest:
description: TaskExecuteRequest is the request provided when a task is ready to execute.
properties:
queueName:
type: string
description: QueueName is the name of the queue this task belongs to.
taskId:
type: string
description: TaskID is the unique identifier for this task.
payload:
type: array
description: Payload is the opaque data provided when the task was enqueued.
items:
type: object
attempt:
type: integer
format: int32
description: 'Attempt is the current attempt number (1-based: first attempt = 1).'
required:
- queueName
- taskId
- payload
- attempt

View File

@@ -1,68 +0,0 @@
package host
import "context"
// TaskInfo holds the current state of a task.
type TaskInfo struct {
// Status is the current task status: "pending", "running",
// "completed", "failed", or "cancelled".
Status string `json:"status"`
// Message is the status/result message returned by the plugin callback.
Message string `json:"message"`
// Attempt is the current or last attempt number (1-based).
Attempt int32 `json:"attempt"`
}
// QueueConfig holds configuration for a task queue.
type QueueConfig struct {
// Concurrency is the max number of parallel workers. Default: 1.
// Capped by the plugin's manifest maxConcurrency.
Concurrency int32 `json:"concurrency"`
// MaxRetries is the number of times to retry a failed task. Default: 0.
MaxRetries int32 `json:"maxRetries"`
// BackoffMs is the initial backoff between retries in milliseconds.
// Doubles each retry (exponential: backoffMs * 2^(attempt-1)). Default: 1000.
BackoffMs int64 `json:"backoffMs"`
// DelayMs is the minimum delay between starting consecutive tasks
// in milliseconds. Useful for rate limiting. Default: 0.
DelayMs int64 `json:"delayMs"`
// RetentionMs is how long completed/failed/cancelled tasks are kept
// in milliseconds. Default: 3600000 (1h). Min: 60000 (1m). Max: 604800000 (1w).
RetentionMs int64 `json:"retentionMs"`
}
// TaskService provides persistent task queues for plugins.
//
// This service allows plugins to create named queues with configurable concurrency,
// retry policies, and rate limiting. Tasks are persisted to SQLite and survive
// server restarts. When a task is ready to execute, the host calls the plugin's
// nd_task_execute callback function.
//
//nd:hostservice name=Task permission=taskqueue
type TaskService interface {
// CreateQueue creates a named task queue with the given configuration.
// Zero-value fields in config use sensible defaults.
// If a queue with the same name already exists, returns an error.
// On startup, this also recovers any stale "running" tasks from a previous crash.
//nd:hostfunc
CreateQueue(ctx context.Context, name string, config QueueConfig) error
// Enqueue adds a task to the named queue. Returns the task ID.
// payload is opaque bytes passed back to the plugin on execution.
//nd:hostfunc
Enqueue(ctx context.Context, queueName string, payload []byte) (string, error)
// Get returns the current state of a task including its status,
// message, and attempt count.
//nd:hostfunc
Get(ctx context.Context, taskID string) (*TaskInfo, error)
// Cancel cancels a pending task. Returns error if already
// running, completed, or failed.
//nd:hostfunc
Cancel(ctx context.Context, taskID string) error
}

View File

@@ -1,220 +0,0 @@
// Code generated by ndpgen. DO NOT EDIT.
package host
import (
"context"
"encoding/json"
extism "github.com/extism/go-sdk"
)
// TaskCreateQueueRequest is the request type for Task.CreateQueue.
type TaskCreateQueueRequest struct {
Name string `json:"name"`
Config QueueConfig `json:"config"`
}
// TaskCreateQueueResponse is the response type for Task.CreateQueue.
type TaskCreateQueueResponse struct {
Error string `json:"error,omitempty"`
}
// TaskEnqueueRequest is the request type for Task.Enqueue.
type TaskEnqueueRequest struct {
QueueName string `json:"queueName"`
Payload []byte `json:"payload"`
}
// TaskEnqueueResponse is the response type for Task.Enqueue.
type TaskEnqueueResponse struct {
Result string `json:"result,omitempty"`
Error string `json:"error,omitempty"`
}
// TaskGetRequest is the request type for Task.Get.
type TaskGetRequest struct {
TaskID string `json:"taskId"`
}
// TaskGetResponse is the response type for Task.Get.
type TaskGetResponse struct {
Result *TaskInfo `json:"result,omitempty"`
Error string `json:"error,omitempty"`
}
// TaskCancelRequest is the request type for Task.Cancel.
type TaskCancelRequest struct {
TaskID string `json:"taskId"`
}
// TaskCancelResponse is the response type for Task.Cancel.
type TaskCancelResponse struct {
Error string `json:"error,omitempty"`
}
// RegisterTaskHostFunctions registers Task service host functions.
// The returned host functions should be added to the plugin's configuration.
func RegisterTaskHostFunctions(service TaskService) []extism.HostFunction {
return []extism.HostFunction{
newTaskCreateQueueHostFunction(service),
newTaskEnqueueHostFunction(service),
newTaskGetHostFunction(service),
newTaskCancelHostFunction(service),
}
}
func newTaskCreateQueueHostFunction(service TaskService) extism.HostFunction {
return extism.NewHostFunctionWithStack(
"task_createqueue",
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
// Read JSON request from plugin memory
reqBytes, err := p.ReadBytes(stack[0])
if err != nil {
taskWriteError(p, stack, err)
return
}
var req TaskCreateQueueRequest
if err := json.Unmarshal(reqBytes, &req); err != nil {
taskWriteError(p, stack, err)
return
}
// Call the service method
if svcErr := service.CreateQueue(ctx, req.Name, req.Config); svcErr != nil {
taskWriteError(p, stack, svcErr)
return
}
// Write JSON response to plugin memory
resp := TaskCreateQueueResponse{}
taskWriteResponse(p, stack, resp)
},
[]extism.ValueType{extism.ValueTypePTR},
[]extism.ValueType{extism.ValueTypePTR},
)
}
func newTaskEnqueueHostFunction(service TaskService) extism.HostFunction {
return extism.NewHostFunctionWithStack(
"task_enqueue",
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
// Read JSON request from plugin memory
reqBytes, err := p.ReadBytes(stack[0])
if err != nil {
taskWriteError(p, stack, err)
return
}
var req TaskEnqueueRequest
if err := json.Unmarshal(reqBytes, &req); err != nil {
taskWriteError(p, stack, err)
return
}
// Call the service method
result, svcErr := service.Enqueue(ctx, req.QueueName, req.Payload)
if svcErr != nil {
taskWriteError(p, stack, svcErr)
return
}
// Write JSON response to plugin memory
resp := TaskEnqueueResponse{
Result: result,
}
taskWriteResponse(p, stack, resp)
},
[]extism.ValueType{extism.ValueTypePTR},
[]extism.ValueType{extism.ValueTypePTR},
)
}
func newTaskGetHostFunction(service TaskService) extism.HostFunction {
return extism.NewHostFunctionWithStack(
"task_get",
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
// Read JSON request from plugin memory
reqBytes, err := p.ReadBytes(stack[0])
if err != nil {
taskWriteError(p, stack, err)
return
}
var req TaskGetRequest
if err := json.Unmarshal(reqBytes, &req); err != nil {
taskWriteError(p, stack, err)
return
}
// Call the service method
result, svcErr := service.Get(ctx, req.TaskID)
if svcErr != nil {
taskWriteError(p, stack, svcErr)
return
}
// Write JSON response to plugin memory
resp := TaskGetResponse{
Result: result,
}
taskWriteResponse(p, stack, resp)
},
[]extism.ValueType{extism.ValueTypePTR},
[]extism.ValueType{extism.ValueTypePTR},
)
}
func newTaskCancelHostFunction(service TaskService) extism.HostFunction {
return extism.NewHostFunctionWithStack(
"task_cancel",
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
// Read JSON request from plugin memory
reqBytes, err := p.ReadBytes(stack[0])
if err != nil {
taskWriteError(p, stack, err)
return
}
var req TaskCancelRequest
if err := json.Unmarshal(reqBytes, &req); err != nil {
taskWriteError(p, stack, err)
return
}
// Call the service method
if svcErr := service.Cancel(ctx, req.TaskID); svcErr != nil {
taskWriteError(p, stack, svcErr)
return
}
// Write JSON response to plugin memory
resp := TaskCancelResponse{}
taskWriteResponse(p, stack, resp)
},
[]extism.ValueType{extism.ValueTypePTR},
[]extism.ValueType{extism.ValueTypePTR},
)
}
// taskWriteResponse writes a JSON response to plugin memory.
func taskWriteResponse(p *extism.CurrentPlugin, stack []uint64, resp any) {
respBytes, err := json.Marshal(resp)
if err != nil {
taskWriteError(p, stack, err)
return
}
respPtr, err := p.WriteBytes(respBytes)
if err != nil {
stack[0] = 0
return
}
stack[0] = respPtr
}
// taskWriteError writes an error response to plugin memory.
func taskWriteError(p *extism.CurrentPlugin, stack []uint64, err error) {
errResp := struct {
Error string `json:"error"`
}{Error: err.Error()}
respBytes, _ := json.Marshal(errResp)
respPtr, _ := p.WriteBytes(respBytes)
stack[0] = respPtr
}

View File

@@ -1,566 +0,0 @@
package plugins
import (
"context"
"database/sql"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"sync"
"time"
_ "github.com/mattn/go-sqlite3"
"github.com/navidrome/navidrome/conf"
"github.com/navidrome/navidrome/log"
"github.com/navidrome/navidrome/model/id"
"github.com/navidrome/navidrome/plugins/capabilities"
"github.com/navidrome/navidrome/plugins/host"
"golang.org/x/time/rate"
)
const (
defaultConcurrency int32 = 1
defaultBackoffMs int64 = 1000
defaultRetentionMs int64 = 3_600_000 // 1 hour
minRetentionMs int64 = 60_000 // 1 minute
maxRetentionMs int64 = 604_800_000 // 1 week
maxQueueNameLength = 128
maxPayloadSize = 1 * 1024 * 1024 // 1MB
maxBackoffMs int64 = 3_600_000 // 1 hour
cleanupInterval = 5 * time.Minute
pollInterval = 5 * time.Second
shutdownTimeout = 10 * time.Second
taskStatusPending = "pending"
taskStatusRunning = "running"
taskStatusCompleted = "completed"
taskStatusFailed = "failed"
taskStatusCancelled = "cancelled"
)
// CapabilityTaskWorker indicates the plugin can receive task execution callbacks.
const CapabilityTaskWorker Capability = "TaskWorker"
const FuncTaskWorkerCallback = "nd_task_execute"
func init() {
registerCapability(CapabilityTaskWorker, FuncTaskWorkerCallback)
}
type queueState struct {
config host.QueueConfig
signal chan struct{}
limiter *rate.Limiter
}
// notifyWorkers sends a non-blocking signal to wake up queue workers.
func (qs *queueState) notifyWorkers() {
select {
case qs.signal <- struct{}{}:
default:
}
}
// taskQueueServiceImpl implements host.TaskQueueService with SQLite persistence
// and background worker goroutines for task execution.
type taskQueueServiceImpl struct {
pluginName string
manager *Manager
maxConcurrency int32
db *sql.DB
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
mu sync.Mutex
queues map[string]*queueState
// For testing: override how callbacks are invoked
invokeCallbackFn func(ctx context.Context, queueName, taskID string, payload []byte, attempt int32) (string, error)
}
// newTaskQueueService creates a new taskQueueServiceImpl with its own SQLite database.
func newTaskQueueService(pluginName string, manager *Manager, maxConcurrency int32) (*taskQueueServiceImpl, error) {
dataDir := filepath.Join(conf.Server.DataFolder, "plugins", pluginName)
if err := os.MkdirAll(dataDir, 0700); err != nil {
return nil, fmt.Errorf("creating plugin data directory: %w", err)
}
dbPath := filepath.Join(dataDir, "taskqueue.db")
db, err := sql.Open("sqlite3", dbPath+"?_busy_timeout=5000&_journal_mode=WAL&_foreign_keys=off")
if err != nil {
return nil, fmt.Errorf("opening taskqueue database: %w", err)
}
db.SetMaxOpenConns(3)
db.SetMaxIdleConns(1)
if err := createTaskQueueSchema(db); err != nil {
db.Close()
return nil, fmt.Errorf("creating taskqueue schema: %w", err)
}
ctx, cancel := context.WithCancel(manager.ctx)
s := &taskQueueServiceImpl{
pluginName: pluginName,
manager: manager,
maxConcurrency: maxConcurrency,
db: db,
ctx: ctx,
cancel: cancel,
queues: make(map[string]*queueState),
}
s.invokeCallbackFn = s.defaultInvokeCallback
s.wg.Go(s.cleanupLoop)
log.Debug("Initialized plugin taskqueue", "plugin", pluginName, "path", dbPath, "maxConcurrency", maxConcurrency)
return s, nil
}
func createTaskQueueSchema(db *sql.DB) error {
_, err := db.Exec(`
CREATE TABLE IF NOT EXISTS queues (
name TEXT PRIMARY KEY,
concurrency INTEGER NOT NULL DEFAULT 1,
max_retries INTEGER NOT NULL DEFAULT 0,
backoff_ms INTEGER NOT NULL DEFAULT 1000,
delay_ms INTEGER NOT NULL DEFAULT 0,
retention_ms INTEGER NOT NULL DEFAULT 3600000
);
CREATE TABLE IF NOT EXISTS tasks (
id TEXT PRIMARY KEY,
queue_name TEXT NOT NULL REFERENCES queues(name),
payload BLOB NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
attempt INTEGER NOT NULL DEFAULT 0,
max_retries INTEGER NOT NULL,
next_run_at INTEGER NOT NULL,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
message TEXT NOT NULL DEFAULT ''
);
CREATE INDEX IF NOT EXISTS idx_tasks_dequeue ON tasks(queue_name, status, next_run_at);
`)
return err
}
// applyConfigDefaults fills zero-value config fields with sensible defaults
// and clamps values to valid ranges, logging warnings for clamped values.
func (s *taskQueueServiceImpl) applyConfigDefaults(ctx context.Context, name string, config *host.QueueConfig) {
if config.Concurrency <= 0 {
config.Concurrency = defaultConcurrency
}
if config.BackoffMs <= 0 {
config.BackoffMs = defaultBackoffMs
}
if config.RetentionMs <= 0 {
config.RetentionMs = defaultRetentionMs
}
if config.RetentionMs < minRetentionMs {
log.Warn(ctx, "TaskQueue retention clamped to minimum", "plugin", s.pluginName, "queue", name,
"requested", config.RetentionMs, "min", minRetentionMs)
config.RetentionMs = minRetentionMs
}
if config.RetentionMs > maxRetentionMs {
log.Warn(ctx, "TaskQueue retention clamped to maximum", "plugin", s.pluginName, "queue", name,
"requested", config.RetentionMs, "max", maxRetentionMs)
config.RetentionMs = maxRetentionMs
}
}
// clampConcurrency reduces config.Concurrency if it exceeds the remaining budget.
// Returns an error when the concurrency budget is fully exhausted.
// Must be called with s.mu held.
func (s *taskQueueServiceImpl) clampConcurrency(ctx context.Context, name string, config *host.QueueConfig) error {
var allocated int32
for _, qs := range s.queues {
allocated += qs.config.Concurrency
}
available := s.maxConcurrency - allocated
if available <= 0 {
log.Warn(ctx, "TaskQueue concurrency budget exhausted", "plugin", s.pluginName, "queue", name,
"allocated", allocated, "maxConcurrency", s.maxConcurrency)
return fmt.Errorf("concurrency budget exhausted (%d/%d allocated)", allocated, s.maxConcurrency)
}
if config.Concurrency > available {
log.Warn(ctx, "TaskQueue concurrency clamped", "plugin", s.pluginName, "queue", name,
"requested", config.Concurrency, "available", available, "maxConcurrency", s.maxConcurrency)
config.Concurrency = available
}
return nil
}
func (s *taskQueueServiceImpl) CreateQueue(ctx context.Context, name string, config host.QueueConfig) error {
if len(name) == 0 {
return fmt.Errorf("queue name cannot be empty")
}
if len(name) > maxQueueNameLength {
return fmt.Errorf("queue name exceeds maximum length of %d bytes", maxQueueNameLength)
}
s.applyConfigDefaults(ctx, name, &config)
s.mu.Lock()
defer s.mu.Unlock()
if err := s.clampConcurrency(ctx, name, &config); err != nil {
return err
}
if _, exists := s.queues[name]; exists {
return fmt.Errorf("queue %q already exists", name)
}
// Upsert into queues table (idempotent across restarts)
_, err := s.db.ExecContext(ctx, `
INSERT INTO queues (name, concurrency, max_retries, backoff_ms, delay_ms, retention_ms)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(name) DO UPDATE SET
concurrency = excluded.concurrency,
max_retries = excluded.max_retries,
backoff_ms = excluded.backoff_ms,
delay_ms = excluded.delay_ms,
retention_ms = excluded.retention_ms
`, name, config.Concurrency, config.MaxRetries, config.BackoffMs, config.DelayMs, config.RetentionMs)
if err != nil {
return fmt.Errorf("creating queue: %w", err)
}
// Reset stale running tasks from previous crash
now := time.Now().UnixMilli()
_, err = s.db.ExecContext(ctx, `
UPDATE tasks SET status = ?, updated_at = ? WHERE queue_name = ? AND status = ?
`, taskStatusPending, now, name, taskStatusRunning)
if err != nil {
return fmt.Errorf("resetting stale tasks: %w", err)
}
qs := &queueState{
config: config,
signal: make(chan struct{}, 1),
}
if config.DelayMs > 0 {
// Rate limit dispatches to enforce delay between tasks.
// Burst of 1 allows one immediate dispatch, then enforces the delay interval.
qs.limiter = rate.NewLimiter(rate.Every(time.Duration(config.DelayMs)*time.Millisecond), 1)
}
s.queues[name] = qs
for i := int32(0); i < config.Concurrency; i++ {
s.wg.Go(func() { s.worker(name, qs) })
}
log.Debug(ctx, "Created task queue", "plugin", s.pluginName, "queue", name,
"concurrency", config.Concurrency, "maxRetries", config.MaxRetries,
"backoffMs", config.BackoffMs, "delayMs", config.DelayMs, "retentionMs", config.RetentionMs)
return nil
}
func (s *taskQueueServiceImpl) Enqueue(ctx context.Context, queueName string, payload []byte) (string, error) {
s.mu.Lock()
qs, exists := s.queues[queueName]
s.mu.Unlock()
if !exists {
return "", fmt.Errorf("queue %q does not exist", queueName)
}
if len(payload) > maxPayloadSize {
return "", fmt.Errorf("payload size %d exceeds maximum of %d bytes", len(payload), maxPayloadSize)
}
taskID := id.NewRandom()
now := time.Now().UnixMilli()
_, err := s.db.ExecContext(ctx, `
INSERT INTO tasks (id, queue_name, payload, status, attempt, max_retries, next_run_at, created_at, updated_at)
VALUES (?, ?, ?, ?, 0, ?, ?, ?, ?)
`, taskID, queueName, payload, taskStatusPending, qs.config.MaxRetries, now, now, now)
if err != nil {
return "", fmt.Errorf("enqueuing task: %w", err)
}
qs.notifyWorkers()
log.Trace(ctx, "Enqueued task", "plugin", s.pluginName, "queue", queueName, "taskID", taskID)
return taskID, nil
}
// Get returns the current state of a task.
func (s *taskQueueServiceImpl) Get(ctx context.Context, taskID string) (*host.TaskInfo, error) {
var info host.TaskInfo
err := s.db.QueryRowContext(ctx, `SELECT status, message, attempt FROM tasks WHERE id = ?`, taskID).
Scan(&info.Status, &info.Message, &info.Attempt)
if errors.Is(err, sql.ErrNoRows) {
return nil, fmt.Errorf("task %q not found", taskID)
}
if err != nil {
return nil, fmt.Errorf("getting task info: %w", err)
}
return &info, nil
}
// Cancel cancels a pending task.
func (s *taskQueueServiceImpl) Cancel(ctx context.Context, taskID string) error {
now := time.Now().UnixMilli()
result, err := s.db.ExecContext(ctx, `
UPDATE tasks SET status = ?, updated_at = ? WHERE id = ? AND status = ?
`, taskStatusCancelled, now, taskID, taskStatusPending)
if err != nil {
return fmt.Errorf("cancelling task: %w", err)
}
rowsAffected, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("checking cancel result: %w", err)
}
if rowsAffected == 0 {
// Check if task exists at all
var status string
err := s.db.QueryRowContext(ctx, `SELECT status FROM tasks WHERE id = ?`, taskID).Scan(&status)
if errors.Is(err, sql.ErrNoRows) {
return fmt.Errorf("task %q not found", taskID)
}
if err != nil {
return fmt.Errorf("checking task existence: %w", err)
}
return fmt.Errorf("task %q cannot be cancelled (status: %s)", taskID, status)
}
log.Trace(ctx, "Cancelled task", "plugin", s.pluginName, "taskID", taskID)
return nil
}
// worker is the main loop for a single worker goroutine.
func (s *taskQueueServiceImpl) worker(queueName string, qs *queueState) {
// Process any existing pending tasks immediately on startup
s.drainQueue(queueName, qs)
ticker := time.NewTicker(pollInterval)
defer ticker.Stop()
for {
select {
case <-s.ctx.Done():
return
case <-qs.signal:
s.drainQueue(queueName, qs)
case <-ticker.C:
s.drainQueue(queueName, qs)
}
}
}
func (s *taskQueueServiceImpl) drainQueue(queueName string, qs *queueState) {
for s.ctx.Err() == nil && s.processTask(queueName, qs) {
}
}
// processTask dequeues and processes a single task. Returns true if a task was processed.
func (s *taskQueueServiceImpl) processTask(queueName string, qs *queueState) bool {
now := time.Now().UnixMilli()
// Atomically dequeue a task
var taskID string
var payload []byte
var attempt, maxRetries int32
err := s.db.QueryRowContext(s.ctx, `
UPDATE tasks SET status = ?, attempt = attempt + 1, updated_at = ?
WHERE id = (
SELECT id FROM tasks
WHERE queue_name = ? AND status = ? AND next_run_at <= ?
ORDER BY next_run_at, created_at LIMIT 1
)
RETURNING id, payload, attempt, max_retries
`, taskStatusRunning, now, queueName, taskStatusPending, now).Scan(&taskID, &payload, &attempt, &maxRetries)
if errors.Is(err, sql.ErrNoRows) {
return false
}
if err != nil {
log.Error(s.ctx, "Failed to dequeue task", "plugin", s.pluginName, "queue", queueName, err)
return false
}
// Enforce delay between task dispatches using a rate limiter.
// This is done after dequeue so that empty polls don't consume rate tokens.
if qs.limiter != nil {
if err := qs.limiter.Wait(s.ctx); err != nil {
// Context cancelled during wait — revert task to pending for recovery
s.revertTaskToPending(taskID)
return false
}
}
// Invoke callback
log.Debug(s.ctx, "Executing task", "plugin", s.pluginName, "queue", queueName, "taskID", taskID, "attempt", attempt)
message, callbackErr := s.invokeCallbackFn(s.ctx, queueName, taskID, payload, attempt)
// If context was cancelled (shutdown), revert task to pending for recovery
if s.ctx.Err() != nil {
s.revertTaskToPending(taskID)
return false
}
if callbackErr == nil {
s.completeTask(queueName, taskID, message)
} else {
s.handleTaskFailure(queueName, taskID, attempt, maxRetries, qs, callbackErr, message)
}
return true
}
func (s *taskQueueServiceImpl) completeTask(queueName, taskID, message string) {
now := time.Now().UnixMilli()
if _, err := s.db.ExecContext(s.ctx, `UPDATE tasks SET status = ?, message = ?, updated_at = ? WHERE id = ?`, taskStatusCompleted, message, now, taskID); err != nil {
log.Error(s.ctx, "Failed to mark task as completed", "plugin", s.pluginName, "taskID", taskID, err)
}
log.Debug(s.ctx, "Task completed", "plugin", s.pluginName, "queue", queueName, "taskID", taskID)
}
func (s *taskQueueServiceImpl) handleTaskFailure(queueName, taskID string, attempt, maxRetries int32, qs *queueState, callbackErr error, message string) {
log.Warn(s.ctx, "Task execution failed", "plugin", s.pluginName, "queue", queueName,
"taskID", taskID, "attempt", attempt, "maxRetries", maxRetries, "err", callbackErr)
// Use error message as fallback if no message was provided
if message == "" {
message = callbackErr.Error()
}
now := time.Now().UnixMilli()
if attempt > maxRetries {
if _, err := s.db.ExecContext(s.ctx, `UPDATE tasks SET status = ?, message = ?, updated_at = ? WHERE id = ?`, taskStatusFailed, message, now, taskID); err != nil {
log.Error(s.ctx, "Failed to mark task as failed", "plugin", s.pluginName, "taskID", taskID, err)
}
log.Warn(s.ctx, "Task failed after all retries", "plugin", s.pluginName, "queue", queueName, "taskID", taskID)
return
}
// Exponential backoff: backoffMs * 2^(attempt-1)
backoff := qs.config.BackoffMs << (attempt - 1)
if backoff <= 0 || backoff > maxBackoffMs {
backoff = maxBackoffMs
}
nextRunAt := now + backoff
if _, err := s.db.ExecContext(s.ctx, `
UPDATE tasks SET status = ?, next_run_at = ?, updated_at = ? WHERE id = ?
`, taskStatusPending, nextRunAt, now, taskID); err != nil {
log.Error(s.ctx, "Failed to reschedule task for retry", "plugin", s.pluginName, "taskID", taskID, err)
}
// Wake worker after backoff expires
time.AfterFunc(time.Duration(backoff)*time.Millisecond, func() {
qs.notifyWorkers()
})
}
// revertTaskToPending puts a running task back to pending status and decrements the attempt
// counter (used during shutdown to ensure the interrupted attempt doesn't count).
func (s *taskQueueServiceImpl) revertTaskToPending(taskID string) {
now := time.Now().UnixMilli()
_, err := s.db.Exec(`UPDATE tasks SET status = ?, attempt = MAX(attempt - 1, 0), updated_at = ? WHERE id = ? AND status = ?`, taskStatusPending, now, taskID, taskStatusRunning)
if err != nil {
log.Error("Failed to revert task to pending", "plugin", s.pluginName, "taskID", taskID, err)
}
}
// defaultInvokeCallback calls the plugin's nd_task_execute function.
func (s *taskQueueServiceImpl) defaultInvokeCallback(ctx context.Context, queueName, taskID string, payload []byte, attempt int32) (string, error) {
s.manager.mu.RLock()
p, ok := s.manager.plugins[s.pluginName]
s.manager.mu.RUnlock()
if !ok {
return "", fmt.Errorf("plugin %s not loaded", s.pluginName)
}
input := capabilities.TaskExecuteRequest{
QueueName: queueName,
TaskID: taskID,
Payload: payload,
Attempt: attempt,
}
message, err := callPluginFunction[capabilities.TaskExecuteRequest, string](ctx, p, FuncTaskWorkerCallback, input)
if err != nil {
return "", err
}
return message, nil
}
// cleanupLoop periodically removes terminal tasks past their retention period.
func (s *taskQueueServiceImpl) cleanupLoop() {
ticker := time.NewTicker(cleanupInterval)
defer ticker.Stop()
for {
select {
case <-s.ctx.Done():
return
case <-ticker.C:
s.runCleanup()
}
}
}
// runCleanup deletes terminal tasks past their retention period.
func (s *taskQueueServiceImpl) runCleanup() {
s.mu.Lock()
queues := make(map[string]*queueState, len(s.queues))
for k, v := range s.queues {
queues[k] = v
}
s.mu.Unlock()
now := time.Now().UnixMilli()
for name, qs := range queues {
result, err := s.db.ExecContext(s.ctx, `
DELETE FROM tasks WHERE queue_name = ? AND status IN (?, ?, ?) AND updated_at + ? < ?
`, name, taskStatusCompleted, taskStatusFailed, taskStatusCancelled, qs.config.RetentionMs, now)
if err != nil {
log.Error(s.ctx, "Failed to cleanup tasks", "plugin", s.pluginName, "queue", name, err)
continue
}
if deleted, _ := result.RowsAffected(); deleted > 0 {
log.Debug(s.ctx, "Cleaned up terminal tasks", "plugin", s.pluginName, "queue", name, "deleted", deleted)
}
}
}
// Close shuts down the task queue service, stopping all workers and closing the database.
func (s *taskQueueServiceImpl) Close() error {
// Cancel context to signal all goroutines
s.cancel()
// Wait for goroutines with timeout
done := make(chan struct{})
go func() {
s.wg.Wait()
close(done)
}()
select {
case <-done:
case <-time.After(shutdownTimeout):
log.Warn("TaskQueue shutdown timed out", "plugin", s.pluginName)
}
// Mark running tasks as pending for recovery on next startup
if s.db != nil {
now := time.Now().UnixMilli()
if _, err := s.db.Exec(`UPDATE tasks SET status = ?, updated_at = ? WHERE status = ?`, taskStatusPending, now, taskStatusRunning); err != nil {
log.Error("Failed to reset running tasks on shutdown", "plugin", s.pluginName, err)
}
log.Debug("Closing plugin taskqueue", "plugin", s.pluginName)
return s.db.Close()
}
return nil
}
// Compile-time verification
var _ host.TaskService = (*taskQueueServiceImpl)(nil)
var _ io.Closer = (*taskQueueServiceImpl)(nil)

View File

File diff suppressed because it is too large Load Diff

View File

@@ -428,10 +428,11 @@ func (m *Manager) UpdatePluginUsers(ctx context.Context, id, usersJSON string, a
// If the plugin is enabled, it will be reloaded with the new settings.
// If the plugin requires library permission and no libraries are configured (and allLibraries is false),
// the plugin will be automatically disabled.
func (m *Manager) UpdatePluginLibraries(ctx context.Context, id, librariesJSON string, allLibraries bool) error {
func (m *Manager) UpdatePluginLibraries(ctx context.Context, id, librariesJSON string, allLibraries, allowWriteAccess bool) error {
return m.updatePluginSettings(ctx, id, func(p *model.Plugin) {
p.Libraries = librariesJSON
p.AllLibraries = allLibraries
p.AllowWriteAccess = allowWriteAccess
})
}

View File

@@ -128,23 +128,6 @@ var hostServices = []hostServiceEntry{
return host.RegisterHTTPHostFunctions(service), nil
},
},
{
name: "Task",
hasPermission: func(p *Permissions) bool { return p != nil && p.Taskqueue != nil },
create: func(ctx *serviceContext) ([]extism.HostFunction, io.Closer) {
perm := ctx.permissions.Taskqueue
maxConcurrency := int32(1)
if perm.MaxConcurrency > 0 {
maxConcurrency = int32(perm.MaxConcurrency)
}
service, err := newTaskQueueService(ctx.pluginName, ctx.manager, maxConcurrency)
if err != nil {
log.Error("Failed to create Task service", "plugin", ctx.pluginName, err)
return nil, nil
}
return host.RegisterTaskHostFunctions(service), service
},
},
}
// extractManifest reads manifest from an .ndp package and computes its SHA-256 hash.
@@ -243,6 +226,8 @@ func (m *Manager) loadEnabledPlugins(ctx context.Context) error {
// loadPluginWithConfig loads a plugin with configuration from DB.
// The p.Path should point to an .ndp package file.
func (m *Manager) loadPluginWithConfig(p *model.Plugin) error {
ctx := log.NewContext(m.ctx, "plugin", p.ID)
if m.stopped.Load() {
return fmt.Errorf("manager is stopped")
}
@@ -300,27 +285,13 @@ func (m *Manager) loadPluginWithConfig(p *model.Plugin) error {
// Configure filesystem access for library permission
if pkg.Manifest.Permissions != nil && pkg.Manifest.Permissions.Library != nil && pkg.Manifest.Permissions.Library.Filesystem {
adminCtx := adminContext(m.ctx)
adminCtx := adminContext(ctx)
libraries, err := m.ds.Library(adminCtx).GetAll()
if err != nil {
return fmt.Errorf("failed to get libraries for filesystem access: %w", err)
}
// Build a set of allowed library IDs for fast lookup
allowedLibrarySet := make(map[int]struct{}, len(allowedLibraries))
for _, id := range allowedLibraries {
allowedLibrarySet[id] = struct{}{}
}
allowedPaths := make(map[string]string)
for _, lib := range libraries {
// Only mount if allLibraries is true or library is in the allowed list
if p.AllLibraries {
allowedPaths[lib.Path] = toPluginMountPoint(int32(lib.ID))
} else if _, ok := allowedLibrarySet[lib.ID]; ok {
allowedPaths[lib.Path] = toPluginMountPoint(int32(lib.ID))
}
}
allowedPaths := buildAllowedPaths(ctx, libraries, allowedLibraries, p.AllLibraries, p.AllowWriteAccess)
pluginManifest.AllowedPaths = allowedPaths
}
@@ -356,7 +327,7 @@ func (m *Manager) loadPluginWithConfig(p *model.Plugin) error {
// Enable experimental threads if requested in manifest
if pkg.Manifest.HasExperimentalThreads() {
runtimeConfig = runtimeConfig.WithCoreFeatures(api.CoreFeaturesV2 | experimental.CoreFeaturesThreads)
log.Debug(m.ctx, "Enabling experimental threads support", "plugin", p.ID)
log.Debug(ctx, "Enabling experimental threads support")
}
extismConfig := extism.PluginConfig{
@@ -364,24 +335,24 @@ func (m *Manager) loadPluginWithConfig(p *model.Plugin) error {
RuntimeConfig: runtimeConfig,
EnableHttpResponseHeaders: true,
}
compiled, err := extism.NewCompiledPlugin(m.ctx, pluginManifest, extismConfig, hostFunctions)
compiled, err := extism.NewCompiledPlugin(ctx, pluginManifest, extismConfig, hostFunctions)
if err != nil {
return fmt.Errorf("compiling plugin: %w", err)
}
// Create instance to detect capabilities
instance, err := compiled.Instance(m.ctx, extism.PluginInstanceConfig{})
instance, err := compiled.Instance(ctx, extism.PluginInstanceConfig{})
if err != nil {
compiled.Close(m.ctx)
compiled.Close(ctx)
return fmt.Errorf("creating instance: %w", err)
}
instance.SetLogger(extismLogger(p.ID))
capabilities := detectCapabilities(instance)
instance.Close(m.ctx)
instance.Close(ctx)
// Validate manifest against detected capabilities
if err := ValidateWithCapabilities(pkg.Manifest, capabilities); err != nil {
compiled.Close(m.ctx)
compiled.Close(ctx)
return fmt.Errorf("manifest validation: %w", err)
}
@@ -400,7 +371,7 @@ func (m *Manager) loadPluginWithConfig(p *model.Plugin) error {
m.mu.Unlock()
// Call plugin init function
callPluginInit(m.ctx, m.plugins[p.ID])
callPluginInit(ctx, m.plugins[p.ID])
return nil
}
@@ -431,3 +402,29 @@ func parsePluginConfig(configJSON string) (map[string]string, error) {
}
return pluginConfig, nil
}
// buildAllowedPaths constructs the extism AllowedPaths map for filesystem access.
// When allowWriteAccess is false (default), paths are prefixed with "ro:" for read-only.
// Only libraries that match the allowed set (or all libraries if allLibraries is true) are included.
func buildAllowedPaths(ctx context.Context, libraries model.Libraries, allowedLibraryIDs []int, allLibraries, allowWriteAccess bool) map[string]string {
allowedLibrarySet := make(map[int]struct{}, len(allowedLibraryIDs))
for _, id := range allowedLibraryIDs {
allowedLibrarySet[id] = struct{}{}
}
allowedPaths := make(map[string]string)
for _, lib := range libraries {
_, allowed := allowedLibrarySet[lib.ID]
if allLibraries || allowed {
mountPoint := toPluginMountPoint(int32(lib.ID))
if allowWriteAccess {
log.Info(ctx, "Granting read-write filesystem access to library", "libraryID", lib.ID, "mountPoint", mountPoint)
allowedPaths[lib.Path] = mountPoint
} else {
log.Debug(ctx, "Granting read-only filesystem access to library", "libraryID", lib.ID, "mountPoint", mountPoint)
allowedPaths["ro:"+lib.Path] = mountPoint
}
}
}
return allowedPaths
}

View File

@@ -3,6 +3,7 @@
package plugins
import (
"github.com/navidrome/navidrome/model"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
@@ -58,3 +59,66 @@ var _ = Describe("parsePluginConfig", func() {
Expect(result).ToNot(BeNil())
})
})
var _ = Describe("buildAllowedPaths", func() {
var libraries model.Libraries
BeforeEach(func() {
libraries = model.Libraries{
{ID: 1, Path: "/music/library1"},
{ID: 2, Path: "/music/library2"},
{ID: 3, Path: "/music/library3"},
}
})
Context("read-only (default)", func() {
It("mounts all libraries with ro: prefix when allLibraries is true", func() {
result := buildAllowedPaths(nil, libraries, nil, true, false)
Expect(result).To(HaveLen(3))
Expect(result).To(HaveKeyWithValue("ro:/music/library1", "/libraries/1"))
Expect(result).To(HaveKeyWithValue("ro:/music/library2", "/libraries/2"))
Expect(result).To(HaveKeyWithValue("ro:/music/library3", "/libraries/3"))
})
It("mounts only selected libraries with ro: prefix", func() {
result := buildAllowedPaths(nil, libraries, []int{1, 3}, false, false)
Expect(result).To(HaveLen(2))
Expect(result).To(HaveKeyWithValue("ro:/music/library1", "/libraries/1"))
Expect(result).To(HaveKeyWithValue("ro:/music/library3", "/libraries/3"))
Expect(result).ToNot(HaveKey("ro:/music/library2"))
})
})
Context("read-write (allowWriteAccess=true)", func() {
It("mounts all libraries without ro: prefix when allLibraries is true", func() {
result := buildAllowedPaths(nil, libraries, nil, true, true)
Expect(result).To(HaveLen(3))
Expect(result).To(HaveKeyWithValue("/music/library1", "/libraries/1"))
Expect(result).To(HaveKeyWithValue("/music/library2", "/libraries/2"))
Expect(result).To(HaveKeyWithValue("/music/library3", "/libraries/3"))
})
It("mounts only selected libraries without ro: prefix", func() {
result := buildAllowedPaths(nil, libraries, []int{2}, false, true)
Expect(result).To(HaveLen(1))
Expect(result).To(HaveKeyWithValue("/music/library2", "/libraries/2"))
})
})
Context("edge cases", func() {
It("returns empty map when no libraries match", func() {
result := buildAllowedPaths(nil, libraries, []int{99}, false, false)
Expect(result).To(BeEmpty())
})
It("returns empty map when libraries list is empty", func() {
result := buildAllowedPaths(nil, nil, []int{1}, false, false)
Expect(result).To(BeEmpty())
})
It("returns empty map when allLibraries is false and no IDs provided", func() {
result := buildAllowedPaths(nil, libraries, nil, false, false)
Expect(result).To(BeEmpty())
})
})
})

View File

@@ -110,9 +110,6 @@
},
"users": {
"$ref": "#/$defs/UsersPermission"
},
"taskqueue": {
"$ref": "#/$defs/TaskQueuePermission"
}
}
},
@@ -227,23 +224,6 @@
}
}
},
"TaskQueuePermission": {
"type": "object",
"description": "Task queue permissions for background task processing",
"additionalProperties": false,
"properties": {
"reason": {
"type": "string",
"description": "Explanation for why task queue access is needed"
},
"maxConcurrency": {
"type": "integer",
"description": "Maximum total concurrent workers across all queues. Default: 1",
"minimum": 1,
"default": 1
}
}
},
"UsersPermission": {
"type": "object",
"description": "Users service permissions for accessing user information",

View File

@@ -72,13 +72,6 @@ func ValidateWithCapabilities(m *Manifest, capabilities []Capability) error {
}
}
// Task (taskqueue) permission requires TaskWorker capability
if m.Permissions != nil && m.Permissions.Taskqueue != nil {
if !hasCapability(capabilities, CapabilityTaskWorker) {
return fmt.Errorf("'taskqueue' permission requires plugin to export '%s' function", FuncTaskWorkerCallback)
}
}
return nil
}

View File

@@ -181,9 +181,6 @@ type Permissions struct {
// Subsonicapi corresponds to the JSON schema field "subsonicapi".
Subsonicapi *SubsonicAPIPermission `json:"subsonicapi,omitempty" yaml:"subsonicapi,omitempty" mapstructure:"subsonicapi,omitempty"`
// Taskqueue corresponds to the JSON schema field "taskqueue".
Taskqueue *TaskQueuePermission `json:"taskqueue,omitempty" yaml:"taskqueue,omitempty" mapstructure:"taskqueue,omitempty"`
// Users corresponds to the JSON schema field "users".
Users *UsersPermission `json:"users,omitempty" yaml:"users,omitempty" mapstructure:"users,omitempty"`
@@ -203,36 +200,6 @@ type SubsonicAPIPermission struct {
Reason *string `json:"reason,omitempty" yaml:"reason,omitempty" mapstructure:"reason,omitempty"`
}
// Task queue permissions for background task processing
type TaskQueuePermission struct {
// Maximum total concurrent workers across all queues. Default: 1
MaxConcurrency int `json:"maxConcurrency,omitempty" yaml:"maxConcurrency,omitempty" mapstructure:"maxConcurrency,omitempty"`
// Explanation for why task queue access is needed
Reason *string `json:"reason,omitempty" yaml:"reason,omitempty" mapstructure:"reason,omitempty"`
}
// UnmarshalJSON implements json.Unmarshaler.
func (j *TaskQueuePermission) UnmarshalJSON(value []byte) error {
var raw map[string]interface{}
if err := json.Unmarshal(value, &raw); err != nil {
return err
}
type Plain TaskQueuePermission
var plain Plain
if err := json.Unmarshal(value, &plain); err != nil {
return err
}
if v, ok := raw["maxConcurrency"]; !ok || v == nil {
plain.MaxConcurrency = 1.0
}
if 1 > plain.MaxConcurrency {
return fmt.Errorf("field %s: must be >= %v", "maxConcurrency", 1)
}
*j = TaskQueuePermission(plain)
return nil
}
// Enable experimental WebAssembly threads support
type ThreadsFeature struct {
// Explanation for why threads support is needed

View File

@@ -43,7 +43,6 @@ The following host services are available:
- Library: provides access to music library metadata for plugins.
- Scheduler: provides task scheduling capabilities for plugins.
- SubsonicAPI: provides access to Navidrome's Subsonic API from plugins.
- Task: provides persistent task queues for plugins.
- Users: provides access to user information for plugins.
- WebSocket: provides WebSocket communication capabilities for plugins.

View File

@@ -1,227 +0,0 @@
// Code generated by ndpgen. DO NOT EDIT.
//
// This file contains client wrappers for the Task host service.
// It is intended for use in Navidrome plugins built with TinyGo.
//
//go:build wasip1
package host
import (
"encoding/json"
"errors"
"github.com/navidrome/navidrome/plugins/pdk/go/pdk"
)
// QueueConfig represents the QueueConfig data structure.
// QueueConfig holds configuration for a task queue.
type QueueConfig struct {
Concurrency int32 `json:"concurrency"`
MaxRetries int32 `json:"maxRetries"`
BackoffMs int64 `json:"backoffMs"`
DelayMs int64 `json:"delayMs"`
RetentionMs int64 `json:"retentionMs"`
}
// TaskInfo represents the TaskInfo data structure.
// TaskInfo holds the current state of a task.
type TaskInfo struct {
Status string `json:"status"`
Message string `json:"message"`
Attempt int32 `json:"attempt"`
}
// task_createqueue is the host function provided by Navidrome.
//
//go:wasmimport extism:host/user task_createqueue
func task_createqueue(uint64) uint64
// task_enqueue is the host function provided by Navidrome.
//
//go:wasmimport extism:host/user task_enqueue
func task_enqueue(uint64) uint64
// task_get is the host function provided by Navidrome.
//
//go:wasmimport extism:host/user task_get
func task_get(uint64) uint64
// task_cancel is the host function provided by Navidrome.
//
//go:wasmimport extism:host/user task_cancel
func task_cancel(uint64) uint64
type taskCreateQueueRequest struct {
Name string `json:"name"`
Config QueueConfig `json:"config"`
}
type taskEnqueueRequest struct {
QueueName string `json:"queueName"`
Payload []byte `json:"payload"`
}
type taskEnqueueResponse struct {
Result string `json:"result,omitempty"`
Error string `json:"error,omitempty"`
}
type taskGetRequest struct {
TaskID string `json:"taskId"`
}
type taskGetResponse struct {
Result *TaskInfo `json:"result,omitempty"`
Error string `json:"error,omitempty"`
}
type taskCancelRequest struct {
TaskID string `json:"taskId"`
}
// TaskCreateQueue calls the task_createqueue host function.
// CreateQueue creates a named task queue with the given configuration.
// Zero-value fields in config use sensible defaults.
// If a queue with the same name already exists, returns an error.
// On startup, this also recovers any stale "running" tasks from a previous crash.
func TaskCreateQueue(name string, config QueueConfig) error {
// Marshal request to JSON
req := taskCreateQueueRequest{
Name: name,
Config: config,
}
reqBytes, err := json.Marshal(req)
if err != nil {
return err
}
reqMem := pdk.AllocateBytes(reqBytes)
defer reqMem.Free()
// Call the host function
responsePtr := task_createqueue(reqMem.Offset())
// Read the response from memory
responseMem := pdk.FindMemory(responsePtr)
responseBytes := responseMem.ReadBytes()
// Parse error-only response
var response struct {
Error string `json:"error,omitempty"`
}
if err := json.Unmarshal(responseBytes, &response); err != nil {
return err
}
if response.Error != "" {
return errors.New(response.Error)
}
return nil
}
// TaskEnqueue calls the task_enqueue host function.
// Enqueue adds a task to the named queue. Returns the task ID.
// payload is opaque bytes passed back to the plugin on execution.
func TaskEnqueue(queueName string, payload []byte) (string, error) {
// Marshal request to JSON
req := taskEnqueueRequest{
QueueName: queueName,
Payload: payload,
}
reqBytes, err := json.Marshal(req)
if err != nil {
return "", err
}
reqMem := pdk.AllocateBytes(reqBytes)
defer reqMem.Free()
// Call the host function
responsePtr := task_enqueue(reqMem.Offset())
// Read the response from memory
responseMem := pdk.FindMemory(responsePtr)
responseBytes := responseMem.ReadBytes()
// Parse the response
var response taskEnqueueResponse
if err := json.Unmarshal(responseBytes, &response); err != nil {
return "", err
}
// Convert Error field to Go error
if response.Error != "" {
return "", errors.New(response.Error)
}
return response.Result, nil
}
// TaskGet calls the task_get host function.
// Get returns the current state of a task including its status,
// message, and attempt count.
func TaskGet(taskID string) (*TaskInfo, error) {
// Marshal request to JSON
req := taskGetRequest{
TaskID: taskID,
}
reqBytes, err := json.Marshal(req)
if err != nil {
return nil, err
}
reqMem := pdk.AllocateBytes(reqBytes)
defer reqMem.Free()
// Call the host function
responsePtr := task_get(reqMem.Offset())
// Read the response from memory
responseMem := pdk.FindMemory(responsePtr)
responseBytes := responseMem.ReadBytes()
// Parse the response
var response taskGetResponse
if err := json.Unmarshal(responseBytes, &response); err != nil {
return nil, err
}
// Convert Error field to Go error
if response.Error != "" {
return nil, errors.New(response.Error)
}
return response.Result, nil
}
// TaskCancel calls the task_cancel host function.
// Cancel cancels a pending task. Returns error if already
// running, completed, or failed.
func TaskCancel(taskID string) error {
// Marshal request to JSON
req := taskCancelRequest{
TaskID: taskID,
}
reqBytes, err := json.Marshal(req)
if err != nil {
return err
}
reqMem := pdk.AllocateBytes(reqBytes)
defer reqMem.Free()
// Call the host function
responsePtr := task_cancel(reqMem.Offset())
// Read the response from memory
responseMem := pdk.FindMemory(responsePtr)
responseBytes := responseMem.ReadBytes()
// Parse error-only response
var response struct {
Error string `json:"error,omitempty"`
}
if err := json.Unmarshal(responseBytes, &response); err != nil {
return err
}
if response.Error != "" {
return errors.New(response.Error)
}
return nil
}

View File

@@ -1,92 +0,0 @@
// Code generated by ndpgen. DO NOT EDIT.
//
// This file contains mock implementations for non-WASM builds.
// These mocks allow IDE support, compilation, and unit testing on non-WASM platforms.
// Plugin authors can use the exported mock instances to set expectations in tests.
//
//go:build !wasip1
package host
import "github.com/stretchr/testify/mock"
// QueueConfig represents the QueueConfig data structure.
// QueueConfig holds configuration for a task queue.
type QueueConfig struct {
Concurrency int32 `json:"concurrency"`
MaxRetries int32 `json:"maxRetries"`
BackoffMs int64 `json:"backoffMs"`
DelayMs int64 `json:"delayMs"`
RetentionMs int64 `json:"retentionMs"`
}
// TaskInfo represents the TaskInfo data structure.
// TaskInfo holds the current state of a task.
type TaskInfo struct {
Status string `json:"status"`
Message string `json:"message"`
Attempt int32 `json:"attempt"`
}
// mockTaskService is the mock implementation for testing.
type mockTaskService struct {
mock.Mock
}
// TaskMock is the auto-instantiated mock instance for testing.
// Use this to set expectations: host.TaskMock.On("MethodName", args...).Return(values...)
var TaskMock = &mockTaskService{}
// CreateQueue is the mock method for TaskCreateQueue.
func (m *mockTaskService) CreateQueue(name string, config QueueConfig) error {
args := m.Called(name, config)
return args.Error(0)
}
// TaskCreateQueue delegates to the mock instance.
// CreateQueue creates a named task queue with the given configuration.
// Zero-value fields in config use sensible defaults.
// If a queue with the same name already exists, returns an error.
// On startup, this also recovers any stale "running" tasks from a previous crash.
func TaskCreateQueue(name string, config QueueConfig) error {
return TaskMock.CreateQueue(name, config)
}
// Enqueue is the mock method for TaskEnqueue.
func (m *mockTaskService) Enqueue(queueName string, payload []byte) (string, error) {
args := m.Called(queueName, payload)
return args.String(0), args.Error(1)
}
// TaskEnqueue delegates to the mock instance.
// Enqueue adds a task to the named queue. Returns the task ID.
// payload is opaque bytes passed back to the plugin on execution.
func TaskEnqueue(queueName string, payload []byte) (string, error) {
return TaskMock.Enqueue(queueName, payload)
}
// Get is the mock method for TaskGet.
func (m *mockTaskService) Get(taskID string) (*TaskInfo, error) {
args := m.Called(taskID)
return args.Get(0).(*TaskInfo), args.Error(1)
}
// TaskGet delegates to the mock instance.
// Get returns the current state of a task including its status,
// message, and attempt count.
func TaskGet(taskID string) (*TaskInfo, error) {
return TaskMock.Get(taskID)
}
// Cancel is the mock method for TaskCancel.
func (m *mockTaskService) Cancel(taskID string) error {
args := m.Called(taskID)
return args.Error(0)
}
// TaskCancel delegates to the mock instance.
// Cancel cancels a pending task. Returns error if already
// running, completed, or failed.
func TaskCancel(taskID string) error {
return TaskMock.Cancel(taskID)
}

View File

@@ -1,79 +0,0 @@
// Code generated by ndpgen. DO NOT EDIT.
//
// This file contains export wrappers for the TaskWorker capability.
// It is intended for use in Navidrome plugins built with TinyGo.
//
//go:build wasip1
package taskworker
import (
"github.com/navidrome/navidrome/plugins/pdk/go/pdk"
)
// TaskExecuteRequest is the request provided when a task is ready to execute.
type TaskExecuteRequest struct {
// QueueName is the name of the queue this task belongs to.
QueueName string `json:"queueName"`
// TaskID is the unique identifier for this task.
TaskID string `json:"taskId"`
// Payload is the opaque data provided when the task was enqueued.
Payload []byte `json:"payload"`
// Attempt is the current attempt number (1-based: first attempt = 1).
Attempt int32 `json:"attempt"`
}
// TaskWorker is the marker interface for taskworker plugins.
// Implement one or more of the provider interfaces below.
// TaskWorker provides task execution handling.
// This capability allows plugins to receive callbacks when their queued tasks
// are ready to execute. Plugins that use the taskqueue host service must
// implement this capability.
type TaskWorker interface{}
// TaskExecuteProvider provides the OnTaskExecute function.
type TaskExecuteProvider interface {
OnTaskExecute(TaskExecuteRequest) (string, error)
} // Internal implementation holders
var (
taskExecuteImpl func(TaskExecuteRequest) (string, error)
)
// Register registers a taskworker implementation.
// The implementation is checked for optional provider interfaces.
func Register(impl TaskWorker) {
if p, ok := impl.(TaskExecuteProvider); ok {
taskExecuteImpl = p.OnTaskExecute
}
}
// NotImplementedCode is the standard return code for unimplemented functions.
// The host recognizes this and skips the plugin gracefully.
const NotImplementedCode int32 = -2
//go:wasmexport nd_task_execute
func _NdTaskExecute() int32 {
if taskExecuteImpl == nil {
// Return standard code - host will skip this plugin gracefully
return NotImplementedCode
}
var input TaskExecuteRequest
if err := pdk.InputJSON(&input); err != nil {
pdk.SetError(err)
return -1
}
output, err := taskExecuteImpl(input)
if err != nil {
pdk.SetError(err)
return -1
}
if err := pdk.OutputJSON(output); err != nil {
pdk.SetError(err)
return -1
}
return 0
}

View File

@@ -1,41 +0,0 @@
// Code generated by ndpgen. DO NOT EDIT.
//
// This file provides stub implementations for non-WASM platforms.
// It allows Go plugins to compile and run tests outside of WASM,
// but the actual functionality is only available in WASM builds.
//
//go:build !wasip1
package taskworker
// TaskExecuteRequest is the request provided when a task is ready to execute.
type TaskExecuteRequest struct {
// QueueName is the name of the queue this task belongs to.
QueueName string `json:"queueName"`
// TaskID is the unique identifier for this task.
TaskID string `json:"taskId"`
// Payload is the opaque data provided when the task was enqueued.
Payload []byte `json:"payload"`
// Attempt is the current attempt number (1-based: first attempt = 1).
Attempt int32 `json:"attempt"`
}
// TaskWorker is the marker interface for taskworker plugins.
// Implement one or more of the provider interfaces below.
// TaskWorker provides task execution handling.
// This capability allows plugins to receive callbacks when their queued tasks
// are ready to execute. Plugins that use the taskqueue host service must
// implement this capability.
type TaskWorker interface{}
// TaskExecuteProvider provides the OnTaskExecute function.
type TaskExecuteProvider interface {
OnTaskExecute(TaskExecuteRequest) (string, error)
}
// NotImplementedCode is the standard return code for unimplemented functions.
const NotImplementedCode int32 = -2
// Register is a no-op on non-WASM platforms.
// This stub allows code to compile outside of WASM.
func Register(_ TaskWorker) {}

View File

@@ -1,154 +0,0 @@
# Code generated by ndpgen. DO NOT EDIT.
#
# This file contains client wrappers for the Task host service.
# It is intended for use in Navidrome plugins built with extism-py.
#
# IMPORTANT: Due to a limitation in extism-py, you cannot import this file directly.
# The @extism.import_fn decorators are only detected when defined in the plugin's
# main __init__.py file. Copy the needed functions from this file into your plugin.
from dataclasses import dataclass
from typing import Any
import extism
import json
import base64
class HostFunctionError(Exception):
"""Raised when a host function returns an error."""
pass
@extism.import_fn("extism:host/user", "task_createqueue")
def _task_createqueue(offset: int) -> int:
"""Raw host function - do not call directly."""
...
@extism.import_fn("extism:host/user", "task_enqueue")
def _task_enqueue(offset: int) -> int:
"""Raw host function - do not call directly."""
...
@extism.import_fn("extism:host/user", "task_get")
def _task_get(offset: int) -> int:
"""Raw host function - do not call directly."""
...
@extism.import_fn("extism:host/user", "task_cancel")
def _task_cancel(offset: int) -> int:
"""Raw host function - do not call directly."""
...
def task_create_queue(name: str, config: Any) -> None:
"""CreateQueue creates a named task queue with the given configuration.
Zero-value fields in config use sensible defaults.
If a queue with the same name already exists, returns an error.
On startup, this also recovers any stale "running" tasks from a previous crash.
Args:
name: str parameter.
config: Any parameter.
Raises:
HostFunctionError: If the host function returns an error.
"""
request = {
"name": name,
"config": config,
}
request_bytes = json.dumps(request).encode("utf-8")
request_mem = extism.memory.alloc(request_bytes)
response_offset = _task_createqueue(request_mem.offset)
response_mem = extism.memory.find(response_offset)
response = json.loads(extism.memory.string(response_mem))
if response.get("error"):
raise HostFunctionError(response["error"])
def task_enqueue(queue_name: str, payload: bytes) -> str:
"""Enqueue adds a task to the named queue. Returns the task ID.
payload is opaque bytes passed back to the plugin on execution.
Args:
queue_name: str parameter.
payload: bytes parameter.
Returns:
str: The result value.
Raises:
HostFunctionError: If the host function returns an error.
"""
request = {
"queueName": queue_name,
"payload": base64.b64encode(payload).decode("ascii"),
}
request_bytes = json.dumps(request).encode("utf-8")
request_mem = extism.memory.alloc(request_bytes)
response_offset = _task_enqueue(request_mem.offset)
response_mem = extism.memory.find(response_offset)
response = json.loads(extism.memory.string(response_mem))
if response.get("error"):
raise HostFunctionError(response["error"])
return response.get("result", "")
def task_get(task_id: str) -> Any:
"""Get returns the current state of a task including its status,
message, and attempt count.
Args:
task_id: str parameter.
Returns:
Any: The result value.
Raises:
HostFunctionError: If the host function returns an error.
"""
request = {
"taskId": task_id,
}
request_bytes = json.dumps(request).encode("utf-8")
request_mem = extism.memory.alloc(request_bytes)
response_offset = _task_get(request_mem.offset)
response_mem = extism.memory.find(response_offset)
response = json.loads(extism.memory.string(response_mem))
if response.get("error"):
raise HostFunctionError(response["error"])
return response.get("result", None)
def task_cancel(task_id: str) -> None:
"""Cancel cancels a pending task. Returns error if already
running, completed, or failed.
Args:
task_id: str parameter.
Raises:
HostFunctionError: If the host function returns an error.
"""
request = {
"taskId": task_id,
}
request_bytes = json.dumps(request).encode("utf-8")
request_mem = extism.memory.alloc(request_bytes)
response_offset = _task_cancel(request_mem.offset)
response_mem = extism.memory.find(response_offset)
response = json.loads(extism.memory.string(response_mem))
if response.get("error"):
raise HostFunctionError(response["error"])

View File

@@ -9,5 +9,4 @@ pub mod lifecycle;
pub mod metadata;
pub mod scheduler;
pub mod scrobbler;
pub mod taskworker;
pub mod websocket;

View File

@@ -1,102 +0,0 @@
// Code generated by ndpgen. DO NOT EDIT.
//
// This file contains export wrappers for the TaskWorker capability.
// It is intended for use in Navidrome plugins built with extism-pdk.
use serde::{Deserialize, Serialize};
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD as BASE64;
mod base64_bytes {
use serde::{self, Deserialize, Deserializer, Serializer};
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD as BASE64;
pub fn serialize<S>(bytes: &Vec<u8>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&BASE64.encode(bytes))
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
BASE64.decode(&s).map_err(serde::de::Error::custom)
}
}
// Helper functions for skip_serializing_if with numeric types
#[allow(dead_code)]
fn is_zero_i32(value: &i32) -> bool { *value == 0 }
#[allow(dead_code)]
fn is_zero_u32(value: &u32) -> bool { *value == 0 }
#[allow(dead_code)]
fn is_zero_i64(value: &i64) -> bool { *value == 0 }
#[allow(dead_code)]
fn is_zero_u64(value: &u64) -> bool { *value == 0 }
#[allow(dead_code)]
fn is_zero_f32(value: &f32) -> bool { *value == 0.0 }
#[allow(dead_code)]
fn is_zero_f64(value: &f64) -> bool { *value == 0.0 }
/// TaskExecuteRequest is the request provided when a task is ready to execute.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TaskExecuteRequest {
/// QueueName is the name of the queue this task belongs to.
#[serde(default)]
pub queue_name: String,
/// TaskID is the unique identifier for this task.
#[serde(default)]
pub task_id: String,
/// Payload is the opaque data provided when the task was enqueued.
#[serde(default)]
#[serde(with = "base64_bytes")]
pub payload: Vec<u8>,
/// Attempt is the current attempt number (1-based: first attempt = 1).
#[serde(default)]
pub attempt: i32,
}
/// Error represents an error from a capability method.
#[derive(Debug)]
pub struct Error {
pub message: String,
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.message)
}
}
impl std::error::Error for Error {}
impl Error {
pub fn new(message: impl Into<String>) -> Self {
Self { message: message.into() }
}
}
/// TaskExecuteProvider provides the OnTaskExecute function.
pub trait TaskExecuteProvider {
fn on_task_execute(&self, req: TaskExecuteRequest) -> Result<String, Error>;
}
/// Register the on_task_execute export.
/// This macro generates the WASM export function for this method.
#[macro_export]
macro_rules! register_taskworker_task_execute {
($plugin_type:ty) => {
#[extism_pdk::plugin_fn]
pub fn nd_task_execute(
req: extism_pdk::Json<$crate::taskworker::TaskExecuteRequest>
) -> extism_pdk::FnResult<extism_pdk::Json<String>> {
let plugin = <$plugin_type>::default();
let result = $crate::taskworker::TaskExecuteProvider::on_task_execute(&plugin, req.into_inner())?;
Ok(extism_pdk::Json(result))
}
};
}

View File

@@ -40,7 +40,6 @@
//! - [`library`] - provides access to music library metadata for plugins.
//! - [`scheduler`] - provides task scheduling capabilities for plugins.
//! - [`subsonicapi`] - provides access to Navidrome's Subsonic API from plugins.
//! - [`task`] - provides persistent task queues for plugins.
//! - [`users`] - provides access to user information for plugins.
//! - [`websocket`] - provides WebSocket communication capabilities for plugins.
@@ -100,13 +99,6 @@ pub mod subsonicapi {
pub use super::nd_host_subsonicapi::*;
}
#[doc(hidden)]
mod nd_host_task;
/// provides persistent task queues for plugins.
pub mod task {
pub use super::nd_host_task::*;
}
#[doc(hidden)]
mod nd_host_users;
/// provides access to user information for plugins.

View File

@@ -1,217 +0,0 @@
// Code generated by ndpgen. DO NOT EDIT.
//
// This file contains client wrappers for the Task host service.
// It is intended for use in Navidrome plugins built with extism-pdk.
use extism_pdk::*;
use serde::{Deserialize, Serialize};
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD as BASE64;
mod base64_bytes {
use serde::{self, Deserialize, Deserializer, Serializer};
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD as BASE64;
pub fn serialize<S>(bytes: &Vec<u8>, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&BASE64.encode(bytes))
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
BASE64.decode(&s).map_err(serde::de::Error::custom)
}
}
/// QueueConfig holds configuration for a task queue.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct QueueConfig {
pub concurrency: i32,
pub max_retries: i32,
pub backoff_ms: i64,
pub delay_ms: i64,
pub retention_ms: i64,
}
/// TaskInfo holds the current state of a task.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TaskInfo {
pub status: String,
pub message: String,
pub attempt: i32,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct TaskCreateQueueRequest {
name: String,
config: QueueConfig,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct TaskCreateQueueResponse {
#[serde(default)]
error: Option<String>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct TaskEnqueueRequest {
queue_name: String,
#[serde(with = "base64_bytes")]
payload: Vec<u8>,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct TaskEnqueueResponse {
#[serde(default)]
result: String,
#[serde(default)]
error: Option<String>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct TaskGetRequest {
task_id: String,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct TaskGetResponse {
#[serde(default)]
result: Option<TaskInfo>,
#[serde(default)]
error: Option<String>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct TaskCancelRequest {
task_id: String,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct TaskCancelResponse {
#[serde(default)]
error: Option<String>,
}
#[host_fn]
extern "ExtismHost" {
fn task_createqueue(input: Json<TaskCreateQueueRequest>) -> Json<TaskCreateQueueResponse>;
fn task_enqueue(input: Json<TaskEnqueueRequest>) -> Json<TaskEnqueueResponse>;
fn task_get(input: Json<TaskGetRequest>) -> Json<TaskGetResponse>;
fn task_cancel(input: Json<TaskCancelRequest>) -> Json<TaskCancelResponse>;
}
/// CreateQueue creates a named task queue with the given configuration.
/// Zero-value fields in config use sensible defaults.
/// If a queue with the same name already exists, returns an error.
/// On startup, this also recovers any stale "running" tasks from a previous crash.
///
/// # Arguments
/// * `name` - String parameter.
/// * `config` - QueueConfig parameter.
///
/// # Errors
/// Returns an error if the host function call fails.
pub fn create_queue(name: &str, config: QueueConfig) -> Result<(), Error> {
let response = unsafe {
task_createqueue(Json(TaskCreateQueueRequest {
name: name.to_owned(),
config: config,
}))?
};
if let Some(err) = response.0.error {
return Err(Error::msg(err));
}
Ok(())
}
/// Enqueue adds a task to the named queue. Returns the task ID.
/// payload is opaque bytes passed back to the plugin on execution.
///
/// # Arguments
/// * `queue_name` - String parameter.
/// * `payload` - Vec<u8> parameter.
///
/// # Returns
/// The result value.
///
/// # Errors
/// Returns an error if the host function call fails.
pub fn enqueue(queue_name: &str, payload: Vec<u8>) -> Result<String, Error> {
let response = unsafe {
task_enqueue(Json(TaskEnqueueRequest {
queue_name: queue_name.to_owned(),
payload: payload,
}))?
};
if let Some(err) = response.0.error {
return Err(Error::msg(err));
}
Ok(response.0.result)
}
/// Get returns the current state of a task including its status,
/// message, and attempt count.
///
/// # Arguments
/// * `task_id` - String parameter.
///
/// # Returns
/// The result value.
///
/// # Errors
/// Returns an error if the host function call fails.
pub fn get(task_id: &str) -> Result<Option<TaskInfo>, Error> {
let response = unsafe {
task_get(Json(TaskGetRequest {
task_id: task_id.to_owned(),
}))?
};
if let Some(err) = response.0.error {
return Err(Error::msg(err));
}
Ok(response.0.result)
}
/// Cancel cancels a pending task. Returns error if already
/// running, completed, or failed.
///
/// # Arguments
/// * `task_id` - String parameter.
///
/// # Errors
/// Returns an error if the host function call fails.
pub fn cancel(task_id: &str) -> Result<(), Error> {
let response = unsafe {
task_cancel(Json(TaskCancelRequest {
task_id: task_id.to_owned(),
}))?
};
if let Some(err) = response.0.error {
return Err(Error::msg(err));
}
Ok(())
}

View File

@@ -1,16 +0,0 @@
module test-taskqueue
go 1.25
require github.com/navidrome/navidrome/plugins/pdk/go v0.0.0
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/extism/go-pdk v1.1.3 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/stretchr/testify v1.11.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
replace github.com/navidrome/navidrome/plugins/pdk/go => ../../pdk/go

View File

@@ -1,14 +0,0 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/extism/go-pdk v1.1.3 h1:hfViMPWrqjN6u67cIYRALZTZLk/enSPpNKa+rZ9X2SQ=
github.com/extism/go-pdk v1.1.3/go.mod h1:Gz+LIU/YCKnKXhgge8yo5Yu1F/lbv7KtKFkiCSzW/P4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -1,104 +0,0 @@
// Test TaskQueue plugin for Navidrome plugin system integration tests.
// Build with: tinygo build -o ../test-taskqueue.wasm -target wasip1 -buildmode=c-shared .
package main
import (
"fmt"
"github.com/navidrome/navidrome/plugins/pdk/go/host"
"github.com/navidrome/navidrome/plugins/pdk/go/pdk"
"github.com/navidrome/navidrome/plugins/pdk/go/taskworker"
)
func init() {
taskworker.Register(&handler{})
}
type handler struct{}
func (h *handler) OnTaskExecute(req taskworker.TaskExecuteRequest) (string, error) {
payload := string(req.Payload)
if payload == "fail" {
return "", fmt.Errorf("task failed as instructed")
}
if payload == "fail-then-succeed" && req.Attempt < 2 {
return "", fmt.Errorf("transient failure")
}
return "completed successfully", nil
}
// Test helper types
type TestInput struct {
Operation string `json:"operation"`
QueueName string `json:"queueName,omitempty"`
Config *host.QueueConfig `json:"config,omitempty"`
Payload []byte `json:"payload,omitempty"`
TaskID string `json:"taskId,omitempty"`
}
type TestOutput struct {
TaskID string `json:"taskId,omitempty"`
Status string `json:"status,omitempty"`
Message string `json:"message,omitempty"`
Attempt int32 `json:"attempt,omitempty"`
Error *string `json:"error,omitempty"`
}
//go:wasmexport nd_test_taskqueue
func ndTestTaskQueue() int32 {
var input TestInput
if err := pdk.InputJSON(&input); err != nil {
errStr := err.Error()
pdk.OutputJSON(TestOutput{Error: &errStr})
return 0
}
switch input.Operation {
case "create_queue":
config := host.QueueConfig{}
if input.Config != nil {
config = *input.Config
}
err := host.TaskCreateQueue(input.QueueName, config)
if err != nil {
errStr := err.Error()
pdk.OutputJSON(TestOutput{Error: &errStr})
return 0
}
pdk.OutputJSON(TestOutput{})
case "enqueue":
taskID, err := host.TaskEnqueue(input.QueueName, input.Payload)
if err != nil {
errStr := err.Error()
pdk.OutputJSON(TestOutput{Error: &errStr})
return 0
}
pdk.OutputJSON(TestOutput{TaskID: taskID})
case "get_task_status":
info, err := host.TaskGet(input.TaskID)
if err != nil {
errStr := err.Error()
pdk.OutputJSON(TestOutput{Error: &errStr})
return 0
}
pdk.OutputJSON(TestOutput{Status: info.Status, Message: info.Message, Attempt: info.Attempt})
case "cancel_task":
err := host.TaskCancel(input.TaskID)
if err != nil {
errStr := err.Error()
pdk.OutputJSON(TestOutput{Error: &errStr})
return 0
}
pdk.OutputJSON(TestOutput{})
default:
errStr := "unknown operation: " + input.Operation
pdk.OutputJSON(TestOutput{Error: &errStr})
}
return 0
}
func main() {}

View File

@@ -1,12 +0,0 @@
{
"name": "Test TaskQueue Plugin",
"author": "Navidrome Test",
"version": "1.0.0",
"description": "A test plugin for TaskQueue integration testing",
"permissions": {
"taskqueue": {
"reason": "For testing task queue operations",
"maxConcurrency": 10
}
}
}

View File

@@ -353,7 +353,8 @@
"allUsers": "Permitir todos os usuários",
"selectedUsers": "Usuários selecionados",
"allLibraries": "Permitir todas as bibliotecas",
"selectedLibraries": "Bibliotecas selecionadas"
"selectedLibraries": "Bibliotecas selecionadas",
"allowWriteAccess": "Permitir acesso de escrita"
},
"sections": {
"status": "Status",
@@ -396,6 +397,7 @@
"allLibrariesHelp": "Quando habilitado, o plugin terá acesso a todas as bibliotecas, incluindo as criadas no futuro.",
"noLibraries": "Nenhuma biblioteca selecionada",
"librariesRequired": "Este plugin requer acesso a informações de bibliotecas. Selecione quais bibliotecas o plugin pode acessar, ou habilite 'Permitir todas as bibliotecas'.",
"allowWriteAccessHelp": "Quando habilitado, o plugin pode modificar arquivos nos diretórios das bibliotecas. Por padrão, plugins têm acesso somente leitura.",
"requiredHosts": "Hosts necessários",
"configValidationError": "Falha na validação da configuração:",
"schemaRenderError": "Não foi possível renderizar o formulário de configuração. O schema do plugin pode estar inválido."

View File

@@ -29,7 +29,7 @@ type PluginManager interface {
ValidatePluginConfig(ctx context.Context, id, configJSON string) error
UpdatePluginConfig(ctx context.Context, id, configJSON string) error
UpdatePluginUsers(ctx context.Context, id, usersJSON string, allUsers bool) error
UpdatePluginLibraries(ctx context.Context, id, librariesJSON string, allLibraries bool) error
UpdatePluginLibraries(ctx context.Context, id, librariesJSON string, allLibraries, allowWriteAccess bool) error
RescanPlugins(ctx context.Context) error
UnloadDisabledPlugins(ctx context.Context)
}

View File

@@ -56,12 +56,13 @@ func pluginsEnabledMiddleware(next http.Handler) http.Handler {
// PluginUpdateRequest represents the fields that can be updated via the API
type PluginUpdateRequest struct {
Enabled *bool `json:"enabled,omitempty"`
Config *string `json:"config,omitempty"`
Users *string `json:"users,omitempty"`
AllUsers *bool `json:"allUsers,omitempty"`
Libraries *string `json:"libraries,omitempty"`
AllLibraries *bool `json:"allLibraries,omitempty"`
Enabled *bool `json:"enabled,omitempty"`
Config *string `json:"config,omitempty"`
Users *string `json:"users,omitempty"`
AllUsers *bool `json:"allUsers,omitempty"`
Libraries *string `json:"libraries,omitempty"`
AllLibraries *bool `json:"allLibraries,omitempty"`
AllowWriteAccess *bool `json:"allowWriteAccess,omitempty"`
}
func (api *Router) updatePlugin(w http.ResponseWriter, r *http.Request) {
@@ -109,7 +110,7 @@ func (api *Router) updatePlugin(w http.ResponseWriter, r *http.Request) {
}
// Handle libraries permission update (if provided)
if req.Libraries != nil || req.AllLibraries != nil {
if req.Libraries != nil || req.AllLibraries != nil || req.AllowWriteAccess != nil {
if err := validateAndUpdateLibraries(ctx, api.pluginManager, repo, id, req, w); err != nil {
log.Error(ctx, "Error updating plugin libraries", err)
return
@@ -245,6 +246,7 @@ func validateAndUpdateLibraries(ctx context.Context, pm PluginManager, repo mode
librariesJSON := plugin.Libraries
allLibraries := plugin.AllLibraries
allowWriteAccess := plugin.AllowWriteAccess
if req.Libraries != nil {
if *req.Libraries != "" && !isValidJSON(*req.Libraries) {
@@ -256,8 +258,11 @@ func validateAndUpdateLibraries(ctx context.Context, pm PluginManager, repo mode
if req.AllLibraries != nil {
allLibraries = *req.AllLibraries
}
if req.AllowWriteAccess != nil {
allowWriteAccess = *req.AllowWriteAccess
}
if err := pm.UpdatePluginLibraries(ctx, id, librariesJSON, allLibraries); err != nil {
if err := pm.UpdatePluginLibraries(ctx, id, librariesJSON, allLibraries, allowWriteAccess); err != nil {
log.Error(ctx, "Error updating plugin libraries", "id", id, err)
http.Error(w, "Error updating plugin libraries: "+err.Error(), http.StatusInternalServerError)
return err

View File

@@ -18,7 +18,7 @@ type MockPluginManager struct {
// UpdatePluginUsersFn is called when UpdatePluginUsers is invoked. If nil, returns UsersError.
UpdatePluginUsersFn func(ctx context.Context, id, usersJSON string, allUsers bool) error
// UpdatePluginLibrariesFn is called when UpdatePluginLibraries is invoked. If nil, returns LibrariesError.
UpdatePluginLibrariesFn func(ctx context.Context, id, librariesJSON string, allLibraries bool) error
UpdatePluginLibrariesFn func(ctx context.Context, id, librariesJSON string, allLibraries, allowWriteAccess bool) error
// RescanPluginsFn is called when RescanPlugins is invoked. If nil, returns RescanError.
RescanPluginsFn func(ctx context.Context) error
@@ -48,9 +48,10 @@ type MockPluginManager struct {
AllUsers bool
}
UpdatePluginLibrariesCalls []struct {
ID string
LibrariesJSON string
AllLibraries bool
ID string
LibrariesJSON string
AllLibraries bool
AllowWriteAccess bool
}
RescanPluginsCalls int
}
@@ -105,14 +106,15 @@ func (m *MockPluginManager) UpdatePluginUsers(ctx context.Context, id, usersJSON
return m.UsersError
}
func (m *MockPluginManager) UpdatePluginLibraries(ctx context.Context, id, librariesJSON string, allLibraries bool) error {
func (m *MockPluginManager) UpdatePluginLibraries(ctx context.Context, id, librariesJSON string, allLibraries, allowWriteAccess bool) error {
m.UpdatePluginLibrariesCalls = append(m.UpdatePluginLibrariesCalls, struct {
ID string
LibrariesJSON string
AllLibraries bool
}{ID: id, LibrariesJSON: librariesJSON, AllLibraries: allLibraries})
ID string
LibrariesJSON string
AllLibraries bool
AllowWriteAccess bool
}{ID: id, LibrariesJSON: librariesJSON, AllLibraries: allLibraries, AllowWriteAccess: allowWriteAccess})
if m.UpdatePluginLibrariesFn != nil {
return m.UpdatePluginLibrariesFn(ctx, id, librariesJSON, allLibraries)
return m.UpdatePluginLibrariesFn(ctx, id, librariesJSON, allLibraries, allowWriteAccess)
}
return m.LibrariesError
}

View File

@@ -355,7 +355,8 @@
"allUsers": "Allow all users",
"selectedUsers": "Selected users",
"allLibraries": "Allow all libraries",
"selectedLibraries": "Selected libraries"
"selectedLibraries": "Selected libraries",
"allowWriteAccess": "Allow write access"
},
"sections": {
"status": "Status",
@@ -400,6 +401,7 @@
"allLibrariesHelp": "When enabled, the plugin will have access to all libraries, including those created in the future.",
"noLibraries": "No libraries selected",
"librariesRequired": "This plugin requires access to library information. Select which libraries the plugin can access, or enable 'Allow all libraries'.",
"allowWriteAccessHelp": "When enabled, the plugin can modify files in the library directories. By default, plugins have read-only access.",
"requiredHosts": "Required hosts"
},
"placeholders": {

View File

@@ -23,8 +23,10 @@ export const LibraryPermissionCard = ({
classes,
selectedLibraries,
allLibraries,
allowWriteAccess,
onSelectedLibrariesChange,
onAllLibrariesChange,
onAllowWriteAccessChange,
}) => {
const translate = useTranslate()
@@ -58,9 +60,17 @@ export const LibraryPermissionCard = ({
[onAllLibrariesChange],
)
const handleAllowWriteAccessToggle = React.useCallback(
(event) => {
onAllowWriteAccessChange(event.target.checked)
},
[onAllowWriteAccessChange],
)
// Get permission reason from manifest
const libraryPermission = manifest?.permissions?.library
const reason = libraryPermission?.reason
const hasFilesystem = libraryPermission?.filesystem === true
// Check if permission is required but not configured
const isConfigurationRequired =
@@ -107,6 +117,24 @@ export const LibraryPermissionCard = ({
</Typography>
</Box>
{hasFilesystem && (
<Box mb={2}>
<FormControlLabel
control={
<Switch
checked={allowWriteAccess}
onChange={handleAllowWriteAccessToggle}
color="primary"
/>
}
label={translate('resources.plugin.fields.allowWriteAccess')}
/>
<Typography variant="body2" color="textSecondary">
{translate('resources.plugin.messages.allowWriteAccessHelp')}
</Typography>
</Box>
)}
{!allLibraries && (
<Box className={classes.usersList}>
<Typography variant="subtitle2" gutterBottom>
@@ -166,6 +194,8 @@ LibraryPermissionCard.propTypes = {
classes: PropTypes.object.isRequired,
selectedLibraries: PropTypes.array.isRequired,
allLibraries: PropTypes.bool.isRequired,
allowWriteAccess: PropTypes.bool.isRequired,
onSelectedLibrariesChange: PropTypes.func.isRequired,
onAllLibrariesChange: PropTypes.func.isRequired,
onAllowWriteAccessChange: PropTypes.func.isRequired,
}

View File

@@ -48,8 +48,11 @@ const PluginShowLayout = () => {
// Libraries permission state
const [selectedLibraries, setSelectedLibraries] = useState([])
const [allLibraries, setAllLibraries] = useState(false)
const [allowWriteAccess, setAllowWriteAccess] = useState(false)
const [lastRecordLibraries, setLastRecordLibraries] = useState(null)
const [lastRecordAllLibraries, setLastRecordAllLibraries] = useState(null)
const [lastRecordAllowWriteAccess, setLastRecordAllowWriteAccess] =
useState(null)
// Parse JSON config to object
const jsonToObject = useCallback((jsonString) => {
@@ -99,10 +102,12 @@ const PluginShowLayout = () => {
if (record && !isDirty) {
const recordLibraries = record.libraries || ''
const recordAllLibraries = record.allLibraries || false
const recordAllowWriteAccess = record.allowWriteAccess || false
if (
recordLibraries !== lastRecordLibraries ||
recordAllLibraries !== lastRecordAllLibraries
recordAllLibraries !== lastRecordAllLibraries ||
recordAllowWriteAccess !== lastRecordAllowWriteAccess
) {
try {
setSelectedLibraries(
@@ -112,11 +117,19 @@ const PluginShowLayout = () => {
setSelectedLibraries([])
}
setAllLibraries(recordAllLibraries)
setAllowWriteAccess(recordAllowWriteAccess)
setLastRecordLibraries(recordLibraries)
setLastRecordAllLibraries(recordAllLibraries)
setLastRecordAllowWriteAccess(recordAllowWriteAccess)
}
}
}, [record, lastRecordLibraries, lastRecordAllLibraries, isDirty])
}, [
record,
lastRecordLibraries,
lastRecordAllLibraries,
lastRecordAllowWriteAccess,
isDirty,
])
const handleConfigDataChange = useCallback(
(newData, errors) => {
@@ -152,6 +165,11 @@ const PluginShowLayout = () => {
setIsDirty(true)
}, [])
const handleAllowWriteAccessChange = useCallback((newAllowWriteAccess) => {
setAllowWriteAccess(newAllowWriteAccess)
setIsDirty(true)
}, [])
const [updatePlugin, { loading }] = useUpdate(
'plugin',
record?.id,
@@ -167,6 +185,7 @@ const PluginShowLayout = () => {
setLastRecordAllUsers(null)
setLastRecordLibraries(null)
setLastRecordAllLibraries(null)
setLastRecordAllowWriteAccess(null)
notify('resources.plugin.notifications.updated', 'info')
},
onFailure: (err) => {
@@ -199,6 +218,7 @@ const PluginShowLayout = () => {
if (parsedManifest?.permissions?.library) {
data.libraries = JSON.stringify(selectedLibraries)
data.allLibraries = allLibraries
data.allowWriteAccess = allowWriteAccess
}
updatePlugin('plugin', record.id, data, record)
@@ -210,6 +230,7 @@ const PluginShowLayout = () => {
allUsers,
selectedLibraries,
allLibraries,
allowWriteAccess,
])
// Parse manifest
@@ -294,8 +315,10 @@ const PluginShowLayout = () => {
classes={classes}
selectedLibraries={selectedLibraries}
allLibraries={allLibraries}
allowWriteAccess={allowWriteAccess}
onSelectedLibrariesChange={handleSelectedLibrariesChange}
onAllLibrariesChange={handleAllLibrariesChange}
onAllowWriteAccessChange={handleAllowWriteAccessChange}
/>
<Box display="flex" justifyContent="flex-end">