mirror of
https://github.com/navidrome/navidrome.git
synced 2026-02-28 04:46:18 -05:00
Compare commits
2 Commits
feat/plugi
...
rw-librari
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b545574c37 | ||
|
|
2c245e5446 |
@@ -0,0 +1,5 @@
|
|||||||
|
-- +goose Up
|
||||||
|
ALTER TABLE plugin ADD COLUMN allow_write_access BOOL NOT NULL DEFAULT false;
|
||||||
|
|
||||||
|
-- +goose Down
|
||||||
|
ALTER TABLE plugin DROP COLUMN allow_write_access;
|
||||||
@@ -3,19 +3,20 @@ package model
|
|||||||
import "time"
|
import "time"
|
||||||
|
|
||||||
type Plugin struct {
|
type Plugin struct {
|
||||||
ID string `structs:"id" json:"id"`
|
ID string `structs:"id" json:"id"`
|
||||||
Path string `structs:"path" json:"path"`
|
Path string `structs:"path" json:"path"`
|
||||||
Manifest string `structs:"manifest" json:"manifest"`
|
Manifest string `structs:"manifest" json:"manifest"`
|
||||||
Config string `structs:"config" json:"config,omitempty"`
|
Config string `structs:"config" json:"config,omitempty"`
|
||||||
Users string `structs:"users" json:"users,omitempty"`
|
Users string `structs:"users" json:"users,omitempty"`
|
||||||
AllUsers bool `structs:"all_users" json:"allUsers,omitempty"`
|
AllUsers bool `structs:"all_users" json:"allUsers,omitempty"`
|
||||||
Libraries string `structs:"libraries" json:"libraries,omitempty"`
|
Libraries string `structs:"libraries" json:"libraries,omitempty"`
|
||||||
AllLibraries bool `structs:"all_libraries" json:"allLibraries,omitempty"`
|
AllLibraries bool `structs:"all_libraries" json:"allLibraries,omitempty"`
|
||||||
Enabled bool `structs:"enabled" json:"enabled"`
|
AllowWriteAccess bool `structs:"allow_write_access" json:"allowWriteAccess,omitempty"`
|
||||||
LastError string `structs:"last_error" json:"lastError,omitempty"`
|
Enabled bool `structs:"enabled" json:"enabled"`
|
||||||
SHA256 string `structs:"sha256" json:"sha256"`
|
LastError string `structs:"last_error" json:"lastError,omitempty"`
|
||||||
CreatedAt time.Time `structs:"created_at" json:"createdAt"`
|
SHA256 string `structs:"sha256" json:"sha256"`
|
||||||
UpdatedAt time.Time `structs:"updated_at" json:"updatedAt"`
|
CreatedAt time.Time `structs:"created_at" json:"createdAt"`
|
||||||
|
UpdatedAt time.Time `structs:"updated_at" json:"updatedAt"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type Plugins []Plugin
|
type Plugins []Plugin
|
||||||
|
|||||||
@@ -79,8 +79,8 @@ func (r *pluginRepository) Put(plugin *model.Plugin) error {
|
|||||||
|
|
||||||
// Upsert using INSERT ... ON CONFLICT for atomic operation
|
// Upsert using INSERT ... ON CONFLICT for atomic operation
|
||||||
_, err := r.db.NewQuery(`
|
_, err := r.db.NewQuery(`
|
||||||
INSERT INTO plugin (id, path, manifest, config, users, all_users, libraries, all_libraries, enabled, last_error, sha256, created_at, updated_at)
|
INSERT INTO plugin (id, path, manifest, config, users, all_users, libraries, all_libraries, allow_write_access, enabled, last_error, sha256, created_at, updated_at)
|
||||||
VALUES ({:id}, {:path}, {:manifest}, {:config}, {:users}, {:all_users}, {:libraries}, {:all_libraries}, {:enabled}, {:last_error}, {:sha256}, {:created_at}, {:updated_at})
|
VALUES ({:id}, {:path}, {:manifest}, {:config}, {:users}, {:all_users}, {:libraries}, {:all_libraries}, {:allow_write_access}, {:enabled}, {:last_error}, {:sha256}, {:created_at}, {:updated_at})
|
||||||
ON CONFLICT(id) DO UPDATE SET
|
ON CONFLICT(id) DO UPDATE SET
|
||||||
path = excluded.path,
|
path = excluded.path,
|
||||||
manifest = excluded.manifest,
|
manifest = excluded.manifest,
|
||||||
@@ -89,24 +89,26 @@ func (r *pluginRepository) Put(plugin *model.Plugin) error {
|
|||||||
all_users = excluded.all_users,
|
all_users = excluded.all_users,
|
||||||
libraries = excluded.libraries,
|
libraries = excluded.libraries,
|
||||||
all_libraries = excluded.all_libraries,
|
all_libraries = excluded.all_libraries,
|
||||||
|
allow_write_access = excluded.allow_write_access,
|
||||||
enabled = excluded.enabled,
|
enabled = excluded.enabled,
|
||||||
last_error = excluded.last_error,
|
last_error = excluded.last_error,
|
||||||
sha256 = excluded.sha256,
|
sha256 = excluded.sha256,
|
||||||
updated_at = excluded.updated_at
|
updated_at = excluded.updated_at
|
||||||
`).Bind(dbx.Params{
|
`).Bind(dbx.Params{
|
||||||
"id": plugin.ID,
|
"id": plugin.ID,
|
||||||
"path": plugin.Path,
|
"path": plugin.Path,
|
||||||
"manifest": plugin.Manifest,
|
"manifest": plugin.Manifest,
|
||||||
"config": plugin.Config,
|
"config": plugin.Config,
|
||||||
"users": plugin.Users,
|
"users": plugin.Users,
|
||||||
"all_users": plugin.AllUsers,
|
"all_users": plugin.AllUsers,
|
||||||
"libraries": plugin.Libraries,
|
"libraries": plugin.Libraries,
|
||||||
"all_libraries": plugin.AllLibraries,
|
"all_libraries": plugin.AllLibraries,
|
||||||
"enabled": plugin.Enabled,
|
"allow_write_access": plugin.AllowWriteAccess,
|
||||||
"last_error": plugin.LastError,
|
"enabled": plugin.Enabled,
|
||||||
"sha256": plugin.SHA256,
|
"last_error": plugin.LastError,
|
||||||
"created_at": time.Now(),
|
"sha256": plugin.SHA256,
|
||||||
"updated_at": plugin.UpdatedAt,
|
"created_at": time.Now(),
|
||||||
|
"updated_at": plugin.UpdatedAt,
|
||||||
}).Execute()
|
}).Execute()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,27 +0,0 @@
|
|||||||
package capabilities
|
|
||||||
|
|
||||||
// TaskWorker provides task execution handling.
|
|
||||||
// This capability allows plugins to receive callbacks when their queued tasks
|
|
||||||
// are ready to execute. Plugins that use the taskqueue host service must
|
|
||||||
// implement this capability.
|
|
||||||
//
|
|
||||||
//nd:capability name=taskworker
|
|
||||||
type TaskWorker interface {
|
|
||||||
// OnTaskExecute is called when a queued task is ready to run.
|
|
||||||
// The returned string is a status/result message stored in the tasks table.
|
|
||||||
// Return an error to trigger retry (if retries are configured).
|
|
||||||
//nd:export name=nd_task_execute
|
|
||||||
OnTaskExecute(TaskExecuteRequest) (string, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TaskExecuteRequest is the request provided when a task is ready to execute.
|
|
||||||
type TaskExecuteRequest struct {
|
|
||||||
// QueueName is the name of the queue this task belongs to.
|
|
||||||
QueueName string `json:"queueName"`
|
|
||||||
// TaskID is the unique identifier for this task.
|
|
||||||
TaskID string `json:"taskId"`
|
|
||||||
// Payload is the opaque data provided when the task was enqueued.
|
|
||||||
Payload []byte `json:"payload"`
|
|
||||||
// Attempt is the current attempt number (1-based: first attempt = 1).
|
|
||||||
Attempt int32 `json:"attempt"`
|
|
||||||
}
|
|
||||||
@@ -1,38 +0,0 @@
|
|||||||
version: v1-draft
|
|
||||||
exports:
|
|
||||||
nd_task_execute:
|
|
||||||
description: |-
|
|
||||||
OnTaskExecute is called when a queued task is ready to run.
|
|
||||||
The returned string is a status/result message stored in the tasks table.
|
|
||||||
Return an error to trigger retry (if retries are configured).
|
|
||||||
input:
|
|
||||||
$ref: '#/components/schemas/TaskExecuteRequest'
|
|
||||||
contentType: application/json
|
|
||||||
output:
|
|
||||||
type: string
|
|
||||||
contentType: application/json
|
|
||||||
components:
|
|
||||||
schemas:
|
|
||||||
TaskExecuteRequest:
|
|
||||||
description: TaskExecuteRequest is the request provided when a task is ready to execute.
|
|
||||||
properties:
|
|
||||||
queueName:
|
|
||||||
type: string
|
|
||||||
description: QueueName is the name of the queue this task belongs to.
|
|
||||||
taskId:
|
|
||||||
type: string
|
|
||||||
description: TaskID is the unique identifier for this task.
|
|
||||||
payload:
|
|
||||||
type: array
|
|
||||||
description: Payload is the opaque data provided when the task was enqueued.
|
|
||||||
items:
|
|
||||||
type: object
|
|
||||||
attempt:
|
|
||||||
type: integer
|
|
||||||
format: int32
|
|
||||||
description: 'Attempt is the current attempt number (1-based: first attempt = 1).'
|
|
||||||
required:
|
|
||||||
- queueName
|
|
||||||
- taskId
|
|
||||||
- payload
|
|
||||||
- attempt
|
|
||||||
@@ -1,68 +0,0 @@
|
|||||||
package host
|
|
||||||
|
|
||||||
import "context"
|
|
||||||
|
|
||||||
// TaskInfo holds the current state of a task.
|
|
||||||
type TaskInfo struct {
|
|
||||||
// Status is the current task status: "pending", "running",
|
|
||||||
// "completed", "failed", or "cancelled".
|
|
||||||
Status string `json:"status"`
|
|
||||||
// Message is the status/result message returned by the plugin callback.
|
|
||||||
Message string `json:"message"`
|
|
||||||
// Attempt is the current or last attempt number (1-based).
|
|
||||||
Attempt int32 `json:"attempt"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// QueueConfig holds configuration for a task queue.
|
|
||||||
type QueueConfig struct {
|
|
||||||
// Concurrency is the max number of parallel workers. Default: 1.
|
|
||||||
// Capped by the plugin's manifest maxConcurrency.
|
|
||||||
Concurrency int32 `json:"concurrency"`
|
|
||||||
|
|
||||||
// MaxRetries is the number of times to retry a failed task. Default: 0.
|
|
||||||
MaxRetries int32 `json:"maxRetries"`
|
|
||||||
|
|
||||||
// BackoffMs is the initial backoff between retries in milliseconds.
|
|
||||||
// Doubles each retry (exponential: backoffMs * 2^(attempt-1)). Default: 1000.
|
|
||||||
BackoffMs int64 `json:"backoffMs"`
|
|
||||||
|
|
||||||
// DelayMs is the minimum delay between starting consecutive tasks
|
|
||||||
// in milliseconds. Useful for rate limiting. Default: 0.
|
|
||||||
DelayMs int64 `json:"delayMs"`
|
|
||||||
|
|
||||||
// RetentionMs is how long completed/failed/cancelled tasks are kept
|
|
||||||
// in milliseconds. Default: 3600000 (1h). Min: 60000 (1m). Max: 604800000 (1w).
|
|
||||||
RetentionMs int64 `json:"retentionMs"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// TaskService provides persistent task queues for plugins.
|
|
||||||
//
|
|
||||||
// This service allows plugins to create named queues with configurable concurrency,
|
|
||||||
// retry policies, and rate limiting. Tasks are persisted to SQLite and survive
|
|
||||||
// server restarts. When a task is ready to execute, the host calls the plugin's
|
|
||||||
// nd_task_execute callback function.
|
|
||||||
//
|
|
||||||
//nd:hostservice name=Task permission=taskqueue
|
|
||||||
type TaskService interface {
|
|
||||||
// CreateQueue creates a named task queue with the given configuration.
|
|
||||||
// Zero-value fields in config use sensible defaults.
|
|
||||||
// If a queue with the same name already exists, returns an error.
|
|
||||||
// On startup, this also recovers any stale "running" tasks from a previous crash.
|
|
||||||
//nd:hostfunc
|
|
||||||
CreateQueue(ctx context.Context, name string, config QueueConfig) error
|
|
||||||
|
|
||||||
// Enqueue adds a task to the named queue. Returns the task ID.
|
|
||||||
// payload is opaque bytes passed back to the plugin on execution.
|
|
||||||
//nd:hostfunc
|
|
||||||
Enqueue(ctx context.Context, queueName string, payload []byte) (string, error)
|
|
||||||
|
|
||||||
// Get returns the current state of a task including its status,
|
|
||||||
// message, and attempt count.
|
|
||||||
//nd:hostfunc
|
|
||||||
Get(ctx context.Context, taskID string) (*TaskInfo, error)
|
|
||||||
|
|
||||||
// Cancel cancels a pending task. Returns error if already
|
|
||||||
// running, completed, or failed.
|
|
||||||
//nd:hostfunc
|
|
||||||
Cancel(ctx context.Context, taskID string) error
|
|
||||||
}
|
|
||||||
@@ -1,220 +0,0 @@
|
|||||||
// Code generated by ndpgen. DO NOT EDIT.
|
|
||||||
|
|
||||||
package host
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
|
|
||||||
extism "github.com/extism/go-sdk"
|
|
||||||
)
|
|
||||||
|
|
||||||
// TaskCreateQueueRequest is the request type for Task.CreateQueue.
|
|
||||||
type TaskCreateQueueRequest struct {
|
|
||||||
Name string `json:"name"`
|
|
||||||
Config QueueConfig `json:"config"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// TaskCreateQueueResponse is the response type for Task.CreateQueue.
|
|
||||||
type TaskCreateQueueResponse struct {
|
|
||||||
Error string `json:"error,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// TaskEnqueueRequest is the request type for Task.Enqueue.
|
|
||||||
type TaskEnqueueRequest struct {
|
|
||||||
QueueName string `json:"queueName"`
|
|
||||||
Payload []byte `json:"payload"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// TaskEnqueueResponse is the response type for Task.Enqueue.
|
|
||||||
type TaskEnqueueResponse struct {
|
|
||||||
Result string `json:"result,omitempty"`
|
|
||||||
Error string `json:"error,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// TaskGetRequest is the request type for Task.Get.
|
|
||||||
type TaskGetRequest struct {
|
|
||||||
TaskID string `json:"taskId"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// TaskGetResponse is the response type for Task.Get.
|
|
||||||
type TaskGetResponse struct {
|
|
||||||
Result *TaskInfo `json:"result,omitempty"`
|
|
||||||
Error string `json:"error,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// TaskCancelRequest is the request type for Task.Cancel.
|
|
||||||
type TaskCancelRequest struct {
|
|
||||||
TaskID string `json:"taskId"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// TaskCancelResponse is the response type for Task.Cancel.
|
|
||||||
type TaskCancelResponse struct {
|
|
||||||
Error string `json:"error,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// RegisterTaskHostFunctions registers Task service host functions.
|
|
||||||
// The returned host functions should be added to the plugin's configuration.
|
|
||||||
func RegisterTaskHostFunctions(service TaskService) []extism.HostFunction {
|
|
||||||
return []extism.HostFunction{
|
|
||||||
newTaskCreateQueueHostFunction(service),
|
|
||||||
newTaskEnqueueHostFunction(service),
|
|
||||||
newTaskGetHostFunction(service),
|
|
||||||
newTaskCancelHostFunction(service),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func newTaskCreateQueueHostFunction(service TaskService) extism.HostFunction {
|
|
||||||
return extism.NewHostFunctionWithStack(
|
|
||||||
"task_createqueue",
|
|
||||||
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
|
|
||||||
// Read JSON request from plugin memory
|
|
||||||
reqBytes, err := p.ReadBytes(stack[0])
|
|
||||||
if err != nil {
|
|
||||||
taskWriteError(p, stack, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
var req TaskCreateQueueRequest
|
|
||||||
if err := json.Unmarshal(reqBytes, &req); err != nil {
|
|
||||||
taskWriteError(p, stack, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Call the service method
|
|
||||||
if svcErr := service.CreateQueue(ctx, req.Name, req.Config); svcErr != nil {
|
|
||||||
taskWriteError(p, stack, svcErr)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Write JSON response to plugin memory
|
|
||||||
resp := TaskCreateQueueResponse{}
|
|
||||||
taskWriteResponse(p, stack, resp)
|
|
||||||
},
|
|
||||||
[]extism.ValueType{extism.ValueTypePTR},
|
|
||||||
[]extism.ValueType{extism.ValueTypePTR},
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
func newTaskEnqueueHostFunction(service TaskService) extism.HostFunction {
|
|
||||||
return extism.NewHostFunctionWithStack(
|
|
||||||
"task_enqueue",
|
|
||||||
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
|
|
||||||
// Read JSON request from plugin memory
|
|
||||||
reqBytes, err := p.ReadBytes(stack[0])
|
|
||||||
if err != nil {
|
|
||||||
taskWriteError(p, stack, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
var req TaskEnqueueRequest
|
|
||||||
if err := json.Unmarshal(reqBytes, &req); err != nil {
|
|
||||||
taskWriteError(p, stack, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Call the service method
|
|
||||||
result, svcErr := service.Enqueue(ctx, req.QueueName, req.Payload)
|
|
||||||
if svcErr != nil {
|
|
||||||
taskWriteError(p, stack, svcErr)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Write JSON response to plugin memory
|
|
||||||
resp := TaskEnqueueResponse{
|
|
||||||
Result: result,
|
|
||||||
}
|
|
||||||
taskWriteResponse(p, stack, resp)
|
|
||||||
},
|
|
||||||
[]extism.ValueType{extism.ValueTypePTR},
|
|
||||||
[]extism.ValueType{extism.ValueTypePTR},
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
func newTaskGetHostFunction(service TaskService) extism.HostFunction {
|
|
||||||
return extism.NewHostFunctionWithStack(
|
|
||||||
"task_get",
|
|
||||||
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
|
|
||||||
// Read JSON request from plugin memory
|
|
||||||
reqBytes, err := p.ReadBytes(stack[0])
|
|
||||||
if err != nil {
|
|
||||||
taskWriteError(p, stack, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
var req TaskGetRequest
|
|
||||||
if err := json.Unmarshal(reqBytes, &req); err != nil {
|
|
||||||
taskWriteError(p, stack, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Call the service method
|
|
||||||
result, svcErr := service.Get(ctx, req.TaskID)
|
|
||||||
if svcErr != nil {
|
|
||||||
taskWriteError(p, stack, svcErr)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Write JSON response to plugin memory
|
|
||||||
resp := TaskGetResponse{
|
|
||||||
Result: result,
|
|
||||||
}
|
|
||||||
taskWriteResponse(p, stack, resp)
|
|
||||||
},
|
|
||||||
[]extism.ValueType{extism.ValueTypePTR},
|
|
||||||
[]extism.ValueType{extism.ValueTypePTR},
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
func newTaskCancelHostFunction(service TaskService) extism.HostFunction {
|
|
||||||
return extism.NewHostFunctionWithStack(
|
|
||||||
"task_cancel",
|
|
||||||
func(ctx context.Context, p *extism.CurrentPlugin, stack []uint64) {
|
|
||||||
// Read JSON request from plugin memory
|
|
||||||
reqBytes, err := p.ReadBytes(stack[0])
|
|
||||||
if err != nil {
|
|
||||||
taskWriteError(p, stack, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
var req TaskCancelRequest
|
|
||||||
if err := json.Unmarshal(reqBytes, &req); err != nil {
|
|
||||||
taskWriteError(p, stack, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Call the service method
|
|
||||||
if svcErr := service.Cancel(ctx, req.TaskID); svcErr != nil {
|
|
||||||
taskWriteError(p, stack, svcErr)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Write JSON response to plugin memory
|
|
||||||
resp := TaskCancelResponse{}
|
|
||||||
taskWriteResponse(p, stack, resp)
|
|
||||||
},
|
|
||||||
[]extism.ValueType{extism.ValueTypePTR},
|
|
||||||
[]extism.ValueType{extism.ValueTypePTR},
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
// taskWriteResponse writes a JSON response to plugin memory.
|
|
||||||
func taskWriteResponse(p *extism.CurrentPlugin, stack []uint64, resp any) {
|
|
||||||
respBytes, err := json.Marshal(resp)
|
|
||||||
if err != nil {
|
|
||||||
taskWriteError(p, stack, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
respPtr, err := p.WriteBytes(respBytes)
|
|
||||||
if err != nil {
|
|
||||||
stack[0] = 0
|
|
||||||
return
|
|
||||||
}
|
|
||||||
stack[0] = respPtr
|
|
||||||
}
|
|
||||||
|
|
||||||
// taskWriteError writes an error response to plugin memory.
|
|
||||||
func taskWriteError(p *extism.CurrentPlugin, stack []uint64, err error) {
|
|
||||||
errResp := struct {
|
|
||||||
Error string `json:"error"`
|
|
||||||
}{Error: err.Error()}
|
|
||||||
respBytes, _ := json.Marshal(errResp)
|
|
||||||
respPtr, _ := p.WriteBytes(respBytes)
|
|
||||||
stack[0] = respPtr
|
|
||||||
}
|
|
||||||
@@ -1,566 +0,0 @@
|
|||||||
package plugins
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"database/sql"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"os"
|
|
||||||
"path/filepath"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
_ "github.com/mattn/go-sqlite3"
|
|
||||||
"github.com/navidrome/navidrome/conf"
|
|
||||||
"github.com/navidrome/navidrome/log"
|
|
||||||
"github.com/navidrome/navidrome/model/id"
|
|
||||||
"github.com/navidrome/navidrome/plugins/capabilities"
|
|
||||||
"github.com/navidrome/navidrome/plugins/host"
|
|
||||||
"golang.org/x/time/rate"
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
defaultConcurrency int32 = 1
|
|
||||||
defaultBackoffMs int64 = 1000
|
|
||||||
defaultRetentionMs int64 = 3_600_000 // 1 hour
|
|
||||||
minRetentionMs int64 = 60_000 // 1 minute
|
|
||||||
maxRetentionMs int64 = 604_800_000 // 1 week
|
|
||||||
maxQueueNameLength = 128
|
|
||||||
maxPayloadSize = 1 * 1024 * 1024 // 1MB
|
|
||||||
maxBackoffMs int64 = 3_600_000 // 1 hour
|
|
||||||
cleanupInterval = 5 * time.Minute
|
|
||||||
pollInterval = 5 * time.Second
|
|
||||||
shutdownTimeout = 10 * time.Second
|
|
||||||
|
|
||||||
taskStatusPending = "pending"
|
|
||||||
taskStatusRunning = "running"
|
|
||||||
taskStatusCompleted = "completed"
|
|
||||||
taskStatusFailed = "failed"
|
|
||||||
taskStatusCancelled = "cancelled"
|
|
||||||
)
|
|
||||||
|
|
||||||
// CapabilityTaskWorker indicates the plugin can receive task execution callbacks.
|
|
||||||
const CapabilityTaskWorker Capability = "TaskWorker"
|
|
||||||
|
|
||||||
const FuncTaskWorkerCallback = "nd_task_execute"
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
registerCapability(CapabilityTaskWorker, FuncTaskWorkerCallback)
|
|
||||||
}
|
|
||||||
|
|
||||||
type queueState struct {
|
|
||||||
config host.QueueConfig
|
|
||||||
signal chan struct{}
|
|
||||||
limiter *rate.Limiter
|
|
||||||
}
|
|
||||||
|
|
||||||
// notifyWorkers sends a non-blocking signal to wake up queue workers.
|
|
||||||
func (qs *queueState) notifyWorkers() {
|
|
||||||
select {
|
|
||||||
case qs.signal <- struct{}{}:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// taskQueueServiceImpl implements host.TaskQueueService with SQLite persistence
|
|
||||||
// and background worker goroutines for task execution.
|
|
||||||
type taskQueueServiceImpl struct {
|
|
||||||
pluginName string
|
|
||||||
manager *Manager
|
|
||||||
maxConcurrency int32
|
|
||||||
db *sql.DB
|
|
||||||
ctx context.Context
|
|
||||||
cancel context.CancelFunc
|
|
||||||
wg sync.WaitGroup
|
|
||||||
mu sync.Mutex
|
|
||||||
queues map[string]*queueState
|
|
||||||
|
|
||||||
// For testing: override how callbacks are invoked
|
|
||||||
invokeCallbackFn func(ctx context.Context, queueName, taskID string, payload []byte, attempt int32) (string, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
// newTaskQueueService creates a new taskQueueServiceImpl with its own SQLite database.
|
|
||||||
func newTaskQueueService(pluginName string, manager *Manager, maxConcurrency int32) (*taskQueueServiceImpl, error) {
|
|
||||||
dataDir := filepath.Join(conf.Server.DataFolder, "plugins", pluginName)
|
|
||||||
if err := os.MkdirAll(dataDir, 0700); err != nil {
|
|
||||||
return nil, fmt.Errorf("creating plugin data directory: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
dbPath := filepath.Join(dataDir, "taskqueue.db")
|
|
||||||
db, err := sql.Open("sqlite3", dbPath+"?_busy_timeout=5000&_journal_mode=WAL&_foreign_keys=off")
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("opening taskqueue database: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
db.SetMaxOpenConns(3)
|
|
||||||
db.SetMaxIdleConns(1)
|
|
||||||
|
|
||||||
if err := createTaskQueueSchema(db); err != nil {
|
|
||||||
db.Close()
|
|
||||||
return nil, fmt.Errorf("creating taskqueue schema: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(manager.ctx)
|
|
||||||
|
|
||||||
s := &taskQueueServiceImpl{
|
|
||||||
pluginName: pluginName,
|
|
||||||
manager: manager,
|
|
||||||
maxConcurrency: maxConcurrency,
|
|
||||||
db: db,
|
|
||||||
ctx: ctx,
|
|
||||||
cancel: cancel,
|
|
||||||
queues: make(map[string]*queueState),
|
|
||||||
}
|
|
||||||
s.invokeCallbackFn = s.defaultInvokeCallback
|
|
||||||
|
|
||||||
s.wg.Go(s.cleanupLoop)
|
|
||||||
|
|
||||||
log.Debug("Initialized plugin taskqueue", "plugin", pluginName, "path", dbPath, "maxConcurrency", maxConcurrency)
|
|
||||||
return s, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func createTaskQueueSchema(db *sql.DB) error {
|
|
||||||
_, err := db.Exec(`
|
|
||||||
CREATE TABLE IF NOT EXISTS queues (
|
|
||||||
name TEXT PRIMARY KEY,
|
|
||||||
concurrency INTEGER NOT NULL DEFAULT 1,
|
|
||||||
max_retries INTEGER NOT NULL DEFAULT 0,
|
|
||||||
backoff_ms INTEGER NOT NULL DEFAULT 1000,
|
|
||||||
delay_ms INTEGER NOT NULL DEFAULT 0,
|
|
||||||
retention_ms INTEGER NOT NULL DEFAULT 3600000
|
|
||||||
);
|
|
||||||
|
|
||||||
CREATE TABLE IF NOT EXISTS tasks (
|
|
||||||
id TEXT PRIMARY KEY,
|
|
||||||
queue_name TEXT NOT NULL REFERENCES queues(name),
|
|
||||||
payload BLOB NOT NULL,
|
|
||||||
status TEXT NOT NULL DEFAULT 'pending',
|
|
||||||
attempt INTEGER NOT NULL DEFAULT 0,
|
|
||||||
max_retries INTEGER NOT NULL,
|
|
||||||
next_run_at INTEGER NOT NULL,
|
|
||||||
created_at INTEGER NOT NULL,
|
|
||||||
updated_at INTEGER NOT NULL,
|
|
||||||
message TEXT NOT NULL DEFAULT ''
|
|
||||||
);
|
|
||||||
|
|
||||||
CREATE INDEX IF NOT EXISTS idx_tasks_dequeue ON tasks(queue_name, status, next_run_at);
|
|
||||||
`)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// applyConfigDefaults fills zero-value config fields with sensible defaults
|
|
||||||
// and clamps values to valid ranges, logging warnings for clamped values.
|
|
||||||
func (s *taskQueueServiceImpl) applyConfigDefaults(ctx context.Context, name string, config *host.QueueConfig) {
|
|
||||||
if config.Concurrency <= 0 {
|
|
||||||
config.Concurrency = defaultConcurrency
|
|
||||||
}
|
|
||||||
if config.BackoffMs <= 0 {
|
|
||||||
config.BackoffMs = defaultBackoffMs
|
|
||||||
}
|
|
||||||
if config.RetentionMs <= 0 {
|
|
||||||
config.RetentionMs = defaultRetentionMs
|
|
||||||
}
|
|
||||||
|
|
||||||
if config.RetentionMs < minRetentionMs {
|
|
||||||
log.Warn(ctx, "TaskQueue retention clamped to minimum", "plugin", s.pluginName, "queue", name,
|
|
||||||
"requested", config.RetentionMs, "min", minRetentionMs)
|
|
||||||
config.RetentionMs = minRetentionMs
|
|
||||||
}
|
|
||||||
if config.RetentionMs > maxRetentionMs {
|
|
||||||
log.Warn(ctx, "TaskQueue retention clamped to maximum", "plugin", s.pluginName, "queue", name,
|
|
||||||
"requested", config.RetentionMs, "max", maxRetentionMs)
|
|
||||||
config.RetentionMs = maxRetentionMs
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// clampConcurrency reduces config.Concurrency if it exceeds the remaining budget.
|
|
||||||
// Returns an error when the concurrency budget is fully exhausted.
|
|
||||||
// Must be called with s.mu held.
|
|
||||||
func (s *taskQueueServiceImpl) clampConcurrency(ctx context.Context, name string, config *host.QueueConfig) error {
|
|
||||||
var allocated int32
|
|
||||||
for _, qs := range s.queues {
|
|
||||||
allocated += qs.config.Concurrency
|
|
||||||
}
|
|
||||||
available := s.maxConcurrency - allocated
|
|
||||||
if available <= 0 {
|
|
||||||
log.Warn(ctx, "TaskQueue concurrency budget exhausted", "plugin", s.pluginName, "queue", name,
|
|
||||||
"allocated", allocated, "maxConcurrency", s.maxConcurrency)
|
|
||||||
return fmt.Errorf("concurrency budget exhausted (%d/%d allocated)", allocated, s.maxConcurrency)
|
|
||||||
}
|
|
||||||
if config.Concurrency > available {
|
|
||||||
log.Warn(ctx, "TaskQueue concurrency clamped", "plugin", s.pluginName, "queue", name,
|
|
||||||
"requested", config.Concurrency, "available", available, "maxConcurrency", s.maxConcurrency)
|
|
||||||
config.Concurrency = available
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *taskQueueServiceImpl) CreateQueue(ctx context.Context, name string, config host.QueueConfig) error {
|
|
||||||
if len(name) == 0 {
|
|
||||||
return fmt.Errorf("queue name cannot be empty")
|
|
||||||
}
|
|
||||||
if len(name) > maxQueueNameLength {
|
|
||||||
return fmt.Errorf("queue name exceeds maximum length of %d bytes", maxQueueNameLength)
|
|
||||||
}
|
|
||||||
|
|
||||||
s.applyConfigDefaults(ctx, name, &config)
|
|
||||||
|
|
||||||
s.mu.Lock()
|
|
||||||
defer s.mu.Unlock()
|
|
||||||
|
|
||||||
if err := s.clampConcurrency(ctx, name, &config); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, exists := s.queues[name]; exists {
|
|
||||||
return fmt.Errorf("queue %q already exists", name)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Upsert into queues table (idempotent across restarts)
|
|
||||||
_, err := s.db.ExecContext(ctx, `
|
|
||||||
INSERT INTO queues (name, concurrency, max_retries, backoff_ms, delay_ms, retention_ms)
|
|
||||||
VALUES (?, ?, ?, ?, ?, ?)
|
|
||||||
ON CONFLICT(name) DO UPDATE SET
|
|
||||||
concurrency = excluded.concurrency,
|
|
||||||
max_retries = excluded.max_retries,
|
|
||||||
backoff_ms = excluded.backoff_ms,
|
|
||||||
delay_ms = excluded.delay_ms,
|
|
||||||
retention_ms = excluded.retention_ms
|
|
||||||
`, name, config.Concurrency, config.MaxRetries, config.BackoffMs, config.DelayMs, config.RetentionMs)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("creating queue: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Reset stale running tasks from previous crash
|
|
||||||
now := time.Now().UnixMilli()
|
|
||||||
_, err = s.db.ExecContext(ctx, `
|
|
||||||
UPDATE tasks SET status = ?, updated_at = ? WHERE queue_name = ? AND status = ?
|
|
||||||
`, taskStatusPending, now, name, taskStatusRunning)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("resetting stale tasks: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
qs := &queueState{
|
|
||||||
config: config,
|
|
||||||
signal: make(chan struct{}, 1),
|
|
||||||
}
|
|
||||||
if config.DelayMs > 0 {
|
|
||||||
// Rate limit dispatches to enforce delay between tasks.
|
|
||||||
// Burst of 1 allows one immediate dispatch, then enforces the delay interval.
|
|
||||||
qs.limiter = rate.NewLimiter(rate.Every(time.Duration(config.DelayMs)*time.Millisecond), 1)
|
|
||||||
}
|
|
||||||
s.queues[name] = qs
|
|
||||||
|
|
||||||
for i := int32(0); i < config.Concurrency; i++ {
|
|
||||||
s.wg.Go(func() { s.worker(name, qs) })
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Debug(ctx, "Created task queue", "plugin", s.pluginName, "queue", name,
|
|
||||||
"concurrency", config.Concurrency, "maxRetries", config.MaxRetries,
|
|
||||||
"backoffMs", config.BackoffMs, "delayMs", config.DelayMs, "retentionMs", config.RetentionMs)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *taskQueueServiceImpl) Enqueue(ctx context.Context, queueName string, payload []byte) (string, error) {
|
|
||||||
s.mu.Lock()
|
|
||||||
qs, exists := s.queues[queueName]
|
|
||||||
s.mu.Unlock()
|
|
||||||
|
|
||||||
if !exists {
|
|
||||||
return "", fmt.Errorf("queue %q does not exist", queueName)
|
|
||||||
}
|
|
||||||
if len(payload) > maxPayloadSize {
|
|
||||||
return "", fmt.Errorf("payload size %d exceeds maximum of %d bytes", len(payload), maxPayloadSize)
|
|
||||||
}
|
|
||||||
|
|
||||||
taskID := id.NewRandom()
|
|
||||||
now := time.Now().UnixMilli()
|
|
||||||
|
|
||||||
_, err := s.db.ExecContext(ctx, `
|
|
||||||
INSERT INTO tasks (id, queue_name, payload, status, attempt, max_retries, next_run_at, created_at, updated_at)
|
|
||||||
VALUES (?, ?, ?, ?, 0, ?, ?, ?, ?)
|
|
||||||
`, taskID, queueName, payload, taskStatusPending, qs.config.MaxRetries, now, now, now)
|
|
||||||
if err != nil {
|
|
||||||
return "", fmt.Errorf("enqueuing task: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
qs.notifyWorkers()
|
|
||||||
log.Trace(ctx, "Enqueued task", "plugin", s.pluginName, "queue", queueName, "taskID", taskID)
|
|
||||||
return taskID, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get returns the current state of a task.
|
|
||||||
func (s *taskQueueServiceImpl) Get(ctx context.Context, taskID string) (*host.TaskInfo, error) {
|
|
||||||
var info host.TaskInfo
|
|
||||||
err := s.db.QueryRowContext(ctx, `SELECT status, message, attempt FROM tasks WHERE id = ?`, taskID).
|
|
||||||
Scan(&info.Status, &info.Message, &info.Attempt)
|
|
||||||
if errors.Is(err, sql.ErrNoRows) {
|
|
||||||
return nil, fmt.Errorf("task %q not found", taskID)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("getting task info: %w", err)
|
|
||||||
}
|
|
||||||
return &info, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Cancel cancels a pending task.
|
|
||||||
func (s *taskQueueServiceImpl) Cancel(ctx context.Context, taskID string) error {
|
|
||||||
now := time.Now().UnixMilli()
|
|
||||||
result, err := s.db.ExecContext(ctx, `
|
|
||||||
UPDATE tasks SET status = ?, updated_at = ? WHERE id = ? AND status = ?
|
|
||||||
`, taskStatusCancelled, now, taskID, taskStatusPending)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("cancelling task: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
rowsAffected, err := result.RowsAffected()
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("checking cancel result: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if rowsAffected == 0 {
|
|
||||||
// Check if task exists at all
|
|
||||||
var status string
|
|
||||||
err := s.db.QueryRowContext(ctx, `SELECT status FROM tasks WHERE id = ?`, taskID).Scan(&status)
|
|
||||||
if errors.Is(err, sql.ErrNoRows) {
|
|
||||||
return fmt.Errorf("task %q not found", taskID)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("checking task existence: %w", err)
|
|
||||||
}
|
|
||||||
return fmt.Errorf("task %q cannot be cancelled (status: %s)", taskID, status)
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Trace(ctx, "Cancelled task", "plugin", s.pluginName, "taskID", taskID)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// worker is the main loop for a single worker goroutine.
|
|
||||||
func (s *taskQueueServiceImpl) worker(queueName string, qs *queueState) {
|
|
||||||
// Process any existing pending tasks immediately on startup
|
|
||||||
s.drainQueue(queueName, qs)
|
|
||||||
|
|
||||||
ticker := time.NewTicker(pollInterval)
|
|
||||||
defer ticker.Stop()
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-s.ctx.Done():
|
|
||||||
return
|
|
||||||
case <-qs.signal:
|
|
||||||
s.drainQueue(queueName, qs)
|
|
||||||
case <-ticker.C:
|
|
||||||
s.drainQueue(queueName, qs)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *taskQueueServiceImpl) drainQueue(queueName string, qs *queueState) {
|
|
||||||
for s.ctx.Err() == nil && s.processTask(queueName, qs) {
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// processTask dequeues and processes a single task. Returns true if a task was processed.
|
|
||||||
func (s *taskQueueServiceImpl) processTask(queueName string, qs *queueState) bool {
|
|
||||||
now := time.Now().UnixMilli()
|
|
||||||
|
|
||||||
// Atomically dequeue a task
|
|
||||||
var taskID string
|
|
||||||
var payload []byte
|
|
||||||
var attempt, maxRetries int32
|
|
||||||
err := s.db.QueryRowContext(s.ctx, `
|
|
||||||
UPDATE tasks SET status = ?, attempt = attempt + 1, updated_at = ?
|
|
||||||
WHERE id = (
|
|
||||||
SELECT id FROM tasks
|
|
||||||
WHERE queue_name = ? AND status = ? AND next_run_at <= ?
|
|
||||||
ORDER BY next_run_at, created_at LIMIT 1
|
|
||||||
)
|
|
||||||
RETURNING id, payload, attempt, max_retries
|
|
||||||
`, taskStatusRunning, now, queueName, taskStatusPending, now).Scan(&taskID, &payload, &attempt, &maxRetries)
|
|
||||||
if errors.Is(err, sql.ErrNoRows) {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
log.Error(s.ctx, "Failed to dequeue task", "plugin", s.pluginName, "queue", queueName, err)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// Enforce delay between task dispatches using a rate limiter.
|
|
||||||
// This is done after dequeue so that empty polls don't consume rate tokens.
|
|
||||||
if qs.limiter != nil {
|
|
||||||
if err := qs.limiter.Wait(s.ctx); err != nil {
|
|
||||||
// Context cancelled during wait — revert task to pending for recovery
|
|
||||||
s.revertTaskToPending(taskID)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Invoke callback
|
|
||||||
log.Debug(s.ctx, "Executing task", "plugin", s.pluginName, "queue", queueName, "taskID", taskID, "attempt", attempt)
|
|
||||||
message, callbackErr := s.invokeCallbackFn(s.ctx, queueName, taskID, payload, attempt)
|
|
||||||
|
|
||||||
// If context was cancelled (shutdown), revert task to pending for recovery
|
|
||||||
if s.ctx.Err() != nil {
|
|
||||||
s.revertTaskToPending(taskID)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
if callbackErr == nil {
|
|
||||||
s.completeTask(queueName, taskID, message)
|
|
||||||
} else {
|
|
||||||
s.handleTaskFailure(queueName, taskID, attempt, maxRetries, qs, callbackErr, message)
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *taskQueueServiceImpl) completeTask(queueName, taskID, message string) {
|
|
||||||
now := time.Now().UnixMilli()
|
|
||||||
if _, err := s.db.ExecContext(s.ctx, `UPDATE tasks SET status = ?, message = ?, updated_at = ? WHERE id = ?`, taskStatusCompleted, message, now, taskID); err != nil {
|
|
||||||
log.Error(s.ctx, "Failed to mark task as completed", "plugin", s.pluginName, "taskID", taskID, err)
|
|
||||||
}
|
|
||||||
log.Debug(s.ctx, "Task completed", "plugin", s.pluginName, "queue", queueName, "taskID", taskID)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *taskQueueServiceImpl) handleTaskFailure(queueName, taskID string, attempt, maxRetries int32, qs *queueState, callbackErr error, message string) {
|
|
||||||
log.Warn(s.ctx, "Task execution failed", "plugin", s.pluginName, "queue", queueName,
|
|
||||||
"taskID", taskID, "attempt", attempt, "maxRetries", maxRetries, "err", callbackErr)
|
|
||||||
|
|
||||||
// Use error message as fallback if no message was provided
|
|
||||||
if message == "" {
|
|
||||||
message = callbackErr.Error()
|
|
||||||
}
|
|
||||||
|
|
||||||
now := time.Now().UnixMilli()
|
|
||||||
if attempt > maxRetries {
|
|
||||||
if _, err := s.db.ExecContext(s.ctx, `UPDATE tasks SET status = ?, message = ?, updated_at = ? WHERE id = ?`, taskStatusFailed, message, now, taskID); err != nil {
|
|
||||||
log.Error(s.ctx, "Failed to mark task as failed", "plugin", s.pluginName, "taskID", taskID, err)
|
|
||||||
}
|
|
||||||
log.Warn(s.ctx, "Task failed after all retries", "plugin", s.pluginName, "queue", queueName, "taskID", taskID)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Exponential backoff: backoffMs * 2^(attempt-1)
|
|
||||||
backoff := qs.config.BackoffMs << (attempt - 1)
|
|
||||||
if backoff <= 0 || backoff > maxBackoffMs {
|
|
||||||
backoff = maxBackoffMs
|
|
||||||
}
|
|
||||||
nextRunAt := now + backoff
|
|
||||||
if _, err := s.db.ExecContext(s.ctx, `
|
|
||||||
UPDATE tasks SET status = ?, next_run_at = ?, updated_at = ? WHERE id = ?
|
|
||||||
`, taskStatusPending, nextRunAt, now, taskID); err != nil {
|
|
||||||
log.Error(s.ctx, "Failed to reschedule task for retry", "plugin", s.pluginName, "taskID", taskID, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wake worker after backoff expires
|
|
||||||
time.AfterFunc(time.Duration(backoff)*time.Millisecond, func() {
|
|
||||||
qs.notifyWorkers()
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// revertTaskToPending puts a running task back to pending status and decrements the attempt
|
|
||||||
// counter (used during shutdown to ensure the interrupted attempt doesn't count).
|
|
||||||
func (s *taskQueueServiceImpl) revertTaskToPending(taskID string) {
|
|
||||||
now := time.Now().UnixMilli()
|
|
||||||
_, err := s.db.Exec(`UPDATE tasks SET status = ?, attempt = MAX(attempt - 1, 0), updated_at = ? WHERE id = ? AND status = ?`, taskStatusPending, now, taskID, taskStatusRunning)
|
|
||||||
if err != nil {
|
|
||||||
log.Error("Failed to revert task to pending", "plugin", s.pluginName, "taskID", taskID, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// defaultInvokeCallback calls the plugin's nd_task_execute function.
|
|
||||||
func (s *taskQueueServiceImpl) defaultInvokeCallback(ctx context.Context, queueName, taskID string, payload []byte, attempt int32) (string, error) {
|
|
||||||
s.manager.mu.RLock()
|
|
||||||
p, ok := s.manager.plugins[s.pluginName]
|
|
||||||
s.manager.mu.RUnlock()
|
|
||||||
|
|
||||||
if !ok {
|
|
||||||
return "", fmt.Errorf("plugin %s not loaded", s.pluginName)
|
|
||||||
}
|
|
||||||
|
|
||||||
input := capabilities.TaskExecuteRequest{
|
|
||||||
QueueName: queueName,
|
|
||||||
TaskID: taskID,
|
|
||||||
Payload: payload,
|
|
||||||
Attempt: attempt,
|
|
||||||
}
|
|
||||||
|
|
||||||
message, err := callPluginFunction[capabilities.TaskExecuteRequest, string](ctx, p, FuncTaskWorkerCallback, input)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
return message, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// cleanupLoop periodically removes terminal tasks past their retention period.
|
|
||||||
func (s *taskQueueServiceImpl) cleanupLoop() {
|
|
||||||
ticker := time.NewTicker(cleanupInterval)
|
|
||||||
defer ticker.Stop()
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-s.ctx.Done():
|
|
||||||
return
|
|
||||||
case <-ticker.C:
|
|
||||||
s.runCleanup()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// runCleanup deletes terminal tasks past their retention period.
|
|
||||||
func (s *taskQueueServiceImpl) runCleanup() {
|
|
||||||
s.mu.Lock()
|
|
||||||
queues := make(map[string]*queueState, len(s.queues))
|
|
||||||
for k, v := range s.queues {
|
|
||||||
queues[k] = v
|
|
||||||
}
|
|
||||||
s.mu.Unlock()
|
|
||||||
|
|
||||||
now := time.Now().UnixMilli()
|
|
||||||
for name, qs := range queues {
|
|
||||||
result, err := s.db.ExecContext(s.ctx, `
|
|
||||||
DELETE FROM tasks WHERE queue_name = ? AND status IN (?, ?, ?) AND updated_at + ? < ?
|
|
||||||
`, name, taskStatusCompleted, taskStatusFailed, taskStatusCancelled, qs.config.RetentionMs, now)
|
|
||||||
if err != nil {
|
|
||||||
log.Error(s.ctx, "Failed to cleanup tasks", "plugin", s.pluginName, "queue", name, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if deleted, _ := result.RowsAffected(); deleted > 0 {
|
|
||||||
log.Debug(s.ctx, "Cleaned up terminal tasks", "plugin", s.pluginName, "queue", name, "deleted", deleted)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close shuts down the task queue service, stopping all workers and closing the database.
|
|
||||||
func (s *taskQueueServiceImpl) Close() error {
|
|
||||||
// Cancel context to signal all goroutines
|
|
||||||
s.cancel()
|
|
||||||
|
|
||||||
// Wait for goroutines with timeout
|
|
||||||
done := make(chan struct{})
|
|
||||||
go func() {
|
|
||||||
s.wg.Wait()
|
|
||||||
close(done)
|
|
||||||
}()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-done:
|
|
||||||
case <-time.After(shutdownTimeout):
|
|
||||||
log.Warn("TaskQueue shutdown timed out", "plugin", s.pluginName)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Mark running tasks as pending for recovery on next startup
|
|
||||||
if s.db != nil {
|
|
||||||
now := time.Now().UnixMilli()
|
|
||||||
if _, err := s.db.Exec(`UPDATE tasks SET status = ?, updated_at = ? WHERE status = ?`, taskStatusPending, now, taskStatusRunning); err != nil {
|
|
||||||
log.Error("Failed to reset running tasks on shutdown", "plugin", s.pluginName, err)
|
|
||||||
}
|
|
||||||
log.Debug("Closing plugin taskqueue", "plugin", s.pluginName)
|
|
||||||
return s.db.Close()
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Compile-time verification
|
|
||||||
var _ host.TaskService = (*taskQueueServiceImpl)(nil)
|
|
||||||
var _ io.Closer = (*taskQueueServiceImpl)(nil)
|
|
||||||
File diff suppressed because it is too large
Load Diff
@@ -428,10 +428,11 @@ func (m *Manager) UpdatePluginUsers(ctx context.Context, id, usersJSON string, a
|
|||||||
// If the plugin is enabled, it will be reloaded with the new settings.
|
// If the plugin is enabled, it will be reloaded with the new settings.
|
||||||
// If the plugin requires library permission and no libraries are configured (and allLibraries is false),
|
// If the plugin requires library permission and no libraries are configured (and allLibraries is false),
|
||||||
// the plugin will be automatically disabled.
|
// the plugin will be automatically disabled.
|
||||||
func (m *Manager) UpdatePluginLibraries(ctx context.Context, id, librariesJSON string, allLibraries bool) error {
|
func (m *Manager) UpdatePluginLibraries(ctx context.Context, id, librariesJSON string, allLibraries, allowWriteAccess bool) error {
|
||||||
return m.updatePluginSettings(ctx, id, func(p *model.Plugin) {
|
return m.updatePluginSettings(ctx, id, func(p *model.Plugin) {
|
||||||
p.Libraries = librariesJSON
|
p.Libraries = librariesJSON
|
||||||
p.AllLibraries = allLibraries
|
p.AllLibraries = allLibraries
|
||||||
|
p.AllowWriteAccess = allowWriteAccess
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -128,23 +128,6 @@ var hostServices = []hostServiceEntry{
|
|||||||
return host.RegisterHTTPHostFunctions(service), nil
|
return host.RegisterHTTPHostFunctions(service), nil
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
|
||||||
name: "Task",
|
|
||||||
hasPermission: func(p *Permissions) bool { return p != nil && p.Taskqueue != nil },
|
|
||||||
create: func(ctx *serviceContext) ([]extism.HostFunction, io.Closer) {
|
|
||||||
perm := ctx.permissions.Taskqueue
|
|
||||||
maxConcurrency := int32(1)
|
|
||||||
if perm.MaxConcurrency > 0 {
|
|
||||||
maxConcurrency = int32(perm.MaxConcurrency)
|
|
||||||
}
|
|
||||||
service, err := newTaskQueueService(ctx.pluginName, ctx.manager, maxConcurrency)
|
|
||||||
if err != nil {
|
|
||||||
log.Error("Failed to create Task service", "plugin", ctx.pluginName, err)
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
return host.RegisterTaskHostFunctions(service), service
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// extractManifest reads manifest from an .ndp package and computes its SHA-256 hash.
|
// extractManifest reads manifest from an .ndp package and computes its SHA-256 hash.
|
||||||
@@ -243,6 +226,8 @@ func (m *Manager) loadEnabledPlugins(ctx context.Context) error {
|
|||||||
// loadPluginWithConfig loads a plugin with configuration from DB.
|
// loadPluginWithConfig loads a plugin with configuration from DB.
|
||||||
// The p.Path should point to an .ndp package file.
|
// The p.Path should point to an .ndp package file.
|
||||||
func (m *Manager) loadPluginWithConfig(p *model.Plugin) error {
|
func (m *Manager) loadPluginWithConfig(p *model.Plugin) error {
|
||||||
|
ctx := log.NewContext(m.ctx, "plugin", p.ID)
|
||||||
|
|
||||||
if m.stopped.Load() {
|
if m.stopped.Load() {
|
||||||
return fmt.Errorf("manager is stopped")
|
return fmt.Errorf("manager is stopped")
|
||||||
}
|
}
|
||||||
@@ -300,27 +285,13 @@ func (m *Manager) loadPluginWithConfig(p *model.Plugin) error {
|
|||||||
|
|
||||||
// Configure filesystem access for library permission
|
// Configure filesystem access for library permission
|
||||||
if pkg.Manifest.Permissions != nil && pkg.Manifest.Permissions.Library != nil && pkg.Manifest.Permissions.Library.Filesystem {
|
if pkg.Manifest.Permissions != nil && pkg.Manifest.Permissions.Library != nil && pkg.Manifest.Permissions.Library.Filesystem {
|
||||||
adminCtx := adminContext(m.ctx)
|
adminCtx := adminContext(ctx)
|
||||||
libraries, err := m.ds.Library(adminCtx).GetAll()
|
libraries, err := m.ds.Library(adminCtx).GetAll()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to get libraries for filesystem access: %w", err)
|
return fmt.Errorf("failed to get libraries for filesystem access: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Build a set of allowed library IDs for fast lookup
|
allowedPaths := buildAllowedPaths(ctx, libraries, allowedLibraries, p.AllLibraries, p.AllowWriteAccess)
|
||||||
allowedLibrarySet := make(map[int]struct{}, len(allowedLibraries))
|
|
||||||
for _, id := range allowedLibraries {
|
|
||||||
allowedLibrarySet[id] = struct{}{}
|
|
||||||
}
|
|
||||||
|
|
||||||
allowedPaths := make(map[string]string)
|
|
||||||
for _, lib := range libraries {
|
|
||||||
// Only mount if allLibraries is true or library is in the allowed list
|
|
||||||
if p.AllLibraries {
|
|
||||||
allowedPaths[lib.Path] = toPluginMountPoint(int32(lib.ID))
|
|
||||||
} else if _, ok := allowedLibrarySet[lib.ID]; ok {
|
|
||||||
allowedPaths[lib.Path] = toPluginMountPoint(int32(lib.ID))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pluginManifest.AllowedPaths = allowedPaths
|
pluginManifest.AllowedPaths = allowedPaths
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -356,7 +327,7 @@ func (m *Manager) loadPluginWithConfig(p *model.Plugin) error {
|
|||||||
// Enable experimental threads if requested in manifest
|
// Enable experimental threads if requested in manifest
|
||||||
if pkg.Manifest.HasExperimentalThreads() {
|
if pkg.Manifest.HasExperimentalThreads() {
|
||||||
runtimeConfig = runtimeConfig.WithCoreFeatures(api.CoreFeaturesV2 | experimental.CoreFeaturesThreads)
|
runtimeConfig = runtimeConfig.WithCoreFeatures(api.CoreFeaturesV2 | experimental.CoreFeaturesThreads)
|
||||||
log.Debug(m.ctx, "Enabling experimental threads support", "plugin", p.ID)
|
log.Debug(ctx, "Enabling experimental threads support")
|
||||||
}
|
}
|
||||||
|
|
||||||
extismConfig := extism.PluginConfig{
|
extismConfig := extism.PluginConfig{
|
||||||
@@ -364,24 +335,24 @@ func (m *Manager) loadPluginWithConfig(p *model.Plugin) error {
|
|||||||
RuntimeConfig: runtimeConfig,
|
RuntimeConfig: runtimeConfig,
|
||||||
EnableHttpResponseHeaders: true,
|
EnableHttpResponseHeaders: true,
|
||||||
}
|
}
|
||||||
compiled, err := extism.NewCompiledPlugin(m.ctx, pluginManifest, extismConfig, hostFunctions)
|
compiled, err := extism.NewCompiledPlugin(ctx, pluginManifest, extismConfig, hostFunctions)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("compiling plugin: %w", err)
|
return fmt.Errorf("compiling plugin: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create instance to detect capabilities
|
// Create instance to detect capabilities
|
||||||
instance, err := compiled.Instance(m.ctx, extism.PluginInstanceConfig{})
|
instance, err := compiled.Instance(ctx, extism.PluginInstanceConfig{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
compiled.Close(m.ctx)
|
compiled.Close(ctx)
|
||||||
return fmt.Errorf("creating instance: %w", err)
|
return fmt.Errorf("creating instance: %w", err)
|
||||||
}
|
}
|
||||||
instance.SetLogger(extismLogger(p.ID))
|
instance.SetLogger(extismLogger(p.ID))
|
||||||
capabilities := detectCapabilities(instance)
|
capabilities := detectCapabilities(instance)
|
||||||
instance.Close(m.ctx)
|
instance.Close(ctx)
|
||||||
|
|
||||||
// Validate manifest against detected capabilities
|
// Validate manifest against detected capabilities
|
||||||
if err := ValidateWithCapabilities(pkg.Manifest, capabilities); err != nil {
|
if err := ValidateWithCapabilities(pkg.Manifest, capabilities); err != nil {
|
||||||
compiled.Close(m.ctx)
|
compiled.Close(ctx)
|
||||||
return fmt.Errorf("manifest validation: %w", err)
|
return fmt.Errorf("manifest validation: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -400,7 +371,7 @@ func (m *Manager) loadPluginWithConfig(p *model.Plugin) error {
|
|||||||
m.mu.Unlock()
|
m.mu.Unlock()
|
||||||
|
|
||||||
// Call plugin init function
|
// Call plugin init function
|
||||||
callPluginInit(m.ctx, m.plugins[p.ID])
|
callPluginInit(ctx, m.plugins[p.ID])
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -431,3 +402,29 @@ func parsePluginConfig(configJSON string) (map[string]string, error) {
|
|||||||
}
|
}
|
||||||
return pluginConfig, nil
|
return pluginConfig, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// buildAllowedPaths constructs the extism AllowedPaths map for filesystem access.
|
||||||
|
// When allowWriteAccess is false (default), paths are prefixed with "ro:" for read-only.
|
||||||
|
// Only libraries that match the allowed set (or all libraries if allLibraries is true) are included.
|
||||||
|
func buildAllowedPaths(ctx context.Context, libraries model.Libraries, allowedLibraryIDs []int, allLibraries, allowWriteAccess bool) map[string]string {
|
||||||
|
allowedLibrarySet := make(map[int]struct{}, len(allowedLibraryIDs))
|
||||||
|
for _, id := range allowedLibraryIDs {
|
||||||
|
allowedLibrarySet[id] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
allowedPaths := make(map[string]string)
|
||||||
|
for _, lib := range libraries {
|
||||||
|
_, allowed := allowedLibrarySet[lib.ID]
|
||||||
|
if allLibraries || allowed {
|
||||||
|
mountPoint := toPluginMountPoint(int32(lib.ID))
|
||||||
|
if allowWriteAccess {
|
||||||
|
log.Info(ctx, "Granting read-write filesystem access to library", "libraryID", lib.ID, "mountPoint", mountPoint)
|
||||||
|
allowedPaths[lib.Path] = mountPoint
|
||||||
|
} else {
|
||||||
|
log.Debug(ctx, "Granting read-only filesystem access to library", "libraryID", lib.ID, "mountPoint", mountPoint)
|
||||||
|
allowedPaths["ro:"+lib.Path] = mountPoint
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return allowedPaths
|
||||||
|
}
|
||||||
|
|||||||
@@ -3,6 +3,7 @@
|
|||||||
package plugins
|
package plugins
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/navidrome/navidrome/model"
|
||||||
. "github.com/onsi/ginkgo/v2"
|
. "github.com/onsi/ginkgo/v2"
|
||||||
. "github.com/onsi/gomega"
|
. "github.com/onsi/gomega"
|
||||||
)
|
)
|
||||||
@@ -58,3 +59,66 @@ var _ = Describe("parsePluginConfig", func() {
|
|||||||
Expect(result).ToNot(BeNil())
|
Expect(result).ToNot(BeNil())
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
var _ = Describe("buildAllowedPaths", func() {
|
||||||
|
var libraries model.Libraries
|
||||||
|
|
||||||
|
BeforeEach(func() {
|
||||||
|
libraries = model.Libraries{
|
||||||
|
{ID: 1, Path: "/music/library1"},
|
||||||
|
{ID: 2, Path: "/music/library2"},
|
||||||
|
{ID: 3, Path: "/music/library3"},
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
Context("read-only (default)", func() {
|
||||||
|
It("mounts all libraries with ro: prefix when allLibraries is true", func() {
|
||||||
|
result := buildAllowedPaths(nil, libraries, nil, true, false)
|
||||||
|
Expect(result).To(HaveLen(3))
|
||||||
|
Expect(result).To(HaveKeyWithValue("ro:/music/library1", "/libraries/1"))
|
||||||
|
Expect(result).To(HaveKeyWithValue("ro:/music/library2", "/libraries/2"))
|
||||||
|
Expect(result).To(HaveKeyWithValue("ro:/music/library3", "/libraries/3"))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("mounts only selected libraries with ro: prefix", func() {
|
||||||
|
result := buildAllowedPaths(nil, libraries, []int{1, 3}, false, false)
|
||||||
|
Expect(result).To(HaveLen(2))
|
||||||
|
Expect(result).To(HaveKeyWithValue("ro:/music/library1", "/libraries/1"))
|
||||||
|
Expect(result).To(HaveKeyWithValue("ro:/music/library3", "/libraries/3"))
|
||||||
|
Expect(result).ToNot(HaveKey("ro:/music/library2"))
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
Context("read-write (allowWriteAccess=true)", func() {
|
||||||
|
It("mounts all libraries without ro: prefix when allLibraries is true", func() {
|
||||||
|
result := buildAllowedPaths(nil, libraries, nil, true, true)
|
||||||
|
Expect(result).To(HaveLen(3))
|
||||||
|
Expect(result).To(HaveKeyWithValue("/music/library1", "/libraries/1"))
|
||||||
|
Expect(result).To(HaveKeyWithValue("/music/library2", "/libraries/2"))
|
||||||
|
Expect(result).To(HaveKeyWithValue("/music/library3", "/libraries/3"))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("mounts only selected libraries without ro: prefix", func() {
|
||||||
|
result := buildAllowedPaths(nil, libraries, []int{2}, false, true)
|
||||||
|
Expect(result).To(HaveLen(1))
|
||||||
|
Expect(result).To(HaveKeyWithValue("/music/library2", "/libraries/2"))
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
Context("edge cases", func() {
|
||||||
|
It("returns empty map when no libraries match", func() {
|
||||||
|
result := buildAllowedPaths(nil, libraries, []int{99}, false, false)
|
||||||
|
Expect(result).To(BeEmpty())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("returns empty map when libraries list is empty", func() {
|
||||||
|
result := buildAllowedPaths(nil, nil, []int{1}, false, false)
|
||||||
|
Expect(result).To(BeEmpty())
|
||||||
|
})
|
||||||
|
|
||||||
|
It("returns empty map when allLibraries is false and no IDs provided", func() {
|
||||||
|
result := buildAllowedPaths(nil, libraries, nil, false, false)
|
||||||
|
Expect(result).To(BeEmpty())
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|||||||
@@ -110,9 +110,6 @@
|
|||||||
},
|
},
|
||||||
"users": {
|
"users": {
|
||||||
"$ref": "#/$defs/UsersPermission"
|
"$ref": "#/$defs/UsersPermission"
|
||||||
},
|
|
||||||
"taskqueue": {
|
|
||||||
"$ref": "#/$defs/TaskQueuePermission"
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@@ -227,23 +224,6 @@
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"TaskQueuePermission": {
|
|
||||||
"type": "object",
|
|
||||||
"description": "Task queue permissions for background task processing",
|
|
||||||
"additionalProperties": false,
|
|
||||||
"properties": {
|
|
||||||
"reason": {
|
|
||||||
"type": "string",
|
|
||||||
"description": "Explanation for why task queue access is needed"
|
|
||||||
},
|
|
||||||
"maxConcurrency": {
|
|
||||||
"type": "integer",
|
|
||||||
"description": "Maximum total concurrent workers across all queues. Default: 1",
|
|
||||||
"minimum": 1,
|
|
||||||
"default": 1
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"UsersPermission": {
|
"UsersPermission": {
|
||||||
"type": "object",
|
"type": "object",
|
||||||
"description": "Users service permissions for accessing user information",
|
"description": "Users service permissions for accessing user information",
|
||||||
|
|||||||
@@ -72,13 +72,6 @@ func ValidateWithCapabilities(m *Manifest, capabilities []Capability) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Task (taskqueue) permission requires TaskWorker capability
|
|
||||||
if m.Permissions != nil && m.Permissions.Taskqueue != nil {
|
|
||||||
if !hasCapability(capabilities, CapabilityTaskWorker) {
|
|
||||||
return fmt.Errorf("'taskqueue' permission requires plugin to export '%s' function", FuncTaskWorkerCallback)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -181,9 +181,6 @@ type Permissions struct {
|
|||||||
// Subsonicapi corresponds to the JSON schema field "subsonicapi".
|
// Subsonicapi corresponds to the JSON schema field "subsonicapi".
|
||||||
Subsonicapi *SubsonicAPIPermission `json:"subsonicapi,omitempty" yaml:"subsonicapi,omitempty" mapstructure:"subsonicapi,omitempty"`
|
Subsonicapi *SubsonicAPIPermission `json:"subsonicapi,omitempty" yaml:"subsonicapi,omitempty" mapstructure:"subsonicapi,omitempty"`
|
||||||
|
|
||||||
// Taskqueue corresponds to the JSON schema field "taskqueue".
|
|
||||||
Taskqueue *TaskQueuePermission `json:"taskqueue,omitempty" yaml:"taskqueue,omitempty" mapstructure:"taskqueue,omitempty"`
|
|
||||||
|
|
||||||
// Users corresponds to the JSON schema field "users".
|
// Users corresponds to the JSON schema field "users".
|
||||||
Users *UsersPermission `json:"users,omitempty" yaml:"users,omitempty" mapstructure:"users,omitempty"`
|
Users *UsersPermission `json:"users,omitempty" yaml:"users,omitempty" mapstructure:"users,omitempty"`
|
||||||
|
|
||||||
@@ -203,36 +200,6 @@ type SubsonicAPIPermission struct {
|
|||||||
Reason *string `json:"reason,omitempty" yaml:"reason,omitempty" mapstructure:"reason,omitempty"`
|
Reason *string `json:"reason,omitempty" yaml:"reason,omitempty" mapstructure:"reason,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// Task queue permissions for background task processing
|
|
||||||
type TaskQueuePermission struct {
|
|
||||||
// Maximum total concurrent workers across all queues. Default: 1
|
|
||||||
MaxConcurrency int `json:"maxConcurrency,omitempty" yaml:"maxConcurrency,omitempty" mapstructure:"maxConcurrency,omitempty"`
|
|
||||||
|
|
||||||
// Explanation for why task queue access is needed
|
|
||||||
Reason *string `json:"reason,omitempty" yaml:"reason,omitempty" mapstructure:"reason,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// UnmarshalJSON implements json.Unmarshaler.
|
|
||||||
func (j *TaskQueuePermission) UnmarshalJSON(value []byte) error {
|
|
||||||
var raw map[string]interface{}
|
|
||||||
if err := json.Unmarshal(value, &raw); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
type Plain TaskQueuePermission
|
|
||||||
var plain Plain
|
|
||||||
if err := json.Unmarshal(value, &plain); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if v, ok := raw["maxConcurrency"]; !ok || v == nil {
|
|
||||||
plain.MaxConcurrency = 1.0
|
|
||||||
}
|
|
||||||
if 1 > plain.MaxConcurrency {
|
|
||||||
return fmt.Errorf("field %s: must be >= %v", "maxConcurrency", 1)
|
|
||||||
}
|
|
||||||
*j = TaskQueuePermission(plain)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Enable experimental WebAssembly threads support
|
// Enable experimental WebAssembly threads support
|
||||||
type ThreadsFeature struct {
|
type ThreadsFeature struct {
|
||||||
// Explanation for why threads support is needed
|
// Explanation for why threads support is needed
|
||||||
|
|||||||
@@ -43,7 +43,6 @@ The following host services are available:
|
|||||||
- Library: provides access to music library metadata for plugins.
|
- Library: provides access to music library metadata for plugins.
|
||||||
- Scheduler: provides task scheduling capabilities for plugins.
|
- Scheduler: provides task scheduling capabilities for plugins.
|
||||||
- SubsonicAPI: provides access to Navidrome's Subsonic API from plugins.
|
- SubsonicAPI: provides access to Navidrome's Subsonic API from plugins.
|
||||||
- Task: provides persistent task queues for plugins.
|
|
||||||
- Users: provides access to user information for plugins.
|
- Users: provides access to user information for plugins.
|
||||||
- WebSocket: provides WebSocket communication capabilities for plugins.
|
- WebSocket: provides WebSocket communication capabilities for plugins.
|
||||||
|
|
||||||
|
|||||||
@@ -1,227 +0,0 @@
|
|||||||
// Code generated by ndpgen. DO NOT EDIT.
|
|
||||||
//
|
|
||||||
// This file contains client wrappers for the Task host service.
|
|
||||||
// It is intended for use in Navidrome plugins built with TinyGo.
|
|
||||||
//
|
|
||||||
//go:build wasip1
|
|
||||||
|
|
||||||
package host
|
|
||||||
|
|
||||||
import (
|
|
||||||
"encoding/json"
|
|
||||||
"errors"
|
|
||||||
|
|
||||||
"github.com/navidrome/navidrome/plugins/pdk/go/pdk"
|
|
||||||
)
|
|
||||||
|
|
||||||
// QueueConfig represents the QueueConfig data structure.
|
|
||||||
// QueueConfig holds configuration for a task queue.
|
|
||||||
type QueueConfig struct {
|
|
||||||
Concurrency int32 `json:"concurrency"`
|
|
||||||
MaxRetries int32 `json:"maxRetries"`
|
|
||||||
BackoffMs int64 `json:"backoffMs"`
|
|
||||||
DelayMs int64 `json:"delayMs"`
|
|
||||||
RetentionMs int64 `json:"retentionMs"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// TaskInfo represents the TaskInfo data structure.
|
|
||||||
// TaskInfo holds the current state of a task.
|
|
||||||
type TaskInfo struct {
|
|
||||||
Status string `json:"status"`
|
|
||||||
Message string `json:"message"`
|
|
||||||
Attempt int32 `json:"attempt"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// task_createqueue is the host function provided by Navidrome.
|
|
||||||
//
|
|
||||||
//go:wasmimport extism:host/user task_createqueue
|
|
||||||
func task_createqueue(uint64) uint64
|
|
||||||
|
|
||||||
// task_enqueue is the host function provided by Navidrome.
|
|
||||||
//
|
|
||||||
//go:wasmimport extism:host/user task_enqueue
|
|
||||||
func task_enqueue(uint64) uint64
|
|
||||||
|
|
||||||
// task_get is the host function provided by Navidrome.
|
|
||||||
//
|
|
||||||
//go:wasmimport extism:host/user task_get
|
|
||||||
func task_get(uint64) uint64
|
|
||||||
|
|
||||||
// task_cancel is the host function provided by Navidrome.
|
|
||||||
//
|
|
||||||
//go:wasmimport extism:host/user task_cancel
|
|
||||||
func task_cancel(uint64) uint64
|
|
||||||
|
|
||||||
type taskCreateQueueRequest struct {
|
|
||||||
Name string `json:"name"`
|
|
||||||
Config QueueConfig `json:"config"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type taskEnqueueRequest struct {
|
|
||||||
QueueName string `json:"queueName"`
|
|
||||||
Payload []byte `json:"payload"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type taskEnqueueResponse struct {
|
|
||||||
Result string `json:"result,omitempty"`
|
|
||||||
Error string `json:"error,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type taskGetRequest struct {
|
|
||||||
TaskID string `json:"taskId"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type taskGetResponse struct {
|
|
||||||
Result *TaskInfo `json:"result,omitempty"`
|
|
||||||
Error string `json:"error,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type taskCancelRequest struct {
|
|
||||||
TaskID string `json:"taskId"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// TaskCreateQueue calls the task_createqueue host function.
|
|
||||||
// CreateQueue creates a named task queue with the given configuration.
|
|
||||||
// Zero-value fields in config use sensible defaults.
|
|
||||||
// If a queue with the same name already exists, returns an error.
|
|
||||||
// On startup, this also recovers any stale "running" tasks from a previous crash.
|
|
||||||
func TaskCreateQueue(name string, config QueueConfig) error {
|
|
||||||
// Marshal request to JSON
|
|
||||||
req := taskCreateQueueRequest{
|
|
||||||
Name: name,
|
|
||||||
Config: config,
|
|
||||||
}
|
|
||||||
reqBytes, err := json.Marshal(req)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
reqMem := pdk.AllocateBytes(reqBytes)
|
|
||||||
defer reqMem.Free()
|
|
||||||
|
|
||||||
// Call the host function
|
|
||||||
responsePtr := task_createqueue(reqMem.Offset())
|
|
||||||
|
|
||||||
// Read the response from memory
|
|
||||||
responseMem := pdk.FindMemory(responsePtr)
|
|
||||||
responseBytes := responseMem.ReadBytes()
|
|
||||||
|
|
||||||
// Parse error-only response
|
|
||||||
var response struct {
|
|
||||||
Error string `json:"error,omitempty"`
|
|
||||||
}
|
|
||||||
if err := json.Unmarshal(responseBytes, &response); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if response.Error != "" {
|
|
||||||
return errors.New(response.Error)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// TaskEnqueue calls the task_enqueue host function.
|
|
||||||
// Enqueue adds a task to the named queue. Returns the task ID.
|
|
||||||
// payload is opaque bytes passed back to the plugin on execution.
|
|
||||||
func TaskEnqueue(queueName string, payload []byte) (string, error) {
|
|
||||||
// Marshal request to JSON
|
|
||||||
req := taskEnqueueRequest{
|
|
||||||
QueueName: queueName,
|
|
||||||
Payload: payload,
|
|
||||||
}
|
|
||||||
reqBytes, err := json.Marshal(req)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
reqMem := pdk.AllocateBytes(reqBytes)
|
|
||||||
defer reqMem.Free()
|
|
||||||
|
|
||||||
// Call the host function
|
|
||||||
responsePtr := task_enqueue(reqMem.Offset())
|
|
||||||
|
|
||||||
// Read the response from memory
|
|
||||||
responseMem := pdk.FindMemory(responsePtr)
|
|
||||||
responseBytes := responseMem.ReadBytes()
|
|
||||||
|
|
||||||
// Parse the response
|
|
||||||
var response taskEnqueueResponse
|
|
||||||
if err := json.Unmarshal(responseBytes, &response); err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Convert Error field to Go error
|
|
||||||
if response.Error != "" {
|
|
||||||
return "", errors.New(response.Error)
|
|
||||||
}
|
|
||||||
|
|
||||||
return response.Result, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// TaskGet calls the task_get host function.
|
|
||||||
// Get returns the current state of a task including its status,
|
|
||||||
// message, and attempt count.
|
|
||||||
func TaskGet(taskID string) (*TaskInfo, error) {
|
|
||||||
// Marshal request to JSON
|
|
||||||
req := taskGetRequest{
|
|
||||||
TaskID: taskID,
|
|
||||||
}
|
|
||||||
reqBytes, err := json.Marshal(req)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
reqMem := pdk.AllocateBytes(reqBytes)
|
|
||||||
defer reqMem.Free()
|
|
||||||
|
|
||||||
// Call the host function
|
|
||||||
responsePtr := task_get(reqMem.Offset())
|
|
||||||
|
|
||||||
// Read the response from memory
|
|
||||||
responseMem := pdk.FindMemory(responsePtr)
|
|
||||||
responseBytes := responseMem.ReadBytes()
|
|
||||||
|
|
||||||
// Parse the response
|
|
||||||
var response taskGetResponse
|
|
||||||
if err := json.Unmarshal(responseBytes, &response); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Convert Error field to Go error
|
|
||||||
if response.Error != "" {
|
|
||||||
return nil, errors.New(response.Error)
|
|
||||||
}
|
|
||||||
|
|
||||||
return response.Result, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// TaskCancel calls the task_cancel host function.
|
|
||||||
// Cancel cancels a pending task. Returns error if already
|
|
||||||
// running, completed, or failed.
|
|
||||||
func TaskCancel(taskID string) error {
|
|
||||||
// Marshal request to JSON
|
|
||||||
req := taskCancelRequest{
|
|
||||||
TaskID: taskID,
|
|
||||||
}
|
|
||||||
reqBytes, err := json.Marshal(req)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
reqMem := pdk.AllocateBytes(reqBytes)
|
|
||||||
defer reqMem.Free()
|
|
||||||
|
|
||||||
// Call the host function
|
|
||||||
responsePtr := task_cancel(reqMem.Offset())
|
|
||||||
|
|
||||||
// Read the response from memory
|
|
||||||
responseMem := pdk.FindMemory(responsePtr)
|
|
||||||
responseBytes := responseMem.ReadBytes()
|
|
||||||
|
|
||||||
// Parse error-only response
|
|
||||||
var response struct {
|
|
||||||
Error string `json:"error,omitempty"`
|
|
||||||
}
|
|
||||||
if err := json.Unmarshal(responseBytes, &response); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if response.Error != "" {
|
|
||||||
return errors.New(response.Error)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
@@ -1,92 +0,0 @@
|
|||||||
// Code generated by ndpgen. DO NOT EDIT.
|
|
||||||
//
|
|
||||||
// This file contains mock implementations for non-WASM builds.
|
|
||||||
// These mocks allow IDE support, compilation, and unit testing on non-WASM platforms.
|
|
||||||
// Plugin authors can use the exported mock instances to set expectations in tests.
|
|
||||||
//
|
|
||||||
//go:build !wasip1
|
|
||||||
|
|
||||||
package host
|
|
||||||
|
|
||||||
import "github.com/stretchr/testify/mock"
|
|
||||||
|
|
||||||
// QueueConfig represents the QueueConfig data structure.
|
|
||||||
// QueueConfig holds configuration for a task queue.
|
|
||||||
type QueueConfig struct {
|
|
||||||
Concurrency int32 `json:"concurrency"`
|
|
||||||
MaxRetries int32 `json:"maxRetries"`
|
|
||||||
BackoffMs int64 `json:"backoffMs"`
|
|
||||||
DelayMs int64 `json:"delayMs"`
|
|
||||||
RetentionMs int64 `json:"retentionMs"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// TaskInfo represents the TaskInfo data structure.
|
|
||||||
// TaskInfo holds the current state of a task.
|
|
||||||
type TaskInfo struct {
|
|
||||||
Status string `json:"status"`
|
|
||||||
Message string `json:"message"`
|
|
||||||
Attempt int32 `json:"attempt"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// mockTaskService is the mock implementation for testing.
|
|
||||||
type mockTaskService struct {
|
|
||||||
mock.Mock
|
|
||||||
}
|
|
||||||
|
|
||||||
// TaskMock is the auto-instantiated mock instance for testing.
|
|
||||||
// Use this to set expectations: host.TaskMock.On("MethodName", args...).Return(values...)
|
|
||||||
var TaskMock = &mockTaskService{}
|
|
||||||
|
|
||||||
// CreateQueue is the mock method for TaskCreateQueue.
|
|
||||||
func (m *mockTaskService) CreateQueue(name string, config QueueConfig) error {
|
|
||||||
args := m.Called(name, config)
|
|
||||||
return args.Error(0)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TaskCreateQueue delegates to the mock instance.
|
|
||||||
// CreateQueue creates a named task queue with the given configuration.
|
|
||||||
// Zero-value fields in config use sensible defaults.
|
|
||||||
// If a queue with the same name already exists, returns an error.
|
|
||||||
// On startup, this also recovers any stale "running" tasks from a previous crash.
|
|
||||||
func TaskCreateQueue(name string, config QueueConfig) error {
|
|
||||||
return TaskMock.CreateQueue(name, config)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Enqueue is the mock method for TaskEnqueue.
|
|
||||||
func (m *mockTaskService) Enqueue(queueName string, payload []byte) (string, error) {
|
|
||||||
args := m.Called(queueName, payload)
|
|
||||||
return args.String(0), args.Error(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TaskEnqueue delegates to the mock instance.
|
|
||||||
// Enqueue adds a task to the named queue. Returns the task ID.
|
|
||||||
// payload is opaque bytes passed back to the plugin on execution.
|
|
||||||
func TaskEnqueue(queueName string, payload []byte) (string, error) {
|
|
||||||
return TaskMock.Enqueue(queueName, payload)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get is the mock method for TaskGet.
|
|
||||||
func (m *mockTaskService) Get(taskID string) (*TaskInfo, error) {
|
|
||||||
args := m.Called(taskID)
|
|
||||||
return args.Get(0).(*TaskInfo), args.Error(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TaskGet delegates to the mock instance.
|
|
||||||
// Get returns the current state of a task including its status,
|
|
||||||
// message, and attempt count.
|
|
||||||
func TaskGet(taskID string) (*TaskInfo, error) {
|
|
||||||
return TaskMock.Get(taskID)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Cancel is the mock method for TaskCancel.
|
|
||||||
func (m *mockTaskService) Cancel(taskID string) error {
|
|
||||||
args := m.Called(taskID)
|
|
||||||
return args.Error(0)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TaskCancel delegates to the mock instance.
|
|
||||||
// Cancel cancels a pending task. Returns error if already
|
|
||||||
// running, completed, or failed.
|
|
||||||
func TaskCancel(taskID string) error {
|
|
||||||
return TaskMock.Cancel(taskID)
|
|
||||||
}
|
|
||||||
@@ -1,79 +0,0 @@
|
|||||||
// Code generated by ndpgen. DO NOT EDIT.
|
|
||||||
//
|
|
||||||
// This file contains export wrappers for the TaskWorker capability.
|
|
||||||
// It is intended for use in Navidrome plugins built with TinyGo.
|
|
||||||
//
|
|
||||||
//go:build wasip1
|
|
||||||
|
|
||||||
package taskworker
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/navidrome/navidrome/plugins/pdk/go/pdk"
|
|
||||||
)
|
|
||||||
|
|
||||||
// TaskExecuteRequest is the request provided when a task is ready to execute.
|
|
||||||
type TaskExecuteRequest struct {
|
|
||||||
// QueueName is the name of the queue this task belongs to.
|
|
||||||
QueueName string `json:"queueName"`
|
|
||||||
// TaskID is the unique identifier for this task.
|
|
||||||
TaskID string `json:"taskId"`
|
|
||||||
// Payload is the opaque data provided when the task was enqueued.
|
|
||||||
Payload []byte `json:"payload"`
|
|
||||||
// Attempt is the current attempt number (1-based: first attempt = 1).
|
|
||||||
Attempt int32 `json:"attempt"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// TaskWorker is the marker interface for taskworker plugins.
|
|
||||||
// Implement one or more of the provider interfaces below.
|
|
||||||
// TaskWorker provides task execution handling.
|
|
||||||
// This capability allows plugins to receive callbacks when their queued tasks
|
|
||||||
// are ready to execute. Plugins that use the taskqueue host service must
|
|
||||||
// implement this capability.
|
|
||||||
type TaskWorker interface{}
|
|
||||||
|
|
||||||
// TaskExecuteProvider provides the OnTaskExecute function.
|
|
||||||
type TaskExecuteProvider interface {
|
|
||||||
OnTaskExecute(TaskExecuteRequest) (string, error)
|
|
||||||
} // Internal implementation holders
|
|
||||||
var (
|
|
||||||
taskExecuteImpl func(TaskExecuteRequest) (string, error)
|
|
||||||
)
|
|
||||||
|
|
||||||
// Register registers a taskworker implementation.
|
|
||||||
// The implementation is checked for optional provider interfaces.
|
|
||||||
func Register(impl TaskWorker) {
|
|
||||||
if p, ok := impl.(TaskExecuteProvider); ok {
|
|
||||||
taskExecuteImpl = p.OnTaskExecute
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// NotImplementedCode is the standard return code for unimplemented functions.
|
|
||||||
// The host recognizes this and skips the plugin gracefully.
|
|
||||||
const NotImplementedCode int32 = -2
|
|
||||||
|
|
||||||
//go:wasmexport nd_task_execute
|
|
||||||
func _NdTaskExecute() int32 {
|
|
||||||
if taskExecuteImpl == nil {
|
|
||||||
// Return standard code - host will skip this plugin gracefully
|
|
||||||
return NotImplementedCode
|
|
||||||
}
|
|
||||||
|
|
||||||
var input TaskExecuteRequest
|
|
||||||
if err := pdk.InputJSON(&input); err != nil {
|
|
||||||
pdk.SetError(err)
|
|
||||||
return -1
|
|
||||||
}
|
|
||||||
|
|
||||||
output, err := taskExecuteImpl(input)
|
|
||||||
if err != nil {
|
|
||||||
pdk.SetError(err)
|
|
||||||
return -1
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := pdk.OutputJSON(output); err != nil {
|
|
||||||
pdk.SetError(err)
|
|
||||||
return -1
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
@@ -1,41 +0,0 @@
|
|||||||
// Code generated by ndpgen. DO NOT EDIT.
|
|
||||||
//
|
|
||||||
// This file provides stub implementations for non-WASM platforms.
|
|
||||||
// It allows Go plugins to compile and run tests outside of WASM,
|
|
||||||
// but the actual functionality is only available in WASM builds.
|
|
||||||
//
|
|
||||||
//go:build !wasip1
|
|
||||||
|
|
||||||
package taskworker
|
|
||||||
|
|
||||||
// TaskExecuteRequest is the request provided when a task is ready to execute.
|
|
||||||
type TaskExecuteRequest struct {
|
|
||||||
// QueueName is the name of the queue this task belongs to.
|
|
||||||
QueueName string `json:"queueName"`
|
|
||||||
// TaskID is the unique identifier for this task.
|
|
||||||
TaskID string `json:"taskId"`
|
|
||||||
// Payload is the opaque data provided when the task was enqueued.
|
|
||||||
Payload []byte `json:"payload"`
|
|
||||||
// Attempt is the current attempt number (1-based: first attempt = 1).
|
|
||||||
Attempt int32 `json:"attempt"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// TaskWorker is the marker interface for taskworker plugins.
|
|
||||||
// Implement one or more of the provider interfaces below.
|
|
||||||
// TaskWorker provides task execution handling.
|
|
||||||
// This capability allows plugins to receive callbacks when their queued tasks
|
|
||||||
// are ready to execute. Plugins that use the taskqueue host service must
|
|
||||||
// implement this capability.
|
|
||||||
type TaskWorker interface{}
|
|
||||||
|
|
||||||
// TaskExecuteProvider provides the OnTaskExecute function.
|
|
||||||
type TaskExecuteProvider interface {
|
|
||||||
OnTaskExecute(TaskExecuteRequest) (string, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
// NotImplementedCode is the standard return code for unimplemented functions.
|
|
||||||
const NotImplementedCode int32 = -2
|
|
||||||
|
|
||||||
// Register is a no-op on non-WASM platforms.
|
|
||||||
// This stub allows code to compile outside of WASM.
|
|
||||||
func Register(_ TaskWorker) {}
|
|
||||||
@@ -1,154 +0,0 @@
|
|||||||
# Code generated by ndpgen. DO NOT EDIT.
|
|
||||||
#
|
|
||||||
# This file contains client wrappers for the Task host service.
|
|
||||||
# It is intended for use in Navidrome plugins built with extism-py.
|
|
||||||
#
|
|
||||||
# IMPORTANT: Due to a limitation in extism-py, you cannot import this file directly.
|
|
||||||
# The @extism.import_fn decorators are only detected when defined in the plugin's
|
|
||||||
# main __init__.py file. Copy the needed functions from this file into your plugin.
|
|
||||||
|
|
||||||
from dataclasses import dataclass
|
|
||||||
from typing import Any
|
|
||||||
|
|
||||||
import extism
|
|
||||||
import json
|
|
||||||
import base64
|
|
||||||
|
|
||||||
|
|
||||||
class HostFunctionError(Exception):
|
|
||||||
"""Raised when a host function returns an error."""
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
@extism.import_fn("extism:host/user", "task_createqueue")
|
|
||||||
def _task_createqueue(offset: int) -> int:
|
|
||||||
"""Raw host function - do not call directly."""
|
|
||||||
...
|
|
||||||
|
|
||||||
|
|
||||||
@extism.import_fn("extism:host/user", "task_enqueue")
|
|
||||||
def _task_enqueue(offset: int) -> int:
|
|
||||||
"""Raw host function - do not call directly."""
|
|
||||||
...
|
|
||||||
|
|
||||||
|
|
||||||
@extism.import_fn("extism:host/user", "task_get")
|
|
||||||
def _task_get(offset: int) -> int:
|
|
||||||
"""Raw host function - do not call directly."""
|
|
||||||
...
|
|
||||||
|
|
||||||
|
|
||||||
@extism.import_fn("extism:host/user", "task_cancel")
|
|
||||||
def _task_cancel(offset: int) -> int:
|
|
||||||
"""Raw host function - do not call directly."""
|
|
||||||
...
|
|
||||||
|
|
||||||
|
|
||||||
def task_create_queue(name: str, config: Any) -> None:
|
|
||||||
"""CreateQueue creates a named task queue with the given configuration.
|
|
||||||
Zero-value fields in config use sensible defaults.
|
|
||||||
If a queue with the same name already exists, returns an error.
|
|
||||||
On startup, this also recovers any stale "running" tasks from a previous crash.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
name: str parameter.
|
|
||||||
config: Any parameter.
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
HostFunctionError: If the host function returns an error.
|
|
||||||
"""
|
|
||||||
request = {
|
|
||||||
"name": name,
|
|
||||||
"config": config,
|
|
||||||
}
|
|
||||||
request_bytes = json.dumps(request).encode("utf-8")
|
|
||||||
request_mem = extism.memory.alloc(request_bytes)
|
|
||||||
response_offset = _task_createqueue(request_mem.offset)
|
|
||||||
response_mem = extism.memory.find(response_offset)
|
|
||||||
response = json.loads(extism.memory.string(response_mem))
|
|
||||||
|
|
||||||
if response.get("error"):
|
|
||||||
raise HostFunctionError(response["error"])
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def task_enqueue(queue_name: str, payload: bytes) -> str:
|
|
||||||
"""Enqueue adds a task to the named queue. Returns the task ID.
|
|
||||||
payload is opaque bytes passed back to the plugin on execution.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
queue_name: str parameter.
|
|
||||||
payload: bytes parameter.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
str: The result value.
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
HostFunctionError: If the host function returns an error.
|
|
||||||
"""
|
|
||||||
request = {
|
|
||||||
"queueName": queue_name,
|
|
||||||
"payload": base64.b64encode(payload).decode("ascii"),
|
|
||||||
}
|
|
||||||
request_bytes = json.dumps(request).encode("utf-8")
|
|
||||||
request_mem = extism.memory.alloc(request_bytes)
|
|
||||||
response_offset = _task_enqueue(request_mem.offset)
|
|
||||||
response_mem = extism.memory.find(response_offset)
|
|
||||||
response = json.loads(extism.memory.string(response_mem))
|
|
||||||
|
|
||||||
if response.get("error"):
|
|
||||||
raise HostFunctionError(response["error"])
|
|
||||||
|
|
||||||
return response.get("result", "")
|
|
||||||
|
|
||||||
|
|
||||||
def task_get(task_id: str) -> Any:
|
|
||||||
"""Get returns the current state of a task including its status,
|
|
||||||
message, and attempt count.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
task_id: str parameter.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Any: The result value.
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
HostFunctionError: If the host function returns an error.
|
|
||||||
"""
|
|
||||||
request = {
|
|
||||||
"taskId": task_id,
|
|
||||||
}
|
|
||||||
request_bytes = json.dumps(request).encode("utf-8")
|
|
||||||
request_mem = extism.memory.alloc(request_bytes)
|
|
||||||
response_offset = _task_get(request_mem.offset)
|
|
||||||
response_mem = extism.memory.find(response_offset)
|
|
||||||
response = json.loads(extism.memory.string(response_mem))
|
|
||||||
|
|
||||||
if response.get("error"):
|
|
||||||
raise HostFunctionError(response["error"])
|
|
||||||
|
|
||||||
return response.get("result", None)
|
|
||||||
|
|
||||||
|
|
||||||
def task_cancel(task_id: str) -> None:
|
|
||||||
"""Cancel cancels a pending task. Returns error if already
|
|
||||||
running, completed, or failed.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
task_id: str parameter.
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
HostFunctionError: If the host function returns an error.
|
|
||||||
"""
|
|
||||||
request = {
|
|
||||||
"taskId": task_id,
|
|
||||||
}
|
|
||||||
request_bytes = json.dumps(request).encode("utf-8")
|
|
||||||
request_mem = extism.memory.alloc(request_bytes)
|
|
||||||
response_offset = _task_cancel(request_mem.offset)
|
|
||||||
response_mem = extism.memory.find(response_offset)
|
|
||||||
response = json.loads(extism.memory.string(response_mem))
|
|
||||||
|
|
||||||
if response.get("error"):
|
|
||||||
raise HostFunctionError(response["error"])
|
|
||||||
|
|
||||||
@@ -9,5 +9,4 @@ pub mod lifecycle;
|
|||||||
pub mod metadata;
|
pub mod metadata;
|
||||||
pub mod scheduler;
|
pub mod scheduler;
|
||||||
pub mod scrobbler;
|
pub mod scrobbler;
|
||||||
pub mod taskworker;
|
|
||||||
pub mod websocket;
|
pub mod websocket;
|
||||||
|
|||||||
@@ -1,102 +0,0 @@
|
|||||||
// Code generated by ndpgen. DO NOT EDIT.
|
|
||||||
//
|
|
||||||
// This file contains export wrappers for the TaskWorker capability.
|
|
||||||
// It is intended for use in Navidrome plugins built with extism-pdk.
|
|
||||||
|
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
use base64::Engine as _;
|
|
||||||
use base64::engine::general_purpose::STANDARD as BASE64;
|
|
||||||
|
|
||||||
mod base64_bytes {
|
|
||||||
use serde::{self, Deserialize, Deserializer, Serializer};
|
|
||||||
use base64::Engine as _;
|
|
||||||
use base64::engine::general_purpose::STANDARD as BASE64;
|
|
||||||
|
|
||||||
pub fn serialize<S>(bytes: &Vec<u8>, serializer: S) -> Result<S::Ok, S::Error>
|
|
||||||
where
|
|
||||||
S: Serializer,
|
|
||||||
{
|
|
||||||
serializer.serialize_str(&BASE64.encode(bytes))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
|
|
||||||
where
|
|
||||||
D: Deserializer<'de>,
|
|
||||||
{
|
|
||||||
let s = String::deserialize(deserializer)?;
|
|
||||||
BASE64.decode(&s).map_err(serde::de::Error::custom)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Helper functions for skip_serializing_if with numeric types
|
|
||||||
#[allow(dead_code)]
|
|
||||||
fn is_zero_i32(value: &i32) -> bool { *value == 0 }
|
|
||||||
#[allow(dead_code)]
|
|
||||||
fn is_zero_u32(value: &u32) -> bool { *value == 0 }
|
|
||||||
#[allow(dead_code)]
|
|
||||||
fn is_zero_i64(value: &i64) -> bool { *value == 0 }
|
|
||||||
#[allow(dead_code)]
|
|
||||||
fn is_zero_u64(value: &u64) -> bool { *value == 0 }
|
|
||||||
#[allow(dead_code)]
|
|
||||||
fn is_zero_f32(value: &f32) -> bool { *value == 0.0 }
|
|
||||||
#[allow(dead_code)]
|
|
||||||
fn is_zero_f64(value: &f64) -> bool { *value == 0.0 }
|
|
||||||
/// TaskExecuteRequest is the request provided when a task is ready to execute.
|
|
||||||
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
|
|
||||||
#[serde(rename_all = "camelCase")]
|
|
||||||
pub struct TaskExecuteRequest {
|
|
||||||
/// QueueName is the name of the queue this task belongs to.
|
|
||||||
#[serde(default)]
|
|
||||||
pub queue_name: String,
|
|
||||||
/// TaskID is the unique identifier for this task.
|
|
||||||
#[serde(default)]
|
|
||||||
pub task_id: String,
|
|
||||||
/// Payload is the opaque data provided when the task was enqueued.
|
|
||||||
#[serde(default)]
|
|
||||||
#[serde(with = "base64_bytes")]
|
|
||||||
pub payload: Vec<u8>,
|
|
||||||
/// Attempt is the current attempt number (1-based: first attempt = 1).
|
|
||||||
#[serde(default)]
|
|
||||||
pub attempt: i32,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Error represents an error from a capability method.
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct Error {
|
|
||||||
pub message: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl std::fmt::Display for Error {
|
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
||||||
write!(f, "{}", self.message)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl std::error::Error for Error {}
|
|
||||||
|
|
||||||
impl Error {
|
|
||||||
pub fn new(message: impl Into<String>) -> Self {
|
|
||||||
Self { message: message.into() }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// TaskExecuteProvider provides the OnTaskExecute function.
|
|
||||||
pub trait TaskExecuteProvider {
|
|
||||||
fn on_task_execute(&self, req: TaskExecuteRequest) -> Result<String, Error>;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Register the on_task_execute export.
|
|
||||||
/// This macro generates the WASM export function for this method.
|
|
||||||
#[macro_export]
|
|
||||||
macro_rules! register_taskworker_task_execute {
|
|
||||||
($plugin_type:ty) => {
|
|
||||||
#[extism_pdk::plugin_fn]
|
|
||||||
pub fn nd_task_execute(
|
|
||||||
req: extism_pdk::Json<$crate::taskworker::TaskExecuteRequest>
|
|
||||||
) -> extism_pdk::FnResult<extism_pdk::Json<String>> {
|
|
||||||
let plugin = <$plugin_type>::default();
|
|
||||||
let result = $crate::taskworker::TaskExecuteProvider::on_task_execute(&plugin, req.into_inner())?;
|
|
||||||
Ok(extism_pdk::Json(result))
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
@@ -40,7 +40,6 @@
|
|||||||
//! - [`library`] - provides access to music library metadata for plugins.
|
//! - [`library`] - provides access to music library metadata for plugins.
|
||||||
//! - [`scheduler`] - provides task scheduling capabilities for plugins.
|
//! - [`scheduler`] - provides task scheduling capabilities for plugins.
|
||||||
//! - [`subsonicapi`] - provides access to Navidrome's Subsonic API from plugins.
|
//! - [`subsonicapi`] - provides access to Navidrome's Subsonic API from plugins.
|
||||||
//! - [`task`] - provides persistent task queues for plugins.
|
|
||||||
//! - [`users`] - provides access to user information for plugins.
|
//! - [`users`] - provides access to user information for plugins.
|
||||||
//! - [`websocket`] - provides WebSocket communication capabilities for plugins.
|
//! - [`websocket`] - provides WebSocket communication capabilities for plugins.
|
||||||
|
|
||||||
@@ -100,13 +99,6 @@ pub mod subsonicapi {
|
|||||||
pub use super::nd_host_subsonicapi::*;
|
pub use super::nd_host_subsonicapi::*;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[doc(hidden)]
|
|
||||||
mod nd_host_task;
|
|
||||||
/// provides persistent task queues for plugins.
|
|
||||||
pub mod task {
|
|
||||||
pub use super::nd_host_task::*;
|
|
||||||
}
|
|
||||||
|
|
||||||
#[doc(hidden)]
|
#[doc(hidden)]
|
||||||
mod nd_host_users;
|
mod nd_host_users;
|
||||||
/// provides access to user information for plugins.
|
/// provides access to user information for plugins.
|
||||||
|
|||||||
@@ -1,217 +0,0 @@
|
|||||||
// Code generated by ndpgen. DO NOT EDIT.
|
|
||||||
//
|
|
||||||
// This file contains client wrappers for the Task host service.
|
|
||||||
// It is intended for use in Navidrome plugins built with extism-pdk.
|
|
||||||
|
|
||||||
use extism_pdk::*;
|
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
use base64::Engine as _;
|
|
||||||
use base64::engine::general_purpose::STANDARD as BASE64;
|
|
||||||
|
|
||||||
mod base64_bytes {
|
|
||||||
use serde::{self, Deserialize, Deserializer, Serializer};
|
|
||||||
use base64::Engine as _;
|
|
||||||
use base64::engine::general_purpose::STANDARD as BASE64;
|
|
||||||
|
|
||||||
pub fn serialize<S>(bytes: &Vec<u8>, serializer: S) -> Result<S::Ok, S::Error>
|
|
||||||
where
|
|
||||||
S: Serializer,
|
|
||||||
{
|
|
||||||
serializer.serialize_str(&BASE64.encode(bytes))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
|
|
||||||
where
|
|
||||||
D: Deserializer<'de>,
|
|
||||||
{
|
|
||||||
let s = String::deserialize(deserializer)?;
|
|
||||||
BASE64.decode(&s).map_err(serde::de::Error::custom)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// QueueConfig holds configuration for a task queue.
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
||||||
#[serde(rename_all = "camelCase")]
|
|
||||||
pub struct QueueConfig {
|
|
||||||
pub concurrency: i32,
|
|
||||||
pub max_retries: i32,
|
|
||||||
pub backoff_ms: i64,
|
|
||||||
pub delay_ms: i64,
|
|
||||||
pub retention_ms: i64,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// TaskInfo holds the current state of a task.
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
||||||
#[serde(rename_all = "camelCase")]
|
|
||||||
pub struct TaskInfo {
|
|
||||||
pub status: String,
|
|
||||||
pub message: String,
|
|
||||||
pub attempt: i32,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize)]
|
|
||||||
#[serde(rename_all = "camelCase")]
|
|
||||||
struct TaskCreateQueueRequest {
|
|
||||||
name: String,
|
|
||||||
config: QueueConfig,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Deserialize)]
|
|
||||||
#[serde(rename_all = "camelCase")]
|
|
||||||
struct TaskCreateQueueResponse {
|
|
||||||
#[serde(default)]
|
|
||||||
error: Option<String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize)]
|
|
||||||
#[serde(rename_all = "camelCase")]
|
|
||||||
struct TaskEnqueueRequest {
|
|
||||||
queue_name: String,
|
|
||||||
#[serde(with = "base64_bytes")]
|
|
||||||
payload: Vec<u8>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Deserialize)]
|
|
||||||
#[serde(rename_all = "camelCase")]
|
|
||||||
struct TaskEnqueueResponse {
|
|
||||||
#[serde(default)]
|
|
||||||
result: String,
|
|
||||||
#[serde(default)]
|
|
||||||
error: Option<String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize)]
|
|
||||||
#[serde(rename_all = "camelCase")]
|
|
||||||
struct TaskGetRequest {
|
|
||||||
task_id: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Deserialize)]
|
|
||||||
#[serde(rename_all = "camelCase")]
|
|
||||||
struct TaskGetResponse {
|
|
||||||
#[serde(default)]
|
|
||||||
result: Option<TaskInfo>,
|
|
||||||
#[serde(default)]
|
|
||||||
error: Option<String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize)]
|
|
||||||
#[serde(rename_all = "camelCase")]
|
|
||||||
struct TaskCancelRequest {
|
|
||||||
task_id: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Deserialize)]
|
|
||||||
#[serde(rename_all = "camelCase")]
|
|
||||||
struct TaskCancelResponse {
|
|
||||||
#[serde(default)]
|
|
||||||
error: Option<String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[host_fn]
|
|
||||||
extern "ExtismHost" {
|
|
||||||
fn task_createqueue(input: Json<TaskCreateQueueRequest>) -> Json<TaskCreateQueueResponse>;
|
|
||||||
fn task_enqueue(input: Json<TaskEnqueueRequest>) -> Json<TaskEnqueueResponse>;
|
|
||||||
fn task_get(input: Json<TaskGetRequest>) -> Json<TaskGetResponse>;
|
|
||||||
fn task_cancel(input: Json<TaskCancelRequest>) -> Json<TaskCancelResponse>;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// CreateQueue creates a named task queue with the given configuration.
|
|
||||||
/// Zero-value fields in config use sensible defaults.
|
|
||||||
/// If a queue with the same name already exists, returns an error.
|
|
||||||
/// On startup, this also recovers any stale "running" tasks from a previous crash.
|
|
||||||
///
|
|
||||||
/// # Arguments
|
|
||||||
/// * `name` - String parameter.
|
|
||||||
/// * `config` - QueueConfig parameter.
|
|
||||||
///
|
|
||||||
/// # Errors
|
|
||||||
/// Returns an error if the host function call fails.
|
|
||||||
pub fn create_queue(name: &str, config: QueueConfig) -> Result<(), Error> {
|
|
||||||
let response = unsafe {
|
|
||||||
task_createqueue(Json(TaskCreateQueueRequest {
|
|
||||||
name: name.to_owned(),
|
|
||||||
config: config,
|
|
||||||
}))?
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Some(err) = response.0.error {
|
|
||||||
return Err(Error::msg(err));
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Enqueue adds a task to the named queue. Returns the task ID.
|
|
||||||
/// payload is opaque bytes passed back to the plugin on execution.
|
|
||||||
///
|
|
||||||
/// # Arguments
|
|
||||||
/// * `queue_name` - String parameter.
|
|
||||||
/// * `payload` - Vec<u8> parameter.
|
|
||||||
///
|
|
||||||
/// # Returns
|
|
||||||
/// The result value.
|
|
||||||
///
|
|
||||||
/// # Errors
|
|
||||||
/// Returns an error if the host function call fails.
|
|
||||||
pub fn enqueue(queue_name: &str, payload: Vec<u8>) -> Result<String, Error> {
|
|
||||||
let response = unsafe {
|
|
||||||
task_enqueue(Json(TaskEnqueueRequest {
|
|
||||||
queue_name: queue_name.to_owned(),
|
|
||||||
payload: payload,
|
|
||||||
}))?
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Some(err) = response.0.error {
|
|
||||||
return Err(Error::msg(err));
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(response.0.result)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Get returns the current state of a task including its status,
|
|
||||||
/// message, and attempt count.
|
|
||||||
///
|
|
||||||
/// # Arguments
|
|
||||||
/// * `task_id` - String parameter.
|
|
||||||
///
|
|
||||||
/// # Returns
|
|
||||||
/// The result value.
|
|
||||||
///
|
|
||||||
/// # Errors
|
|
||||||
/// Returns an error if the host function call fails.
|
|
||||||
pub fn get(task_id: &str) -> Result<Option<TaskInfo>, Error> {
|
|
||||||
let response = unsafe {
|
|
||||||
task_get(Json(TaskGetRequest {
|
|
||||||
task_id: task_id.to_owned(),
|
|
||||||
}))?
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Some(err) = response.0.error {
|
|
||||||
return Err(Error::msg(err));
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(response.0.result)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Cancel cancels a pending task. Returns error if already
|
|
||||||
/// running, completed, or failed.
|
|
||||||
///
|
|
||||||
/// # Arguments
|
|
||||||
/// * `task_id` - String parameter.
|
|
||||||
///
|
|
||||||
/// # Errors
|
|
||||||
/// Returns an error if the host function call fails.
|
|
||||||
pub fn cancel(task_id: &str) -> Result<(), Error> {
|
|
||||||
let response = unsafe {
|
|
||||||
task_cancel(Json(TaskCancelRequest {
|
|
||||||
task_id: task_id.to_owned(),
|
|
||||||
}))?
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Some(err) = response.0.error {
|
|
||||||
return Err(Error::msg(err));
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
16
plugins/testdata/test-taskqueue/go.mod
vendored
16
plugins/testdata/test-taskqueue/go.mod
vendored
@@ -1,16 +0,0 @@
|
|||||||
module test-taskqueue
|
|
||||||
|
|
||||||
go 1.25
|
|
||||||
|
|
||||||
require github.com/navidrome/navidrome/plugins/pdk/go v0.0.0
|
|
||||||
|
|
||||||
require (
|
|
||||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
|
||||||
github.com/extism/go-pdk v1.1.3 // indirect
|
|
||||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
|
||||||
github.com/stretchr/objx v0.5.2 // indirect
|
|
||||||
github.com/stretchr/testify v1.11.1 // indirect
|
|
||||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
|
||||||
)
|
|
||||||
|
|
||||||
replace github.com/navidrome/navidrome/plugins/pdk/go => ../../pdk/go
|
|
||||||
14
plugins/testdata/test-taskqueue/go.sum
vendored
14
plugins/testdata/test-taskqueue/go.sum
vendored
@@ -1,14 +0,0 @@
|
|||||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
|
||||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
|
||||||
github.com/extism/go-pdk v1.1.3 h1:hfViMPWrqjN6u67cIYRALZTZLk/enSPpNKa+rZ9X2SQ=
|
|
||||||
github.com/extism/go-pdk v1.1.3/go.mod h1:Gz+LIU/YCKnKXhgge8yo5Yu1F/lbv7KtKFkiCSzW/P4=
|
|
||||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
|
||||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
|
||||||
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
|
|
||||||
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
|
|
||||||
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
|
|
||||||
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
|
|
||||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
|
||||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
|
||||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
|
||||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
|
||||||
104
plugins/testdata/test-taskqueue/main.go
vendored
104
plugins/testdata/test-taskqueue/main.go
vendored
@@ -1,104 +0,0 @@
|
|||||||
// Test TaskQueue plugin for Navidrome plugin system integration tests.
|
|
||||||
// Build with: tinygo build -o ../test-taskqueue.wasm -target wasip1 -buildmode=c-shared .
|
|
||||||
package main
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"github.com/navidrome/navidrome/plugins/pdk/go/host"
|
|
||||||
"github.com/navidrome/navidrome/plugins/pdk/go/pdk"
|
|
||||||
"github.com/navidrome/navidrome/plugins/pdk/go/taskworker"
|
|
||||||
)
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
taskworker.Register(&handler{})
|
|
||||||
}
|
|
||||||
|
|
||||||
type handler struct{}
|
|
||||||
|
|
||||||
func (h *handler) OnTaskExecute(req taskworker.TaskExecuteRequest) (string, error) {
|
|
||||||
payload := string(req.Payload)
|
|
||||||
if payload == "fail" {
|
|
||||||
return "", fmt.Errorf("task failed as instructed")
|
|
||||||
}
|
|
||||||
if payload == "fail-then-succeed" && req.Attempt < 2 {
|
|
||||||
return "", fmt.Errorf("transient failure")
|
|
||||||
}
|
|
||||||
return "completed successfully", nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test helper types
|
|
||||||
type TestInput struct {
|
|
||||||
Operation string `json:"operation"`
|
|
||||||
QueueName string `json:"queueName,omitempty"`
|
|
||||||
Config *host.QueueConfig `json:"config,omitempty"`
|
|
||||||
Payload []byte `json:"payload,omitempty"`
|
|
||||||
TaskID string `json:"taskId,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type TestOutput struct {
|
|
||||||
TaskID string `json:"taskId,omitempty"`
|
|
||||||
Status string `json:"status,omitempty"`
|
|
||||||
Message string `json:"message,omitempty"`
|
|
||||||
Attempt int32 `json:"attempt,omitempty"`
|
|
||||||
Error *string `json:"error,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
//go:wasmexport nd_test_taskqueue
|
|
||||||
func ndTestTaskQueue() int32 {
|
|
||||||
var input TestInput
|
|
||||||
if err := pdk.InputJSON(&input); err != nil {
|
|
||||||
errStr := err.Error()
|
|
||||||
pdk.OutputJSON(TestOutput{Error: &errStr})
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
|
|
||||||
switch input.Operation {
|
|
||||||
case "create_queue":
|
|
||||||
config := host.QueueConfig{}
|
|
||||||
if input.Config != nil {
|
|
||||||
config = *input.Config
|
|
||||||
}
|
|
||||||
err := host.TaskCreateQueue(input.QueueName, config)
|
|
||||||
if err != nil {
|
|
||||||
errStr := err.Error()
|
|
||||||
pdk.OutputJSON(TestOutput{Error: &errStr})
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
pdk.OutputJSON(TestOutput{})
|
|
||||||
|
|
||||||
case "enqueue":
|
|
||||||
taskID, err := host.TaskEnqueue(input.QueueName, input.Payload)
|
|
||||||
if err != nil {
|
|
||||||
errStr := err.Error()
|
|
||||||
pdk.OutputJSON(TestOutput{Error: &errStr})
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
pdk.OutputJSON(TestOutput{TaskID: taskID})
|
|
||||||
|
|
||||||
case "get_task_status":
|
|
||||||
info, err := host.TaskGet(input.TaskID)
|
|
||||||
if err != nil {
|
|
||||||
errStr := err.Error()
|
|
||||||
pdk.OutputJSON(TestOutput{Error: &errStr})
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
pdk.OutputJSON(TestOutput{Status: info.Status, Message: info.Message, Attempt: info.Attempt})
|
|
||||||
|
|
||||||
case "cancel_task":
|
|
||||||
err := host.TaskCancel(input.TaskID)
|
|
||||||
if err != nil {
|
|
||||||
errStr := err.Error()
|
|
||||||
pdk.OutputJSON(TestOutput{Error: &errStr})
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
pdk.OutputJSON(TestOutput{})
|
|
||||||
|
|
||||||
default:
|
|
||||||
errStr := "unknown operation: " + input.Operation
|
|
||||||
pdk.OutputJSON(TestOutput{Error: &errStr})
|
|
||||||
}
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
|
|
||||||
func main() {}
|
|
||||||
12
plugins/testdata/test-taskqueue/manifest.json
vendored
12
plugins/testdata/test-taskqueue/manifest.json
vendored
@@ -1,12 +0,0 @@
|
|||||||
{
|
|
||||||
"name": "Test TaskQueue Plugin",
|
|
||||||
"author": "Navidrome Test",
|
|
||||||
"version": "1.0.0",
|
|
||||||
"description": "A test plugin for TaskQueue integration testing",
|
|
||||||
"permissions": {
|
|
||||||
"taskqueue": {
|
|
||||||
"reason": "For testing task queue operations",
|
|
||||||
"maxConcurrency": 10
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -353,7 +353,8 @@
|
|||||||
"allUsers": "Permitir todos os usuários",
|
"allUsers": "Permitir todos os usuários",
|
||||||
"selectedUsers": "Usuários selecionados",
|
"selectedUsers": "Usuários selecionados",
|
||||||
"allLibraries": "Permitir todas as bibliotecas",
|
"allLibraries": "Permitir todas as bibliotecas",
|
||||||
"selectedLibraries": "Bibliotecas selecionadas"
|
"selectedLibraries": "Bibliotecas selecionadas",
|
||||||
|
"allowWriteAccess": "Permitir acesso de escrita"
|
||||||
},
|
},
|
||||||
"sections": {
|
"sections": {
|
||||||
"status": "Status",
|
"status": "Status",
|
||||||
@@ -396,6 +397,7 @@
|
|||||||
"allLibrariesHelp": "Quando habilitado, o plugin terá acesso a todas as bibliotecas, incluindo as criadas no futuro.",
|
"allLibrariesHelp": "Quando habilitado, o plugin terá acesso a todas as bibliotecas, incluindo as criadas no futuro.",
|
||||||
"noLibraries": "Nenhuma biblioteca selecionada",
|
"noLibraries": "Nenhuma biblioteca selecionada",
|
||||||
"librariesRequired": "Este plugin requer acesso a informações de bibliotecas. Selecione quais bibliotecas o plugin pode acessar, ou habilite 'Permitir todas as bibliotecas'.",
|
"librariesRequired": "Este plugin requer acesso a informações de bibliotecas. Selecione quais bibliotecas o plugin pode acessar, ou habilite 'Permitir todas as bibliotecas'.",
|
||||||
|
"allowWriteAccessHelp": "Quando habilitado, o plugin pode modificar arquivos nos diretórios das bibliotecas. Por padrão, plugins têm acesso somente leitura.",
|
||||||
"requiredHosts": "Hosts necessários",
|
"requiredHosts": "Hosts necessários",
|
||||||
"configValidationError": "Falha na validação da configuração:",
|
"configValidationError": "Falha na validação da configuração:",
|
||||||
"schemaRenderError": "Não foi possível renderizar o formulário de configuração. O schema do plugin pode estar inválido."
|
"schemaRenderError": "Não foi possível renderizar o formulário de configuração. O schema do plugin pode estar inválido."
|
||||||
|
|||||||
@@ -29,7 +29,7 @@ type PluginManager interface {
|
|||||||
ValidatePluginConfig(ctx context.Context, id, configJSON string) error
|
ValidatePluginConfig(ctx context.Context, id, configJSON string) error
|
||||||
UpdatePluginConfig(ctx context.Context, id, configJSON string) error
|
UpdatePluginConfig(ctx context.Context, id, configJSON string) error
|
||||||
UpdatePluginUsers(ctx context.Context, id, usersJSON string, allUsers bool) error
|
UpdatePluginUsers(ctx context.Context, id, usersJSON string, allUsers bool) error
|
||||||
UpdatePluginLibraries(ctx context.Context, id, librariesJSON string, allLibraries bool) error
|
UpdatePluginLibraries(ctx context.Context, id, librariesJSON string, allLibraries, allowWriteAccess bool) error
|
||||||
RescanPlugins(ctx context.Context) error
|
RescanPlugins(ctx context.Context) error
|
||||||
UnloadDisabledPlugins(ctx context.Context)
|
UnloadDisabledPlugins(ctx context.Context)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -56,12 +56,13 @@ func pluginsEnabledMiddleware(next http.Handler) http.Handler {
|
|||||||
|
|
||||||
// PluginUpdateRequest represents the fields that can be updated via the API
|
// PluginUpdateRequest represents the fields that can be updated via the API
|
||||||
type PluginUpdateRequest struct {
|
type PluginUpdateRequest struct {
|
||||||
Enabled *bool `json:"enabled,omitempty"`
|
Enabled *bool `json:"enabled,omitempty"`
|
||||||
Config *string `json:"config,omitempty"`
|
Config *string `json:"config,omitempty"`
|
||||||
Users *string `json:"users,omitempty"`
|
Users *string `json:"users,omitempty"`
|
||||||
AllUsers *bool `json:"allUsers,omitempty"`
|
AllUsers *bool `json:"allUsers,omitempty"`
|
||||||
Libraries *string `json:"libraries,omitempty"`
|
Libraries *string `json:"libraries,omitempty"`
|
||||||
AllLibraries *bool `json:"allLibraries,omitempty"`
|
AllLibraries *bool `json:"allLibraries,omitempty"`
|
||||||
|
AllowWriteAccess *bool `json:"allowWriteAccess,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (api *Router) updatePlugin(w http.ResponseWriter, r *http.Request) {
|
func (api *Router) updatePlugin(w http.ResponseWriter, r *http.Request) {
|
||||||
@@ -109,7 +110,7 @@ func (api *Router) updatePlugin(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Handle libraries permission update (if provided)
|
// Handle libraries permission update (if provided)
|
||||||
if req.Libraries != nil || req.AllLibraries != nil {
|
if req.Libraries != nil || req.AllLibraries != nil || req.AllowWriteAccess != nil {
|
||||||
if err := validateAndUpdateLibraries(ctx, api.pluginManager, repo, id, req, w); err != nil {
|
if err := validateAndUpdateLibraries(ctx, api.pluginManager, repo, id, req, w); err != nil {
|
||||||
log.Error(ctx, "Error updating plugin libraries", err)
|
log.Error(ctx, "Error updating plugin libraries", err)
|
||||||
return
|
return
|
||||||
@@ -245,6 +246,7 @@ func validateAndUpdateLibraries(ctx context.Context, pm PluginManager, repo mode
|
|||||||
|
|
||||||
librariesJSON := plugin.Libraries
|
librariesJSON := plugin.Libraries
|
||||||
allLibraries := plugin.AllLibraries
|
allLibraries := plugin.AllLibraries
|
||||||
|
allowWriteAccess := plugin.AllowWriteAccess
|
||||||
|
|
||||||
if req.Libraries != nil {
|
if req.Libraries != nil {
|
||||||
if *req.Libraries != "" && !isValidJSON(*req.Libraries) {
|
if *req.Libraries != "" && !isValidJSON(*req.Libraries) {
|
||||||
@@ -256,8 +258,11 @@ func validateAndUpdateLibraries(ctx context.Context, pm PluginManager, repo mode
|
|||||||
if req.AllLibraries != nil {
|
if req.AllLibraries != nil {
|
||||||
allLibraries = *req.AllLibraries
|
allLibraries = *req.AllLibraries
|
||||||
}
|
}
|
||||||
|
if req.AllowWriteAccess != nil {
|
||||||
|
allowWriteAccess = *req.AllowWriteAccess
|
||||||
|
}
|
||||||
|
|
||||||
if err := pm.UpdatePluginLibraries(ctx, id, librariesJSON, allLibraries); err != nil {
|
if err := pm.UpdatePluginLibraries(ctx, id, librariesJSON, allLibraries, allowWriteAccess); err != nil {
|
||||||
log.Error(ctx, "Error updating plugin libraries", "id", id, err)
|
log.Error(ctx, "Error updating plugin libraries", "id", id, err)
|
||||||
http.Error(w, "Error updating plugin libraries: "+err.Error(), http.StatusInternalServerError)
|
http.Error(w, "Error updating plugin libraries: "+err.Error(), http.StatusInternalServerError)
|
||||||
return err
|
return err
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ type MockPluginManager struct {
|
|||||||
// UpdatePluginUsersFn is called when UpdatePluginUsers is invoked. If nil, returns UsersError.
|
// UpdatePluginUsersFn is called when UpdatePluginUsers is invoked. If nil, returns UsersError.
|
||||||
UpdatePluginUsersFn func(ctx context.Context, id, usersJSON string, allUsers bool) error
|
UpdatePluginUsersFn func(ctx context.Context, id, usersJSON string, allUsers bool) error
|
||||||
// UpdatePluginLibrariesFn is called when UpdatePluginLibraries is invoked. If nil, returns LibrariesError.
|
// UpdatePluginLibrariesFn is called when UpdatePluginLibraries is invoked. If nil, returns LibrariesError.
|
||||||
UpdatePluginLibrariesFn func(ctx context.Context, id, librariesJSON string, allLibraries bool) error
|
UpdatePluginLibrariesFn func(ctx context.Context, id, librariesJSON string, allLibraries, allowWriteAccess bool) error
|
||||||
// RescanPluginsFn is called when RescanPlugins is invoked. If nil, returns RescanError.
|
// RescanPluginsFn is called when RescanPlugins is invoked. If nil, returns RescanError.
|
||||||
RescanPluginsFn func(ctx context.Context) error
|
RescanPluginsFn func(ctx context.Context) error
|
||||||
|
|
||||||
@@ -48,9 +48,10 @@ type MockPluginManager struct {
|
|||||||
AllUsers bool
|
AllUsers bool
|
||||||
}
|
}
|
||||||
UpdatePluginLibrariesCalls []struct {
|
UpdatePluginLibrariesCalls []struct {
|
||||||
ID string
|
ID string
|
||||||
LibrariesJSON string
|
LibrariesJSON string
|
||||||
AllLibraries bool
|
AllLibraries bool
|
||||||
|
AllowWriteAccess bool
|
||||||
}
|
}
|
||||||
RescanPluginsCalls int
|
RescanPluginsCalls int
|
||||||
}
|
}
|
||||||
@@ -105,14 +106,15 @@ func (m *MockPluginManager) UpdatePluginUsers(ctx context.Context, id, usersJSON
|
|||||||
return m.UsersError
|
return m.UsersError
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MockPluginManager) UpdatePluginLibraries(ctx context.Context, id, librariesJSON string, allLibraries bool) error {
|
func (m *MockPluginManager) UpdatePluginLibraries(ctx context.Context, id, librariesJSON string, allLibraries, allowWriteAccess bool) error {
|
||||||
m.UpdatePluginLibrariesCalls = append(m.UpdatePluginLibrariesCalls, struct {
|
m.UpdatePluginLibrariesCalls = append(m.UpdatePluginLibrariesCalls, struct {
|
||||||
ID string
|
ID string
|
||||||
LibrariesJSON string
|
LibrariesJSON string
|
||||||
AllLibraries bool
|
AllLibraries bool
|
||||||
}{ID: id, LibrariesJSON: librariesJSON, AllLibraries: allLibraries})
|
AllowWriteAccess bool
|
||||||
|
}{ID: id, LibrariesJSON: librariesJSON, AllLibraries: allLibraries, AllowWriteAccess: allowWriteAccess})
|
||||||
if m.UpdatePluginLibrariesFn != nil {
|
if m.UpdatePluginLibrariesFn != nil {
|
||||||
return m.UpdatePluginLibrariesFn(ctx, id, librariesJSON, allLibraries)
|
return m.UpdatePluginLibrariesFn(ctx, id, librariesJSON, allLibraries, allowWriteAccess)
|
||||||
}
|
}
|
||||||
return m.LibrariesError
|
return m.LibrariesError
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -355,7 +355,8 @@
|
|||||||
"allUsers": "Allow all users",
|
"allUsers": "Allow all users",
|
||||||
"selectedUsers": "Selected users",
|
"selectedUsers": "Selected users",
|
||||||
"allLibraries": "Allow all libraries",
|
"allLibraries": "Allow all libraries",
|
||||||
"selectedLibraries": "Selected libraries"
|
"selectedLibraries": "Selected libraries",
|
||||||
|
"allowWriteAccess": "Allow write access"
|
||||||
},
|
},
|
||||||
"sections": {
|
"sections": {
|
||||||
"status": "Status",
|
"status": "Status",
|
||||||
@@ -400,6 +401,7 @@
|
|||||||
"allLibrariesHelp": "When enabled, the plugin will have access to all libraries, including those created in the future.",
|
"allLibrariesHelp": "When enabled, the plugin will have access to all libraries, including those created in the future.",
|
||||||
"noLibraries": "No libraries selected",
|
"noLibraries": "No libraries selected",
|
||||||
"librariesRequired": "This plugin requires access to library information. Select which libraries the plugin can access, or enable 'Allow all libraries'.",
|
"librariesRequired": "This plugin requires access to library information. Select which libraries the plugin can access, or enable 'Allow all libraries'.",
|
||||||
|
"allowWriteAccessHelp": "When enabled, the plugin can modify files in the library directories. By default, plugins have read-only access.",
|
||||||
"requiredHosts": "Required hosts"
|
"requiredHosts": "Required hosts"
|
||||||
},
|
},
|
||||||
"placeholders": {
|
"placeholders": {
|
||||||
|
|||||||
@@ -23,8 +23,10 @@ export const LibraryPermissionCard = ({
|
|||||||
classes,
|
classes,
|
||||||
selectedLibraries,
|
selectedLibraries,
|
||||||
allLibraries,
|
allLibraries,
|
||||||
|
allowWriteAccess,
|
||||||
onSelectedLibrariesChange,
|
onSelectedLibrariesChange,
|
||||||
onAllLibrariesChange,
|
onAllLibrariesChange,
|
||||||
|
onAllowWriteAccessChange,
|
||||||
}) => {
|
}) => {
|
||||||
const translate = useTranslate()
|
const translate = useTranslate()
|
||||||
|
|
||||||
@@ -58,9 +60,17 @@ export const LibraryPermissionCard = ({
|
|||||||
[onAllLibrariesChange],
|
[onAllLibrariesChange],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const handleAllowWriteAccessToggle = React.useCallback(
|
||||||
|
(event) => {
|
||||||
|
onAllowWriteAccessChange(event.target.checked)
|
||||||
|
},
|
||||||
|
[onAllowWriteAccessChange],
|
||||||
|
)
|
||||||
|
|
||||||
// Get permission reason from manifest
|
// Get permission reason from manifest
|
||||||
const libraryPermission = manifest?.permissions?.library
|
const libraryPermission = manifest?.permissions?.library
|
||||||
const reason = libraryPermission?.reason
|
const reason = libraryPermission?.reason
|
||||||
|
const hasFilesystem = libraryPermission?.filesystem === true
|
||||||
|
|
||||||
// Check if permission is required but not configured
|
// Check if permission is required but not configured
|
||||||
const isConfigurationRequired =
|
const isConfigurationRequired =
|
||||||
@@ -107,6 +117,24 @@ export const LibraryPermissionCard = ({
|
|||||||
</Typography>
|
</Typography>
|
||||||
</Box>
|
</Box>
|
||||||
|
|
||||||
|
{hasFilesystem && (
|
||||||
|
<Box mb={2}>
|
||||||
|
<FormControlLabel
|
||||||
|
control={
|
||||||
|
<Switch
|
||||||
|
checked={allowWriteAccess}
|
||||||
|
onChange={handleAllowWriteAccessToggle}
|
||||||
|
color="primary"
|
||||||
|
/>
|
||||||
|
}
|
||||||
|
label={translate('resources.plugin.fields.allowWriteAccess')}
|
||||||
|
/>
|
||||||
|
<Typography variant="body2" color="textSecondary">
|
||||||
|
{translate('resources.plugin.messages.allowWriteAccessHelp')}
|
||||||
|
</Typography>
|
||||||
|
</Box>
|
||||||
|
)}
|
||||||
|
|
||||||
{!allLibraries && (
|
{!allLibraries && (
|
||||||
<Box className={classes.usersList}>
|
<Box className={classes.usersList}>
|
||||||
<Typography variant="subtitle2" gutterBottom>
|
<Typography variant="subtitle2" gutterBottom>
|
||||||
@@ -166,6 +194,8 @@ LibraryPermissionCard.propTypes = {
|
|||||||
classes: PropTypes.object.isRequired,
|
classes: PropTypes.object.isRequired,
|
||||||
selectedLibraries: PropTypes.array.isRequired,
|
selectedLibraries: PropTypes.array.isRequired,
|
||||||
allLibraries: PropTypes.bool.isRequired,
|
allLibraries: PropTypes.bool.isRequired,
|
||||||
|
allowWriteAccess: PropTypes.bool.isRequired,
|
||||||
onSelectedLibrariesChange: PropTypes.func.isRequired,
|
onSelectedLibrariesChange: PropTypes.func.isRequired,
|
||||||
onAllLibrariesChange: PropTypes.func.isRequired,
|
onAllLibrariesChange: PropTypes.func.isRequired,
|
||||||
|
onAllowWriteAccessChange: PropTypes.func.isRequired,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -48,8 +48,11 @@ const PluginShowLayout = () => {
|
|||||||
// Libraries permission state
|
// Libraries permission state
|
||||||
const [selectedLibraries, setSelectedLibraries] = useState([])
|
const [selectedLibraries, setSelectedLibraries] = useState([])
|
||||||
const [allLibraries, setAllLibraries] = useState(false)
|
const [allLibraries, setAllLibraries] = useState(false)
|
||||||
|
const [allowWriteAccess, setAllowWriteAccess] = useState(false)
|
||||||
const [lastRecordLibraries, setLastRecordLibraries] = useState(null)
|
const [lastRecordLibraries, setLastRecordLibraries] = useState(null)
|
||||||
const [lastRecordAllLibraries, setLastRecordAllLibraries] = useState(null)
|
const [lastRecordAllLibraries, setLastRecordAllLibraries] = useState(null)
|
||||||
|
const [lastRecordAllowWriteAccess, setLastRecordAllowWriteAccess] =
|
||||||
|
useState(null)
|
||||||
|
|
||||||
// Parse JSON config to object
|
// Parse JSON config to object
|
||||||
const jsonToObject = useCallback((jsonString) => {
|
const jsonToObject = useCallback((jsonString) => {
|
||||||
@@ -99,10 +102,12 @@ const PluginShowLayout = () => {
|
|||||||
if (record && !isDirty) {
|
if (record && !isDirty) {
|
||||||
const recordLibraries = record.libraries || ''
|
const recordLibraries = record.libraries || ''
|
||||||
const recordAllLibraries = record.allLibraries || false
|
const recordAllLibraries = record.allLibraries || false
|
||||||
|
const recordAllowWriteAccess = record.allowWriteAccess || false
|
||||||
|
|
||||||
if (
|
if (
|
||||||
recordLibraries !== lastRecordLibraries ||
|
recordLibraries !== lastRecordLibraries ||
|
||||||
recordAllLibraries !== lastRecordAllLibraries
|
recordAllLibraries !== lastRecordAllLibraries ||
|
||||||
|
recordAllowWriteAccess !== lastRecordAllowWriteAccess
|
||||||
) {
|
) {
|
||||||
try {
|
try {
|
||||||
setSelectedLibraries(
|
setSelectedLibraries(
|
||||||
@@ -112,11 +117,19 @@ const PluginShowLayout = () => {
|
|||||||
setSelectedLibraries([])
|
setSelectedLibraries([])
|
||||||
}
|
}
|
||||||
setAllLibraries(recordAllLibraries)
|
setAllLibraries(recordAllLibraries)
|
||||||
|
setAllowWriteAccess(recordAllowWriteAccess)
|
||||||
setLastRecordLibraries(recordLibraries)
|
setLastRecordLibraries(recordLibraries)
|
||||||
setLastRecordAllLibraries(recordAllLibraries)
|
setLastRecordAllLibraries(recordAllLibraries)
|
||||||
|
setLastRecordAllowWriteAccess(recordAllowWriteAccess)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}, [record, lastRecordLibraries, lastRecordAllLibraries, isDirty])
|
}, [
|
||||||
|
record,
|
||||||
|
lastRecordLibraries,
|
||||||
|
lastRecordAllLibraries,
|
||||||
|
lastRecordAllowWriteAccess,
|
||||||
|
isDirty,
|
||||||
|
])
|
||||||
|
|
||||||
const handleConfigDataChange = useCallback(
|
const handleConfigDataChange = useCallback(
|
||||||
(newData, errors) => {
|
(newData, errors) => {
|
||||||
@@ -152,6 +165,11 @@ const PluginShowLayout = () => {
|
|||||||
setIsDirty(true)
|
setIsDirty(true)
|
||||||
}, [])
|
}, [])
|
||||||
|
|
||||||
|
const handleAllowWriteAccessChange = useCallback((newAllowWriteAccess) => {
|
||||||
|
setAllowWriteAccess(newAllowWriteAccess)
|
||||||
|
setIsDirty(true)
|
||||||
|
}, [])
|
||||||
|
|
||||||
const [updatePlugin, { loading }] = useUpdate(
|
const [updatePlugin, { loading }] = useUpdate(
|
||||||
'plugin',
|
'plugin',
|
||||||
record?.id,
|
record?.id,
|
||||||
@@ -167,6 +185,7 @@ const PluginShowLayout = () => {
|
|||||||
setLastRecordAllUsers(null)
|
setLastRecordAllUsers(null)
|
||||||
setLastRecordLibraries(null)
|
setLastRecordLibraries(null)
|
||||||
setLastRecordAllLibraries(null)
|
setLastRecordAllLibraries(null)
|
||||||
|
setLastRecordAllowWriteAccess(null)
|
||||||
notify('resources.plugin.notifications.updated', 'info')
|
notify('resources.plugin.notifications.updated', 'info')
|
||||||
},
|
},
|
||||||
onFailure: (err) => {
|
onFailure: (err) => {
|
||||||
@@ -199,6 +218,7 @@ const PluginShowLayout = () => {
|
|||||||
if (parsedManifest?.permissions?.library) {
|
if (parsedManifest?.permissions?.library) {
|
||||||
data.libraries = JSON.stringify(selectedLibraries)
|
data.libraries = JSON.stringify(selectedLibraries)
|
||||||
data.allLibraries = allLibraries
|
data.allLibraries = allLibraries
|
||||||
|
data.allowWriteAccess = allowWriteAccess
|
||||||
}
|
}
|
||||||
|
|
||||||
updatePlugin('plugin', record.id, data, record)
|
updatePlugin('plugin', record.id, data, record)
|
||||||
@@ -210,6 +230,7 @@ const PluginShowLayout = () => {
|
|||||||
allUsers,
|
allUsers,
|
||||||
selectedLibraries,
|
selectedLibraries,
|
||||||
allLibraries,
|
allLibraries,
|
||||||
|
allowWriteAccess,
|
||||||
])
|
])
|
||||||
|
|
||||||
// Parse manifest
|
// Parse manifest
|
||||||
@@ -294,8 +315,10 @@ const PluginShowLayout = () => {
|
|||||||
classes={classes}
|
classes={classes}
|
||||||
selectedLibraries={selectedLibraries}
|
selectedLibraries={selectedLibraries}
|
||||||
allLibraries={allLibraries}
|
allLibraries={allLibraries}
|
||||||
|
allowWriteAccess={allowWriteAccess}
|
||||||
onSelectedLibrariesChange={handleSelectedLibrariesChange}
|
onSelectedLibrariesChange={handleSelectedLibrariesChange}
|
||||||
onAllLibrariesChange={handleAllLibrariesChange}
|
onAllLibrariesChange={handleAllLibrariesChange}
|
||||||
|
onAllowWriteAccessChange={handleAllowWriteAccessChange}
|
||||||
/>
|
/>
|
||||||
|
|
||||||
<Box display="flex" justifyContent="flex-end">
|
<Box display="flex" justifyContent="flex-end">
|
||||||
|
|||||||
Reference in New Issue
Block a user